# MRI + Annotation ETL Pipeline 

In [10]:
# ======================================
# 1️⃣ Setup and Configuration 
# --> TO BE UPDATE in LOCAL RUN
# ======================================
import os, json, pandas as pd, pydicom
from pathlib import Path

BASE_DIR = Path("~/Downloads/01_MRI_Data").expanduser()   # Folder containing patient subfolders
ANNOTATION_FILE = Path("~/Downloads/DataEngineering-MRI/Radiologists Report.xlsx").expanduser()
OUTPUT_DIR = Path("~/Downloads/DataEngineering-MRI").expanduser()
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
S3_PATH = "s3://spine-mri-public-data/raw/mri"
print(f'[INFO] Base folder: {BASE_DIR}')
print(f'[INFO] Output folder: {OUTPUT_DIR}')

[INFO] Base folder: /Users/I758899/Downloads/01_MRI_Data
[INFO] Output folder: /Users/I758899/Downloads/DataEngineering-MRI


## 2️⃣ Main Functions
Extract metadata from MRI files and process the Excel annotation file.

In [32]:
def safe_float(value, default=0.0):
    try:
        return float(value)
    except (TypeError, ValueError):
        return default
def extract_dicom_metadata(file_path: Path):
    try:
        dcm = pydicom.dcmread(str(file_path), stop_before_pixels=True)
        slice_thickness = safe_float(dcm.get("SliceThickness"))
        pixel_spacing_raw = dcm.get("PixelSpacing", [0.0, 0.0])
        pixel_spacing = [safe_float(x) for x in pixel_spacing_raw]
        relative_path = file_path.relative_to(BASE_DIR)
        s3_path = f"{S3_PATH}/{relative_path.as_posix()}"
        meta = {
            "patient_id": str(dcm.get("PatientID", "")),
            "study_id": str(dcm.get("StudyID", "")),
            "series_id": str(dcm.get("SeriesInstanceUID", "")),
            "study_date": str(dcm.get("StudyDate", "")),
            "slice_thickness": slice_thickness,
            "pixel_spacing": pixel_spacing,
            "orientation": str(dcm.get("ImageOrientationPatient", "")),
            "manufacturer": str(dcm.get("Manufacturer", "")),
            "modality": str(dcm.get("Modality", "")),
            "file_name": file_path.name,
            "file_path": str(s3_path)
        }
        return meta

    except Exception as e:
        print(f"[WARN] Failed to read {file_path}: {e}")
        return None

def scan_mri_directory(base_dir: Path):
    results = []
    print(f'scanning mri...')

    for patient_dir in base_dir.glob('*/'):
        patient_id = patient_dir.name
        print(f'Student ID: {patient_id}')
        for study_dir in patient_dir.glob('*/'):
            study_id = study_dir.name
            for root, dirs, files in os.walk(study_dir):
                for f in files:
                    if f.lower().endswith('.ima'):
                        fpath = Path(root) / f
                        meta = extract_dicom_metadata(fpath)
                        if meta:
                            meta.update({'patient_id': patient_id, 'study_id': study_id})
                            results.append(meta)
    return results

def load_annotations(file_path: Path):
    df = pd.read_excel(file_path)
    df.columns = (
        df.columns
        .str.strip()
        .str.lower()
        .str.replace(" ", "_")
        .str.replace("'", "")
        .str.replace("’", "")
    )

    note_col = next((c for c in df.columns if "note" in c), None)
    if not note_col:
        print("[WARN] Could not find a column containing 'note'.")
        print(f"Available columns: {df.columns.tolist()}")
        return []

    # Normalize IDs and fill missing
    df["patient_id"] = df["patient_id"].astype(str).str.zfill(4)
    df = df.fillna("")

    annotations = []
    for pid, group in df.groupby("patient_id", dropna=False):
        notes = list(group[note_col].dropna().unique())
        entry = {
            "patient_id": pid,
            "annotations": [{"notes": n.strip()} for n in notes if n.strip()]
        }
        annotations.append(entry)
    return annotations

