# DICOM Dataset QA: Anonymization, Header Validation & Modality Tag Checks

This notebook runs basic quality checks on a DICOM dataset:

1. **Anonymization checks**
   - PHI / sensitive tags (e.g., PatientName, PatientID, InstitutionName)
   - Private tags

2. **Header validation**
   - Mandatory DICOM tags exist and are non-empty
   - Simple type / format checks

3. **Required content**
   - Presence of dataset-specific tags (configurable)

4. **Modality-consistent tagging**
   - Sanity checks tailored per Modality (CT, MR, etc.)

The goal is to quickly identify *bad* files: those that are not anonymized, are missing critical metadata, or are inconsistent with the declared modality.


In [13]:
# ===========================
# 0. Imports & Basic Config
# ===========================
import os
import sys
from pathlib import Path
from collections import defaultdict, Counter

import pandas as pd

try:
    import pydicom
    from pydicom.errors import InvalidDicomError
except ImportError:
    !pip install -q pydicom
    import pydicom
    from pydicom.errors import InvalidDicomError

DATA_ROOT = Path("/kaggle/input/hippocampal-sparing-dataset/HippocampalMRISlices/01")  

# If you want to limit how many files to scan during experimentation
MAX_FILES = None  # or set like 1000

## 1. Discover DICOM Files

We recursively walk the dataset directory and collect `.dcm` or DICOM-like files.

You can:
- Filter by extension (`.dcm`, `.ima`, etc.), or  
- Try *all* files and let `pydicom` decide if they’re valid DICOM.

In [14]:
# ===========================
# 1. Discover DICOM Files
# ===========================

def discover_dicom_files(root: Path, max_files=None):
    dicom_paths = []
    for dirpath, _, filenames in os.walk(root):
        for fname in filenames:
            fpath = Path(dirpath) / fname

            # Option A: Extension-based ("" also accepts files with no extension)
            if fpath.suffix.lower() in {".dcm", ".dicom", ".ima", ""}:
                dicom_paths.append(fpath)

            # Option B: brute-force all files (comment out A, uncomment B)
            # dicom_paths.append(fpath)

            if max_files is not None and len(dicom_paths) >= max_files:
                return dicom_paths
    return dicom_paths

dicom_files = discover_dicom_files(DATA_ROOT, max_files=MAX_FILES)

print(f"Found {len(dicom_files)} candidate DICOM files.")
if len(dicom_files) == 0:
    print("⚠️ No files found. Check DATA_ROOT.")

Found 277 candidate DICOM files.
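
If you switch to Option B and try every file, a cheap pre-filter can keep obviously non-DICOM files away from `pydicom`. A DICOM Part 10 file begins with a 128-byte preamble followed by the four bytes `DICM`. The sketch below checks for that marker. `looks_like_dicom` is a hypothetical helper that is not part of this notebook, and it will reject legacy files written without a preamble, which `pydicom` only reads with `force=True`.

```python
from pathlib import Path

def looks_like_dicom(path: Path) -> bool:
    """Return True if the file carries the 'DICM' marker after the 128-byte preamble."""
    try:
        with open(path, "rb") as fh:
            header = fh.read(132)
    except OSError:
        # Unreadable or missing files are treated as "not DICOM"
        return False
    return len(header) == 132 and header[128:132] == b"DICM"
```

This only inspects the first 132 bytes, so it is a filter for candidates and not a replacement for the loader's own validation.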


## 2. Safe DICOM Loader

We'll create a robust loader that:

- Catches `InvalidDicomError`.
- Uses `stop_before_pixels=True` for speed (we only care about headers).
- Returns a `(dataset, error)` pair: `(ds, None)` on success and `(None, message)` on failure, so errors can be logged for later analysis.


In [15]:
# ===========================
# 2. Safe DICOM Loader
# ===========================

def safe_read_dicom(path: Path):
    try:
        ds = pydicom.dcmread(str(path), stop_before_pixels=True, force=False)
        return ds, None
    except InvalidDicomError as e:
        return None, f"Invalid DICOM: {e}"
    except Exception as e:
        return None, f"Read error: {e}"

# Quick sanity check on first few files
for i, p in enumerate(dicom_files[:5]):
    ds, err = safe_read_dicom(p)
    print(p.name, "OK" if err is None else f"ERROR: {err}")


MR.1.2.246.352.221.55685483938290936304826787115937244607.dcm OK
MR.1.2.246.352.221.46620217304493734562111362943041802116.dcm OK
MR.1.2.246.352.221.46610297499567687196592583109429057944.dcm OK
MR.1.2.246.352.221.551312179015625932810521172476992959900.dcm OK
MR.1.2.246.352.221.55732483383214337663879853476263275668.dcm OK
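
Because the loader returns an error string instead of raising, failures can be tallied after a scan. A minimal sketch, assuming a reader with the same `(dataset, error)` return shape as `safe_read_dicom`. `summarize_read_errors` and the stand-in `fake_reader` are hypothetical names used only for illustration:

```python
from collections import Counter

def summarize_read_errors(paths, reader):
    """Collect failures and tally them by category (the text before the first colon)."""
    failures = []
    for p in paths:
        _, err = reader(p)
        if err is not None:
            failures.append((p, err))
    by_kind = Counter(err.split(":", 1)[0] for _, err in failures)
    return failures, by_kind

# Stand-in reader so the sketch runs without any DICOM files on disk.
def fake_reader(path):
    if str(path).endswith(".txt"):
        return None, "Invalid DICOM: missing DICM prefix"
    return object(), None

failures, by_kind = summarize_read_errors(["a.dcm", "b.txt", "c.txt"], fake_reader)
print(by_kind)
```

In the notebook itself, `safe_read_dicom` could be passed as `reader` and `dicom_files` as `paths`.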


## 3. Configuration: Anonymization & Validation Rules

We define:

- **Sensitive tags**: must be empty or replaced with whitelisted dummy values.
- **Required tags (global)**: must exist and not be blank.
- **Modality-specific required tags**: extra requirements per modality.
- **Allowed modalities**: to flag weird or unknown modality codes.

You can tweak these lists for your real project or regulatory profile.


In [16]:
# ===========================
# 3. Rule Configuration
# ===========================

# DICOM keywords that often carry PHI / sensitive information
SENSITIVE_TAGS = [
    "PatientName",
    "PatientID",
    "PatientBirthDate",
    "PatientSex",
    "OtherPatientIDs",
    "OtherPatientNames",
    "PatientAddress",
    "PatientTelephoneNumbers",
    "PatientMotherBirthName",
    "PatientBirthName",
    "ReferringPhysicianName",
    "PerformingPhysicianName",
    "OperatorsName",
    "InstitutionName",
    "InstitutionAddress",
    "AccessionNumber"
]

# Values that we consider "safe" for anonymized fields
# Very opinionated – update to your anonymization policy
ANON_ALLOWED_VALUES = {
    "",  # empty
    " ",  # whitespace
    "ANON",
    "ANONYMOUS",
    "REMOVED",
    "REDACTED",
    "XXXXX",
    "XXXXXXXX"
}

# Required tags for *all* images
GLOBAL_REQUIRED_TAGS = [
    "StudyInstanceUID",
    "SeriesInstanceUID",
    "SOPInstanceUID",
    "Modality",
    "Rows",
    "Columns"
]

# Additional required tags per modality
MODALITY_REQUIRED_TAGS = {
    "CT": ["KVP"],  # typical but not universal
    "MR": ["MagneticFieldStrength"],
    "CR": ["ViewPosition"],
    "DX": ["ViewPosition"],
    "US": [],  # adjust as needed
}

# Allowed modality codes (to flag weird typos)
ALLOWED_MODALITIES = {
    "CT", "MR", "PT", "CR", "DX", "US", "XA", "MG", "RG", "OT"
}
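
One quick way to catch configuration mistakes is to check that the rule tables agree with each other before scanning. The sketch below uses small stand-in copies of the tables, and `validate_rule_config` is a hypothetical helper. Note that a code such as `RTSTRUCT` is a valid DICOM modality but is not in `ALLOWED_MODALITIES` above, so files that declare it will be flagged as unknown unless the set is extended.

```python
def validate_rule_config(allowed_modalities, modality_required_tags, global_required_tags):
    """Return a list of human-readable problems found in the rule tables."""
    problems = []
    for modality in modality_required_tags:
        if modality not in allowed_modalities:
            problems.append(f"{modality} has required tags but is not an allowed modality")
    for modality, tags in modality_required_tags.items():
        overlap = sorted(set(tags) & set(global_required_tags))
        if overlap:
            problems.append(f"{modality} repeats global tags: {overlap}")
    return problems

# Stand-in tables mirroring the shape of the configuration above.
allowed = {"CT", "MR", "US"}
per_modality = {"CT": ["KVP"], "MR": ["MagneticFieldStrength"], "RTSTRUCT": []}
global_tags = ["Modality", "Rows", "Columns"]

before = validate_rule_config(allowed, per_modality, global_tags)
# Extending the allowed set resolves the mismatch:
after = validate_rule_config(allowed | {"RTSTRUCT"}, per_modality, global_tags)
print(before)
print(after)
```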


## 4. Helper Functions for Tag Access & Simple Checks

We’ll add utilities to:

- Safely fetch tag values by keyword.
- Determine if a value is “empty”.
- Detect private tags.


In [17]:
# ===========================
# 4. Helper Functions
# ===========================

def get_tag(ds, keyword, default=None):
    """Safe keyword-based tag getter."""
    if ds is None:
        return default
    if keyword in ds:
        val = ds.get(keyword)
        # With a keyword string, Dataset.get() already returns the value;
        # the getattr guard also unwraps a DataElement if one comes back.
        return getattr(val, "value", val)
    # Fallback: attribute-style access by keyword
    try:
        return getattr(ds, keyword)
    except AttributeError:
        return default


def is_empty_value(v):
    if v is None:
        return True
    if isinstance(v, str) and v.strip() == "":
        return True
    if isinstance(v, (list, tuple)) and len(v) == 0:
        return True
    return False


def has_private_tags(ds):
    """Return True if any private tag is present."""
    if ds is None:
        return False
    for elem in ds.iterall():
        if elem.tag.is_private:
            return True
    return False
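
For reference, the DICOM standard reserves odd group numbers for private elements, which is the convention `elem.tag.is_private` is understood to follow. A minimal sketch on plain integers, with no `pydicom` dependency. `is_private_tag` is a hypothetical helper that takes the 32-bit `(group, element)` value:

```python
def is_private_tag(tag: int) -> bool:
    """True if the tag's group number (upper 16 bits) is odd."""
    group = (tag >> 16) & 0xFFFF
    return group % 2 == 1

print(is_private_tag(0x00100010))  # (0010,0010) PatientName, a standard even group
print(is_private_tag(0x00091001))  # (0009,1001), an odd group
```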


## 5. Anonymization Checks

We’ll check, per file:

1. Any **sensitive tag** has a value not in `ANON_ALLOWED_VALUES`.
2. Any **private tags** exist (these often carry vendor-specific data and sometimes PHI).

For each file we record:

- `has_phi_values` – any sensitive tag is non-empty and not anonymized.
- `has_private_tags` – dataset still contains private elements.


In [18]:
# ===========================
# 5. Anonymization Checks
# ===========================

def check_anonymization(ds):
    issues = {
        "has_phi_values": False,
        "phi_tags": [],
        "has_private_tags": False,
    }

    # Sensitive tag content
    for kw in SENSITIVE_TAGS:
        val = get_tag(ds, kw)
        if val is None:
            continue
        # Flatten value for simple checks
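        # pydicom multi-valued elements are MultiValue, which is not a list subclass
        if isinstance(val, (list, tuple, pydicom.multival.MultiValue)):
            # Drop blank items so multi-valued empties do not join into stray spaces
            flat_vals = [str(v).strip() for v in val if v is not None]
            joined = " ".join(v for v in flat_vals if v)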
        else:
            joined = str(val).strip()

        if joined and joined.upper() not in ANON_ALLOWED_VALUES:
            issues["has_phi_values"] = True
            issues["phi_tags"].append(kw)

    # Private tags
    if has_private_tags(ds):
        issues["has_private_tags"] = True

    return issues
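
To make the comparison rule concrete: each value is flattened to a string, stripped, upper-cased, and looked up in the allowed set, and blank values pass. The sketch below re-implements just that rule on plain Python values and drops empty items before joining. `is_anonymized`, the shortened `ALLOWED` set, and the sample values are hypothetical.

```python
ALLOWED = {"", "ANON", "ANONYMOUS", "REMOVED", "REDACTED"}  # stand-in subset

def is_anonymized(val, allowed=ALLOWED) -> bool:
    """Apply the strip / upper-case / set-lookup rule to a single tag value."""
    if val is None:
        return True
    if isinstance(val, (list, tuple)):
        parts = (str(v).strip() for v in val if v is not None)
        joined = " ".join(p for p in parts if p)
    else:
        joined = str(val).strip()
    return joined == "" or joined.upper() in allowed

samples = ["anon", "  ", ["", ""], "DOE^JOHN", "Anonymous^01"]
results = [is_anonymized(s) for s in samples]
for s, r in zip(samples, results):
    print(repr(s), r)
```

A pseudonym such as `Anonymous^01` is flagged because it is not literally in the allowed set, so a flagged file is a prompt to inspect the value rather than proof of PHI.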


## 6. Header Validation & Required Content

We validate:

1. **Global required tags** (e.g., StudyInstanceUID, Modality, Rows, Columns).
2. **Modality-specific required tags** (e.g., CT needs `KVP` in this rule set).
3. Basic sanity checks (e.g., Rows/Columns > 0, Modality in the allowed set).

All of this is intentionally simple – this is a *screening* script, not a full DICOM validator.


In [19]:
# ===========================
# 6. Header Validation
# ===========================

def check_required_tags(ds, tags):
    missing = []
    empty = []
    for kw in tags:
        val = get_tag(ds, kw, default=None)
        if val is None:
            missing.append(kw)
        elif is_empty_value(val):
            empty.append(kw)
    return missing, empty


def check_header_and_content(ds):
    issues = {
        "missing_global_tags": [],
        "empty_global_tags": [],
        "missing_mod_tags": [],
        "empty_mod_tags": [],
        "invalid_rows_cols": False,
        "modality_unknown": False,
    }

    # Global required tags
    missing, empty = check_required_tags(ds, GLOBAL_REQUIRED_TAGS)
    issues["missing_global_tags"] = missing
    issues["empty_global_tags"] = empty

    modality = get_tag(ds, "Modality")
    if modality is None:
        modality = "UNKNOWN"

    # Modality-specific tags
    if modality in MODALITY_REQUIRED_TAGS:
        mod_req = MODALITY_REQUIRED_TAGS[modality]
        missing_m, empty_m = check_required_tags(ds, mod_req)
        issues["missing_mod_tags"] = missing_m
        issues["empty_mod_tags"] = empty_m

    # Rows / Columns > 0
    rows = get_tag(ds, "Rows")
    cols = get_tag(ds, "Columns")
    try:
        if rows is not None and cols is not None:
            if int(rows) <= 0 or int(cols) <= 0:
                issues["invalid_rows_cols"] = True
    except Exception:
        issues["invalid_rows_cols"] = True

    # Modality sanity
    if modality not in ALLOWED_MODALITIES:
        issues["modality_unknown"] = True

    return issues
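
The overview mentions simple format checks. One example that fits this screening style is validating a DICOM date (`DA`) value, which is written as `YYYYMMDD`. The sketch below is an illustration only and is not wired into `check_header_and_content`; `is_valid_da` is a hypothetical helper.

```python
from datetime import datetime

def is_valid_da(value) -> bool:
    """True if value is an 8-digit YYYYMMDD string naming a real calendar date."""
    text = str(value).strip()
    if len(text) != 8 or not text.isdigit():
        return False
    try:
        datetime.strptime(text, "%Y%m%d")
    except ValueError:
        # Right shape, but not a real date (e.g. February 30th)
        return False
    return True

print(is_valid_da("20240229"))
print(is_valid_da("20230229"))
print(is_valid_da("2024-02-29"))
```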


## 7. Modality-Consistent Tagging

Here we add *lightweight* modality-specific sanity checks, e.g.:

- **CT**:
  - `KVP` is present and > 0.
- **MR**:
  - `MagneticFieldStrength` is present and > 0.
- **US**:
  - `PhotometricInterpretation`, if present, is one of the expected values.

You can harden or relax these rules per your clinical/research context.

In [20]:
# ===========================
# 7. Modality-Consistent Checks
# ===========================

def check_modality_consistency(ds):
    modality = get_tag(ds, "Modality") or "UNKNOWN"
    issues = {
        "modality": modality,
        "ct_kvp_invalid": False,
        "mr_field_invalid": False,
        "us_photo_invalid": False,
    }

    if modality == "CT":
        kvp = get_tag(ds, "KVP")
        try:
            if kvp is None or float(kvp) <= 0:
                issues["ct_kvp_invalid"] = True
        except Exception:
            issues["ct_kvp_invalid"] = True

    if modality == "MR":
        mfs = get_tag(ds, "MagneticFieldStrength")
        try:
            if mfs is None or float(mfs) <= 0:
                issues["mr_field_invalid"] = True
        except Exception:
            issues["mr_field_invalid"] = True

    if modality == "US":
        pi = get_tag(ds, "PhotometricInterpretation")
        if pi is not None:
            pi = str(pi).upper()
            if pi not in {"MONOCHROME2", "MONOCHROME1", "RGB", "YBR_FULL"}:
                issues["us_photo_invalid"] = True

    return issues
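
The CT and MR branches share one pattern: fetch a tag, coerce it to `float`, and require a positive value. If more modalities are added, that pattern could be expressed as a table. A sketch under that assumption, using a plain `dict` as a stand-in for the dataset header. `POSITIVE_NUMERIC_RULES` and `check_positive_numeric` are hypothetical names:

```python
# modality -> list of (tag keyword, issue flag name); stand-in rule table
POSITIVE_NUMERIC_RULES = {
    "CT": [("KVP", "ct_kvp_invalid")],
    "MR": [("MagneticFieldStrength", "mr_field_invalid")],
}

def check_positive_numeric(header: dict) -> dict:
    """Flag each configured tag that is missing, non-numeric, or not > 0."""
    modality = header.get("Modality") or "UNKNOWN"
    issues = {}
    for keyword, flag in POSITIVE_NUMERIC_RULES.get(modality, []):
        raw = header.get(keyword)
        try:
            issues[flag] = raw is None or float(raw) <= 0
        except (TypeError, ValueError):
            issues[flag] = True
    return issues

print(check_positive_numeric({"Modality": "MR", "MagneticFieldStrength": "1.5"}))
print(check_positive_numeric({"Modality": "MR"}))
print(check_positive_numeric({"Modality": "CT", "KVP": "n/a"}))
```

As in the notebook's version, a missing tag counts as invalid, which is why an MR file without `MagneticFieldStrength` appears under both `missing_mod_tags` and `mr_field_invalid`.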


## 8. Run All Checks & Build a Summary Table

We iterate through all discovered DICOM files and generate a per-file report with:

- File path & basic identifiers
- Anonymization issues
- Header issues
- Modality consistency issues

Then we aggregate counts to see how bad the dataset is.


In [21]:
# ===========================
# 8. Run All Checks
# ===========================

records = []

for i, path in enumerate(dicom_files):
    ds, err = safe_read_dicom(path)

    if ds is None:
        records.append({
            "file": str(path),
            "read_error": err,
            "modality": None,
            "has_phi_values": None,
            "phi_tags": None,
            "has_private_tags": None,
            "missing_global_tags": None,
            "empty_global_tags": None,
            "missing_mod_tags": None,
            "empty_mod_tags": None,
            "invalid_rows_cols": None,
            "modality_unknown": None,
            "ct_kvp_invalid": None,
            "mr_field_invalid": None,
            "us_photo_invalid": None,
        })
        continue

    modality = get_tag(ds, "Modality") or "UNKNOWN"
    anon_issues = check_anonymization(ds)
    header_issues = check_header_and_content(ds)
    mod_issues = check_modality_consistency(ds)

    records.append({
        "file": str(path),
        "read_error": None,
        "modality": modality,
        "has_phi_values": anon_issues["has_phi_values"],
        "phi_tags": ";".join(anon_issues["phi_tags"]),
        "has_private_tags": anon_issues["has_private_tags"],
        "missing_global_tags": ";".join(header_issues["missing_global_tags"]),
        "empty_global_tags": ";".join(header_issues["empty_global_tags"]),
        "missing_mod_tags": ";".join(header_issues["missing_mod_tags"]),
        "empty_mod_tags": ";".join(header_issues["empty_mod_tags"]),
        "invalid_rows_cols": header_issues["invalid_rows_cols"],
        "modality_unknown": header_issues["modality_unknown"],
        "ct_kvp_invalid": mod_issues["ct_kvp_invalid"],
        "mr_field_invalid": mod_issues["mr_field_invalid"],
        "us_photo_invalid": mod_issues["us_photo_invalid"],
    })

    if (i + 1) % 200 == 0:
        print(f"Processed {i+1} / {len(dicom_files)} files...")

df = pd.DataFrame.from_records(records)
df.head()


Processed 200 / 277 files...


| | file | read_error | modality | has_phi_values | phi_tags | has_private_tags | missing_global_tags | empty_global_tags | missing_mod_tags | empty_mod_tags | invalid_rows_cols | modality_unknown | ct_kvp_invalid | mr_field_invalid | us_photo_invalid |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | /kaggle/input/hippocampal-sparing-dataset/Hipp... | | MR | True | PatientName;PatientID | False | | | MagneticFieldStrength | | False | False | False | True | False |
| 1 | /kaggle/input/hippocampal-sparing-dataset/Hipp... | | MR | True | PatientName;PatientID | False | | | MagneticFieldStrength | | False | False | False | True | False |
| 2 | /kaggle/input/hippocampal-sparing-dataset/Hipp... | | MR | True | PatientName;PatientID | False | | | MagneticFieldStrength | | False | False | False | True | False |
| 3 | /kaggle/input/hippocampal-sparing-dataset/Hipp... | | MR | True | PatientName;PatientID | False | | | MagneticFieldStrength | | False | False | False | True | False |
| 4 | /kaggle/input/hippocampal-sparing-dataset/Hipp... | | MR | True | PatientName;PatientID | False | | | MagneticFieldStrength | | False | False | False | True | False |


## 9. Dataset-Level Summary

We now aggregate the results to see:

- How many files fail anonymization.
- How many files have read errors.
- How many files fail the header and modality sanity checks.
- Distribution by modality.


In [22]:
# ===========================
# 9. Dataset-Level Summary
# ===========================

print("Total files:", len(df))

print("\n--- Read Errors ---")
print(df["read_error"].notnull().value_counts())

print("\n--- By Modality ---")
print(df["modality"].value_counts(dropna=False))

def count_bool(col):
    if col not in df.columns:
        return None
    return df[col].fillna(False).value_counts()

print("\n--- Anonymization ---")
print("has_phi_values:")
print(count_bool("has_phi_values"))
print("\nhas_private_tags:")
print(count_bool("has_private_tags"))

print("\n--- Header Issues ---")
for col in ["invalid_rows_cols", "modality_unknown"]:
    print(f"\n{col}:")
    print(count_bool(col))

print("\n--- Modality Consistency ---")
for col in ["ct_kvp_invalid", "mr_field_invalid", "us_photo_invalid"]:
    print(f"\n{col}:")
    print(count_bool(col))


Total files: 277

--- Read Errors ---
read_error
False    277
Name: count, dtype: int64

--- By Modality ---
modality
MR          276
RTSTRUCT      1
Name: count, dtype: int64

--- Anonymization ---
has_phi_values:
has_phi_values
True    277
Name: count, dtype: int64

has_private_tags:
has_private_tags
False    277
Name: count, dtype: int64

--- Header Issues ---

invalid_rows_cols:
invalid_rows_cols
False    277
Name: count, dtype: int64

modality_unknown:
modality_unknown
False    276
True       1
Name: count, dtype: int64

--- Modality Consistency ---

ct_kvp_invalid:
ct_kvp_invalid
False    277
Name: count, dtype: int64

mr_field_invalid:
mr_field_invalid
True     276
False      1
Name: count, dtype: int64

us_photo_invalid:
us_photo_invalid
False    277
Name: count, dtype: int64
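
The summary above covers the boolean flags but not the list-valued columns such as `missing_global_tags` and `missing_mod_tags`. A minimal sketch for tallying them, assuming records shaped like the per-file dictionaries built in section 8 (semicolon-joined strings, `None` for unreadable files). `tally_listed_tags` and the sample records are hypothetical:

```python
from collections import Counter

def tally_listed_tags(records, column):
    """Count how many times each tag is listed in a semicolon-joined column."""
    counts = Counter()
    for rec in records:
        value = rec.get(column)
        if not value:  # None (read error) or "" (nothing listed)
            continue
        counts.update(tag for tag in value.split(";") if tag)
    return counts

sample_records = [
    {"file": "a.dcm", "missing_global_tags": "", "missing_mod_tags": "MagneticFieldStrength"},
    {"file": "b.dcm", "missing_global_tags": "Rows;Columns", "missing_mod_tags": ""},
    {"file": "c.dcm", "missing_global_tags": None, "missing_mod_tags": None},
]
global_counts = tally_listed_tags(sample_records, "missing_global_tags")
mod_counts = tally_listed_tags(sample_records, "missing_mod_tags")
print(global_counts)
print(mod_counts)
```

In the notebook, the `records` list from section 8 could be passed in directly.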


## 10. Inspect Problematic Files

Now we extract subsets of files with issues. From here, you can:

- Download the CSV report.
- Inspect individual headers in more detail.
- Decide which files to drop or fix.


In [23]:
# ===========================
# 10. Inspect Problematic Files
# ===========================

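flag_cols = [
    "has_phi_values", "has_private_tags", "invalid_rows_cols",
    "modality_unknown", "ct_kvp_invalid", "mr_field_invalid", "us_photo_invalid",
]
# Semicolon-joined tag lists: a non-empty string means a tag is missing or empty
list_cols = [
    "missing_global_tags", "empty_global_tags",
    "missing_mod_tags", "empty_mod_tags",
]

problem_mask = df["read_error"].notnull()
for col in flag_cols:
    problem_mask |= df[col].eq(True)  # None (unreadable file) counts as False here
for col in list_cols:
    problem_mask |= df[col].fillna("").ne("")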

df_problems = df[problem_mask].copy()
print(f"Files with any issue: {len(df_problems)}")

df_problems.head()


Files with any issue: 277


| | file | read_error | modality | has_phi_values | phi_tags | has_private_tags | missing_global_tags | empty_global_tags | missing_mod_tags | empty_mod_tags | invalid_rows_cols | modality_unknown | ct_kvp_invalid | mr_field_invalid | us_photo_invalid |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | /kaggle/input/hippocampal-sparing-dataset/Hipp... | | MR | True | PatientName;PatientID | False | | | MagneticFieldStrength | | False | False | False | True | False |
| 1 | /kaggle/input/hippocampal-sparing-dataset/Hipp... | | MR | True | PatientName;PatientID | False | | | MagneticFieldStrength | | False | False | False | True | False |
| 2 | /kaggle/input/hippocampal-sparing-dataset/Hipp... | | MR | True | PatientName;PatientID | False | | | MagneticFieldStrength | | False | False | False | True | False |
| 3 | /kaggle/input/hippocampal-sparing-dataset/Hipp... | | MR | True | PatientName;PatientID | False | | | MagneticFieldStrength | | False | False | False | True | False |
| 4 | /kaggle/input/hippocampal-sparing-dataset/Hipp... | | MR | True | PatientName;PatientID | False | | | MagneticFieldStrength | | False | False | False | True | False |


## 11. Save Report as CSV (Optional)

You can download this CSV from Kaggle and feed it into your QA workflow, Jira, or whatever defect tracking system you use.


In [24]:
# ===========================
# 11. Save CSV
# ===========================

output_path = "/kaggle/working/dicom_qc_report.csv"
df.to_csv(output_path, index=False)
print("Saved report to:", output_path)


Saved report to: /kaggle/working/dicom_qc_report.csv
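
Downstream tools can consume the report without pandas. A minimal sketch using the standard `csv` module, relying on the fact that pandas writes booleans as `True`/`False` and missing values as empty fields. `rows_with_flag` is a hypothetical helper:

```python
import csv

def rows_with_flag(csv_path, flag_column):
    """Yield report rows whose flag column was written as True."""
    with open(csv_path, newline="") as fh:
        for row in csv.DictReader(fh):
            if (row.get(flag_column) or "").strip().lower() == "true":
                yield row

# Example use against the report saved above:
# for row in rows_with_flag("/kaggle/working/dicom_qc_report.csv", "has_phi_values"):
#     print(row["file"], row["phi_tags"])
```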


## 12. Next Steps / Hardening Ideas

This script is intentionally **basic**. To make it more production-grade:

- Tie anonymization rules directly to your **DICOM profile** (CT vs MR vs RTSTRUCT etc.).
- Add **UID pattern checks** (e.g., all UIDs re-rooted under a specific anonymized root).
- Integrate with a **formal DICOM validator** for deeper attribute model checks.
- For clinical datasets, expand PHI checks to free-text fields (e.g., `StudyDescription`, `SeriesDescription`) via:
  - regex for MRNs,
  - dictionaries for hospital names, city names, etc.
- Convert this into a **Python package** and call it from:
  - CI pipelines,
  - preprocessing scripts for ML,
  - or your regulatory QA toolchain.
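
As an illustration of the UID pattern idea, the sketch below checks that a UID is syntactically valid (dot-separated numeric components, no leading zeros, at most 64 characters) and sits under a given root. The root value is only an example copied from the file names shown earlier in this notebook, not a statement about how the dataset was anonymized, and `uid_is_under_root` is a hypothetical helper.

```python
import re

# DICOM UID syntax: numeric components separated by dots, no leading zeros.
_UID_RE = re.compile(r"(0|[1-9][0-9]*)(\.(0|[1-9][0-9]*))*")

def uid_is_under_root(uid: str, root: str) -> bool:
    """True if uid is well-formed (<= 64 chars) and begins with root followed by a dot."""
    uid = str(uid).strip()
    if len(uid) > 64 or not _UID_RE.fullmatch(uid):
        return False
    return uid.startswith(root.rstrip(".") + ".")

EXAMPLE_ROOT = "1.2.246.352.221"  # illustrative root, taken from the file names above
print(uid_is_under_root("1.2.246.352.221.55685483938290936304826787115937244607", EXAMPLE_ROOT))
print(uid_is_under_root("1.2.840.10008.5.1.4.1.1.4", EXAMPLE_ROOT))
print(uid_is_under_root("1.2.246.352.221.007", EXAMPLE_ROOT))
```

Applied to `StudyInstanceUID`, `SeriesInstanceUID`, and `SOPInstanceUID`, a check like this would flag UIDs that were not re-rooted during anonymization.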

For now, this notebook gives you a **fast, dataset-level smoke test** for anonymization and header sanity.
