In [7]:
import os
import re
import json
import cv2
import pydicom
import numpy as np
import pandas as pd
from pydicom.pixel_data_handlers.util import apply_modality_lut, apply_voi_lut

# --------------------
# Config
# --------------------
# Size handed to get_pixel_data as its default `size` for every view.
TARGET_SIZE = (512, 512)

# Inputs: the metadata CSV and the root that relative DICOM paths are resolved against.
metadata_file  = "../data/Data/manifest/all_data.csv"
data_directory = "../data/Data/manifest/CBIS-DDSM/"

# Outputs: one .npy tensor per patient/breast is written into the split's directory.
test_dir  = "../data/Processed Data/test"
train_dir = "../data/Processed Data/train"

# Create both output directories up front; an already-existing directory is fine.
os.makedirs(test_dir,  exist_ok=True)
os.makedirs(train_dir, exist_ok=True)

# --------------------
# Helpers
# --------------------
def truncate_to_view_dir(p: str) -> str | None:
    """Return path up to and including the first CC/ or MLO/ directory (case-insensitive)."""
    if not isinstance(p, str):
        return None
    p = p.replace("\\", "/").strip()
    m = re.search(r'/(CC|MLO)(/|$)', p, flags=re.I)
    if not m:
        return None
    end = m.end()
    out = p[:end]
    if not out.endswith("/"):
        out = out + "/"
    return out

def get_dicom_files(directory_path: str) -> list[str]:
    """
    List the .dcm files that sit directly inside a directory (no recursion).

    'directory_path' is tried as given first, then joined onto the module-level
    'data_directory'; the first candidate that is an existing directory wins.
    The extension check is case-insensitive. Returns the matching paths in
    sorted order, or an empty list when neither candidate is a directory.
    """
    as_given = os.path.normpath(directory_path)
    under_data_root = os.path.normpath(os.path.join(data_directory, directory_path))

    for candidate in (as_given, under_data_root):
        if os.path.isdir(candidate):
            return sorted(
                os.path.join(candidate, name)
                for name in os.listdir(candidate)
                if name.lower().endswith(".dcm")
            )
    return []

def get_pixel_data(path: str, size=TARGET_SIZE) -> np.ndarray:
    """Load one DICOM file and return its pixels as a resized float32 array.

    The pixel array is passed through the Modality LUT and then the VOI LUT
    before being cast to float32 and resized with area interpolation.

    `size` is handed to cv2.resize unchanged as the target size; OpenCV reads
    that tuple as (width, height), which only matters for non-square targets.
    The file is read with force=True, so a missing DICOM preamble does not by
    itself stop the read.
    """
    dataset = pydicom.dcmread(path, force=True)
    rescaled = apply_modality_lut(dataset.pixel_array, dataset)  # modality LUT step
    windowed = apply_voi_lut(rescaled, dataset)                  # VOI LUT (windowing) step
    pixels = windowed.astype(np.float32, copy=False)
    return cv2.resize(pixels, size, interpolation=cv2.INTER_AREA)

# --------------------
# Load & prepare metadata
# --------------------
metadata = pd.read_csv(metadata_file)

# Map pathology text to a binary label after trimming whitespace and upper-casing.
# Any value not listed here (including missing values, which become the string
# "NAN") maps to NaN rather than raising.
label_mapping = {"BENIGN": 0, "BENIGN_WITHOUT_CALLBACK": 0, "MALIGNANT": 1}
metadata["label"] = (
    metadata["pathology"]
    .astype(str).str.strip().str.upper()
    .map(label_mapping)
)

# Report unmapped labels instead of letting them surface later as unexplained skips.
# The rows are kept so the set of rows reaching the grouping step is unchanged.
_unmapped_rows = int(metadata["label"].isna().sum())
if _unmapped_rows:
    print(f"Warning: {_unmapped_rows} metadata rows have a pathology value with no label mapping")

# Normalize paths to forward slashes; build the truncated CC/MLO directory path.
metadata["image file path"] = (
    metadata["image file path"]
    .astype(str).str.replace("\\", "/", regex=False).str.strip()
)
metadata["truncated_path"] = metadata["image file path"].apply(truncate_to_view_dir)