## 3️⃣ Extract MRI Metadata


In [33]:
print('[INFO] Scanning MRI folder...')
mri_records = scan_mri_directory(BASE_DIR)
print(f'[INFO] Extracted metadata for {len(mri_records)} slices.')

with open(OUTPUT_DIR / 'mri_metadata.json', 'w') as f:
    json.dump(mri_records, f, indent=2)
print('[SUCCESS] MRI metadata saved!')

[INFO] Scanning MRI folder...
scanning mri...
Student ID: 0389
Student ID: 0184
Student ID: 0342
Student ID: 0170
Student ID: 0514
Student ID: 0148
Student ID: 0177
Student ID: 0345
Student ID: 0183
Student ID: 0373
Student ID: 0525
Student ID: 0387
Student ID: 0179
Student ID: 0380
Student ID: 0522
Student ID: 0146
Student ID: 0112
Student ID: 0320
Student ID: 0549
Student ID: 0115
Student ID: 0571
Student ID: 0318
Student ID: 0547
Student ID: 0311
Student ID: 0123
Student ID: 0124
Student ID: 0316
Student ID: 0540
Student ID: 0329
Student ID: 0178
Student ID: 0523
Student ID: 0147
Student ID: 0375
Student ID: 0381
Student ID: 0386
Student ID: 0372
Student ID: 0140
Student ID: 0524
Student ID: 0149
Student ID: 0512
Student ID: 0344
Student ID: 0176
Student ID: 0388
Student ID: 0171
Student ID: 0343
Student ID: 0515
Student ID: 0185
Student ID: 0317
Student ID: 0125
Student ID: 0541
Student ID: 0546
Student ID: 0122
Student ID: 0310
Student ID: 0114
Student ID: 0326
Student ID: 0570
St

## 4️⃣ Process Radiologist Annotations

In [22]:
# Check the annotations file
import pandas as pd
df = pd.read_excel(ANNOTATION_FILE)
print(df.head())
print(df.columns)
print(df.info())

   Patient ID                                  Clinician's Notes
0           1  L4-5: degenerative annular disc bulge is noted...
1           2  No evidence of disc herniation.\nNo significan...
2           3  LSS MRI\nFeatures of muscle spasm.\nsmall cent...
3           4  Feature of muscle spasm.\nDiffuse disc bulges ...
4           5  LSS MRI :\nFeature of muscle spasm.\nDiffuse d...
Index(['Patient ID', 'Clinician's Notes'], dtype='object')
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 575 entries, 0 to 574
Data columns (total 2 columns):
 #   Column             Non-Null Count  Dtype 
---  ------             --------------  ----- 
 0   Patient ID         575 non-null    int64 
 1   Clinician's Notes  515 non-null    object
dtypes: int64(1), object(1)
memory usage: 9.1+ KB
None


In [34]:
print('[INFO] Processing annotation Excel file...')
annotations = load_annotations(ANNOTATION_FILE)
print(f'[INFO] Processed {len(annotations)} patient annotation groups.')

with open(OUTPUT_DIR / 'annotations.json', 'w') as f:
    json.dump(annotations, f, indent=2)
print('[SUCCESS] Annotations saved!')

[INFO] Processing annotation Excel file...
[INFO] Processed 575 patient annotation groups.
[SUCCESS] Annotations saved!


## 5️⃣ Merge and Validate

In [35]:
print('[INFO] Merging annotations with MRI metadata...')
merged = []
for mri in mri_records:
    matched = next((a for a in annotations if a['patient_id'] == mri['patient_id']), None)
    record = {**mri, 'annotations': matched['annotations'] if matched else []}
    merged.append(record)

with open(OUTPUT_DIR / 'cleaned_mri_data.json', 'w') as f:
    json.dump(merged, f, indent=2)
print(f'[SUCCESS] All outputs saved under {OUTPUT_DIR}')

[INFO] Merging annotations with MRI metadata...
[SUCCESS] All outputs saved under /Users/I758899/Downloads/DataEngineering-MRI