# Keep only rows whose path contained a CC/ or MLO/ directory component, and say
# how many were discarded: if every row fails the match, the rest of the script
# has nothing to process and would otherwise finish without any explanation.
_rows_before = len(metadata)
metadata = metadata.dropna(subset=["truncated_path"]).copy()
_rows_dropped = _rows_before - len(metadata)
if _rows_dropped:
    print(
        f"Warning: dropped {_rows_dropped} of {_rows_before} metadata rows whose "
        "image file path has no CC/ or MLO/ directory component"
    )

# Normalize view names so later comparisons against "CC" / "MLO" are exact.
metadata["image view"] = metadata["image view"].astype(str).str.strip().str.upper()

# --------------------
# Group and process
# --------------------
grouped = metadata.groupby(["patient_id", "left or right breast"])

train_objects, test_objects = [], []
saved, skipped = 0, 0
skip_reasons = {}  # human-readable reason -> number of patient/breast groups skipped for it


def _first_view_dir(group, view):
    """Return the first truncated directory listed for `view` in `group`, or None if absent."""
    dirs = group.loc[group["image view"] == view, "truncated_path"]
    return dirs.iloc[0] if len(dirs) else None


def _process_breast(patient_id, laterality, group):
    """Build and save the two-channel CC+MLO tensor for one patient/breast group.

    Returns ``(record, None)`` on success, where `record` is the metadata dict
    for the saved tensor, or ``(None, reason)`` when the group must be skipped.
    Nothing is raised for per-group failures: processing is best-effort, but
    every skip carries a reason so it can be reported.
    """
    # When a view has several rows, the first one is used so the choice is deterministic.
    cc_path = _first_view_dir(group, "CC")
    mlo_path = _first_view_dir(group, "MLO")
    if cc_path is None or mlo_path is None:
        return None, "missing CC or MLO view"

    # NOTE(review): the label is taken from the group's first row only; if one
    # breast can have rows with different pathologies, confirm this is intended.
    raw_label = group.iloc[0]["label"]
    if pd.isna(raw_label):
        return None, "pathology has no label mapping"
    label = int(raw_label)

    # Decide the split from the directory paths (case-insensitive; default to train).
    if "test" in cc_path.lower() or "test" in mlo_path.lower():
        save_dir, split = test_dir, "test"
    else:
        save_dir, split = train_dir, "train"

    try:
        # Use the first DICOM file directly under each view directory (non-recursive).
        cc_list = get_dicom_files(cc_path)
        mlo_list = get_dicom_files(mlo_path)
        if not cc_list or not mlo_list:
            return None, "no .dcm file directly under the CC or MLO directory"

        # Stack the two views with channels LAST: (H, W, 2).
        cc_img = get_pixel_data(cc_list[0])
        mlo_img = get_pixel_data(mlo_list[0])
        tensor = np.stack([cc_img, mlo_img], axis=-1).astype(np.float32, copy=False)

        save_path = os.path.join(save_dir, f"{patient_id}_{laterality}.npy")
        np.save(save_path, tensor)
    except Exception as exc:
        # Per-item boundary: one unreadable file must not abort the whole run,
        # but the failure type is kept so it shows up in the summary.
        return None, f"{type(exc).__name__} while loading or saving"

    record = {
        "patient_id": patient_id,
        "laterality": laterality,
        "processed_path": save_path,
        "label": label,
        "split": split
    }
    return record, None


for (patient_id, laterality), group in grouped:
    record, reason = _process_breast(patient_id, laterality, group)
    if record is None:
        skipped += 1
        skip_reasons[reason] = skip_reasons.get(reason, 0) + 1
        continue

    meta_list = test_objects if record["split"] == "test" else train_objects
    meta_list.append(record)
    saved += 1

# Explain the outcome instead of leaving bare counters.
if saved == 0 and skipped == 0:
    print(
        "Warning: no patient/breast groups were found to process; check that the "
        "metadata rows survive the CC/MLO path filter"
    )
for reason, count in sorted(skip_reasons.items()):
    print(f"Skipped {count}: {reason}")

# --------------------
# Write meta files
# --------------------
# One JSON file per split, each holding the list of records collected for it.
for meta_path, meta_records in (
    ("../data/meta_train.json", train_objects),
    ("../data/meta_test.json", test_objects),
):
    with open(meta_path, "w") as meta_file:
        json.dump(meta_records, meta_file)

# Final run summary: tensor counters first, then the per-split record counts.
print(f"Saved tensors: {saved} | Skipped: {skipped}")
print(f"Train meta: {len(train_objects)} | Test meta: {len(test_objects)}")


Saved tensors: 0 | Skipped: 0
Train meta: 0 | Test meta: 0
