In [None]:
import os
import re
import pandas as pd
import numpy as np
import gzip
import pickle
import pydicom

In [None]:
# INTERNAL USE: Load pkl into a single dataframe
def get_load_pkl(folder_path):
    """Load every ``<subfolder>/<subfolder>.pkl.gz`` under ``folder_path`` into one DataFrame.

    Parameters
    ----------
    folder_path : str or path-like
        Directory whose immediate subfolders are scanned. A subfolder is used
        only if it contains a gzip-compressed pickle named after itself.

    Returns
    -------
    pandas.DataFrame
        Row-wise concatenation (fresh 0..n-1 index) of every pickle whose
        payload is a ``dict`` or ``list``. Payloads of any other type are
        skipped. An empty DataFrame is returned when nothing could be loaded.

    Notes
    -----
    SECURITY: ``pickle.load`` can execute arbitrary code embedded in the file.
    Only point this function at metadata dumps you produced or fully trust.
    """
    dfs = []
    # Sorted so the resulting row order does not depend on the arbitrary
    # order in which the filesystem lists directory entries.
    for subfolder in sorted(os.listdir(folder_path)):
        subfolder_path = os.path.join(folder_path, subfolder)
        if not os.path.isdir(subfolder_path):
            continue
        file_path = os.path.join(subfolder_path, f"{subfolder}.pkl.gz")
        if not os.path.exists(file_path):
            continue
        with gzip.open(file_path, "rb") as f:
            data = pickle.load(f)  # trusted input only, see docstring
        if isinstance(data, (dict, list)):
            dfs.append(pd.DataFrame(data))

    # pd.concat raises ValueError on an empty list; return an empty frame instead.
    if not dfs:
        return pd.DataFrame()
    return pd.concat(dfs, ignore_index=True)

In [None]:
# INTERNAL USE: Replace DICOM tag numbers with labels
def get_tag_name(tag):
    """Return the DICOM keyword for a pydicom tag, leaving other labels untouched.

    Parameters
    ----------
    tag : object
        A column label. Only instances of ``pydicom.tag.BaseTag`` are translated.

    Returns
    -------
    object
        The keyword reported by ``pydicom.datadict.keyword_for_tag`` when it is
        non-empty; otherwise ``str(tag)``. Non-tag inputs are returned as-is.
    """
    if not isinstance(tag, pydicom.tag.BaseTag):
        return tag
    try:
        keyword = pydicom.datadict.keyword_for_tag(tag)
    except Exception:
        # Best-effort lookup: fall back to the tag's string form on any lookup
        # error, but do not swallow KeyboardInterrupt/SystemExit as a bare
        # ``except:`` would.
        return str(tag)
    return keyword if keyword else str(tag)

In [None]:
# INTERNAL USE: Convert DICOM value to standard data types
def convert_dcm_value(x):
    """Turn a pydicom cell value into plain Python data where possible.

    A ``DataElement`` is first unwrapped to its ``.value``. After that:

    * a ``MultiValue`` becomes a tuple, with each ``DSfloat`` member converted
      to ``float`` and every other member kept as-is;
    * a lone ``DSfloat`` becomes ``float``;
    * anything else is returned unchanged.
    """
    ds_float = pydicom.valuerep.DSfloat
    raw = x.value if isinstance(x, pydicom.dataelem.DataElement) else x

    if isinstance(raw, pydicom.multival.MultiValue):
        return tuple(float(item) if isinstance(item, ds_float) else item for item in raw)
    if isinstance(raw, ds_float):
        return float(raw)
    return raw

In [None]:
# INTERNAL USE: Clean data types
def clean_dcm_column(colname, series):
    """Normalize the values of selected DICOM columns.

    Parameters
    ----------
    colname : str
        Column label; decides which normalization is applied.
    series : pandas.Series
        The column's values.

    Returns
    -------
    pandas.Series
        * ``ScanningSequence`` / ``SequenceVariant``: every value is a tuple
          (non-tuple values are wrapped in a 1-tuple).
        * ``ImageType``: every value is a set. Lists/tuples become the set of
          their items, a single string becomes a one-element set, anything
          else becomes an empty set.
        * ``InversionTime``: numeric, with unparseable values coerced to NaN.
        * any other column: ``series`` itself, unchanged.
    """
    if colname in {"ScanningSequence", "SequenceVariant"}:
        return series.apply(lambda x: x if isinstance(x, tuple) else (x,))

    if colname == "ImageType":
        def as_set(x):
            # A bare string is ONE value. set("DERIVED") would split it into
            # single characters, which breaks every later keyword check.
            if isinstance(x, str):
                return {x}
            if isinstance(x, (list, tuple)):
                return set(x)
            return set()

        return series.apply(as_set)

    if colname == "InversionTime":
        series = series.apply(lambda x: float(x) if isinstance(x, pydicom.valuerep.DSfloat) else x)
        return pd.to_numeric(series, errors="coerce")

    return series

In [None]:
# INTERNAL USE: Remove potential report images
def remove_report_images(df):
    """Return the rows of ``df`` that do not look like reports or screenshots.

    A row is dropped when any of the following holds (all text checks are
    case-insensitive):

    * ``Sequence`` contains one of the report keywords;
    * ``Manufacturer`` contains one of the listed vendor names;
    * any ``ImageType`` item contains "screen", "scan" or "doc";
    * any ``ImageType`` item contains "secondary" while ``Manufacturer`` is
      missing and ``Modality`` equals "MR".

    ``ImageType`` values must be iterable (e.g. the sets produced by
    ``clean_dcm_column``). The input frame is not modified.
    """
    report_keywords = ["mr_brain", "screen", "docs", "document", "paperwork", "pdf", "report"]
    report_manufacturers = ["PACS", "LEXMARK", "Hyland", "FUJITSU", "Viztek", "Canon", "Sorna", "Altamont", "INTELETAD", "Carestream"]
    image_type_hints = ["screen", "scan", "doc"]

    def mentions(items, needles):
        # True when any needle is a substring of any lower-cased item.
        lowered = [str(entry).lower() for entry in items]
        return any(needle in entry for entry in lowered for needle in needles)

    sequence_hit = df["Sequence"].str.contains("|".join(report_keywords), case=False, na=False)
    manufacturer_hit = df["Manufacturer"].str.contains("|".join(report_manufacturers), case=False, na=False)
    image_type_hit = df["ImageType"].apply(lambda items: mentions(items, image_type_hints))

    secondary_hit = df["ImageType"].apply(lambda items: mentions(items, ["secondary"]))
    anonymous_secondary_mr = secondary_hit & df["Manufacturer"].isna() & (df["Modality"] == "MR")

    is_report = sequence_hit | manufacturer_hit | image_type_hit | anonymous_secondary_mr
    return df[~is_report]

In [None]:
# INTERNAL USE: Clean sequence column
def clean_sequence_column(df):
    """Normalize ``df["Sequence"]`` in place and return ``df``.

    A leading run of digits followed by one or more underscores is removed
    (e.g. ``"12_T1"`` -> ``"T1"``), then the text is lower-cased. Missing
    values stay missing.
    """
    without_prefix = df["Sequence"].str.replace(r"^\d+_+", "", regex=True)
    df["Sequence"] = without_prefix.str.lower()
    return df

In [None]:
# Identify MRI sequence name based on DICOM metadata
def identify_mri_sequence(df):
    """Label each row with an MRI sequence name and acquisition subtype.

    The classification is rule-based. ImageType keywords are checked first;
    if none match, the pulse-sequence codes in ScanningSequence /
    SequenceVariant are combined with timing thresholds. Finally, localizers
    are flagged from SeriesNumber and the Sequence text.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``ScanningSequence``, ``SequenceVariant``, ``ImageType``,
        ``SeriesNumber`` and ``Sequence``. ``EchoTime``, ``RepetitionTime``,
        ``FlipAngle``, ``InversionTime``, ``DiffusionBValue``,
        ``MRAcquisitionType`` and ``EchoTrainLength`` are read when present
        and treated as NaN when the column is absent.

    Returns
    -------
    pandas.DataFrame
        The same ``df`` object, modified in place, with two added columns:
        ``IdentifiedSequenceName`` and ``IdentifiedSequenceAcquisition``
        (NaN when no subtype applies).
    """
    def to_set(x):
        # Normalize a multi-valued field to a set of upper-case strings.
        if not isinstance(x, (list, tuple, set, np.ndarray)) and pd.isna(x):
            return set()
        elif isinstance(x, (list, tuple, set)):
            # Flatten one level of nesting.
            flat = []
            for item in x:
                if isinstance(item, (list, tuple, set)):
                    flat.extend(item)
                else:
                    flat.append(item)
            return set(map(str.upper, map(str, flat)))
        else:
            # Scalar: treat as a backslash-separated string with spaces removed.
            return set(map(str.upper, str(x).replace(" ", "").split("\\")))

    def safe_get(row, key):
        # Missing columns read as NaN so that numeric comparisons are simply False.
        return row.get(key, np.nan)

    def classify(row):
        scan_seq_set = to_set(row["ScanningSequence"])
        seq_var_set = to_set(row["SequenceVariant"])
        img_type_set = to_set(row["ImageType"])

        te = safe_get(row, "EchoTime")
        tr = safe_get(row, "RepetitionTime")
        fa = safe_get(row, "FlipAngle")
        ti = safe_get(row, "InversionTime")
        bval = safe_get(row, "DiffusionBValue")
        dim = safe_get(row, "MRAcquisitionType")
        etl = safe_get(row, "EchoTrainLength")

        # Identify from common labels in ImageType
        if any("ADC" in item.upper() for item in img_type_set):
            return "DWI", "ADC"
        if "TRACEW" in img_type_set:
            return "DWI", "TRACEW"
        if any("FA" in item.upper() and "FAT" not in item.upper() for item in img_type_set):
            return "DWI", "FA"
        if any("ISO" in item.upper() for item in img_type_set):
            return "DWI", "ISO"
        if "EXP" in img_type_set:
            return "DWI", "EXP"
        if any("DIFF" in item.upper() or "DWI" in item.upper() for item in img_type_set) or "DFC" in img_type_set or (bval and bval > 0):
            # The previous test `bval and bval == 0` could never be true
            # (0 is falsy), so "b0" was unreachable. NaN/None compare unequal
            # to 0, so a missing b-value still falls through to the plain label.
            if bval == 0:
                return "DWI", "b0"
            else:
                return "DWI", np.nan

        if any("ASL" in item.upper() for item in img_type_set):
            return "PWI", "ASL"
        if any("PERFUSION" in item.upper() for item in img_type_set) or "CBF" in img_type_set:
            return "PWI", np.nan

        if any("SW" in item.upper() for item in img_type_set):
            return "SWI", np.nan

        if "TOF" in img_type_set or "ANGIO" in img_type_set or any("MIP" in item.upper() for item in img_type_set):
            return "MRA", "TOF"

        if "FMRI" in img_type_set:
            return "fMRI", np.nan

        if any("SPECTR" in item.upper() for item in img_type_set):
            return "MRS", np.nan

        # Identify from pulse sequence
        if "EP" in scan_seq_set:
            if 25 <= te <= 50 and 1000 <= tr <= 3000 and etl >= 30:
                return "fMRI", np.nan
            elif 20 <= te <= 40 and tr <= 100:
                return "SWI", np.nan
            elif te < 60 and tr > 1000 and etl > 1:
                return "PWI", np.nan
            elif te >= 40 and tr >= 1800:
                return "DWI", np.nan
            else:
                return "EPI Unknown", np.nan
        else:
            if "SE" in scan_seq_set or "RM" in scan_seq_set or ("GR" not in scan_seq_set and fa >= 90):
                if "IR" in scan_seq_set or (isinstance(ti, (int, float)) and not np.isnan(ti) and ti > 0):
                    if ti >= 1200 and tr >= 2000 and te >= 60:
                        return "T2", "FLAIR"
                    elif 400 <= ti < 1800 and tr >= 450:
                        return "T1", "IR"
                    elif ti <= 400 and tr >= 1000:
                        return "T2", "STIR"
                    elif dim == "2D":
                        return "IR Unknown", np.nan
                    # No rule matched: falls through to the final "Unknown".
                else:
                    if te <= 25 and tr <= 1500:
                        return "T1", "SE"
                    elif te >= 30 and tr >= 800:
                        return "T2", "SE"
                    elif te <= 25 and tr > 1500:
                        return "PD", "SE"
                    else:
                        return "SE Unknown", np.nan

            elif "GR" in scan_seq_set:
                if dim == "3D" and "MP" in seq_var_set:
                    if 300 <= ti <= 3000 and 1000 <= tr <= 5000:
                        return "T1", "MPRAGE"
                if te < 10 and tr <= 500:
                    # Parentheses only make the existing and/or precedence explicit.
                    if ("SP" in seq_var_set and "IR" in scan_seq_set) or (isinstance(ti, (int, float)) and not np.isnan(ti) and ti > 0):
                        return "T1", "SPGR"
                    elif 30 < fa <= 90 and tr < 15:
                        return "bSSFP", np.nan
                    elif te <= 9 and tr <= 40:
                        return "MRA", "TOF"
                    else:
                        return "T1", "GRE"
                if te >= 10 and 120 <= tr <= 1600:
                    return "T2star", "GRE"
                if dim == "3D" and fa <= 40 and te < 55 and tr < 120:
                    return "SWI", np.nan
                return "GRE Unknown", np.nan

        return "Unknown", np.nan

    df[["IdentifiedSequenceName", "IdentifiedSequenceAcquisition"]] = df.apply(classify, axis=1, result_type="expand")
    # First series of a study with no confident label is treated as a localizer.
    df.loc[(df["SeriesNumber"] == 1) & (df["IdentifiedSequenceName"].str.contains("Unknown")), "IdentifiedSequenceName"] = "Localizer"
    # A localizer-like sequence name overrides whatever the rules above chose.
    df.loc[df["Sequence"].str.contains(r"loc|scout|aahead|cal|scano|3_pl|3pl|survey", case=False, na=False), ["IdentifiedSequenceName", "IdentifiedSequenceAcquisition"]] = ["Localizer", np.nan]

    return df

INTERNAL USE: (OPTIONAL) Update the identified MRI sequence name (SWI, PWI, MRA, DWI) based on keywords found in the DICOM sequence name

In [None]:
# Keyword -> label table consumed by map_sequence_from_text_simp.
# Keys: lower-case keywords. A key is split on whitespace and every part must
#       occur as a SUBSTRING of some token of the normalized sequence text.
# Values: "<sequence>" or "<sequence> <acquisition>"; the first word is used as
#       the parent sequence when several labels match the same text.
# NOTE(review): because matching is by substring, short keys such as "car" or
# "ica" also hit longer tokens that merely contain them - confirm this is intended.
simplified_keywords = {
    "angio": "MRA",
    "aorta": "MRA TOF",
    "arter": "MRA TOF",
    "asl": "PWI ASL",
    "car": "MRA TOF",
    "cca": "MRA TOF",
    "cbf": "PWI DSC",
    "cbv": "PWI DSC",
    "cow": "MRA TOF",
    "dce": "PWI DCE",
    "dsc": "PWI DSC",
    "grasp": "PWI DSC",
    "ica": "MRA TOF",
    "mra": "MRA",
    "mtt": "PWI DSC",
    "perf": "PWI",
    "sus": "SWI",
    "swan min ip": "SWI mIP",
    "swan": "SWI",
    "swi pha": "SWI Pha",
    "swi": "SWI",
    "tof": "MRA TOF",
    "ttp": "PWI DSC",
    "vess": "MRA TOF"
}

In [None]:
# Substrings that mark a localizer-like series: map_sequence_from_text_simp
# returns None as soon as any of them occurs in the normalized sequence text.
# NOTE(review): this set contains "head", whereas the localizer pattern in
# identify_mri_sequence uses "aahead"; "head" suppresses keyword mapping for any
# sequence name containing it - confirm which spelling is intended.
localizer_keywords = {"loc", "scout", "head", "cal", "scano", "3_pl", "3pl", "survey"}

In [None]:
# Regex pattern -> label table consumed by map_sequence_from_text_simp, which
# runs re.search with each pattern on the lower-cased (not normalized) text.
# NOTE(review): the pattern has no word boundary, so a "b" in the middle of a
# word followed by 3-4 digits also matches - confirm this is acceptable.
regex_keywords = {
    r"b[ _]?\d{3,4}": "DWI"  # Matches "b1000", "b_800", "b 500", etc.
}

In [None]:
# INTERNAL USE: Capture SWI, PWI, MRA, DWI (regex only) keywords and assign the sequence name
def map_sequence_from_text_simp(text):
    """Derive a sequence label from free-text sequence names.

    The text is lower-cased and every run of non-word characters/underscores
    is replaced by a single space. Labels are then collected from three sources:
    an exact "mra" token (unless some token starts with "mrage"), the
    ``simplified_keywords`` table (substring match per keyword part) and the
    ``regex_keywords`` table (searched on the lower-cased raw text).

    Labels sharing the same first word (the parent sequence) are reconciled:

    * more than one distinct acquisition label -> only the parent is kept;
    * the bare parent plus exactly one acquisition label -> the acquisition
      label is kept;
    * otherwise every label for that parent is kept as-is.

    Returns the surviving labels sorted and joined by spaces, or ``None`` when
    ``text`` is missing, contains a localizer keyword, or nothing matched.
    """
    if pd.isna(text):
        return None

    lowered = text.lower()
    normalized_text = re.sub(r'[_\W]+', ' ', lowered)

    # Localizer-like names never receive a keyword label.
    for loc_kw in localizer_keywords:
        if loc_kw in normalized_text:
            return None

    tokens = normalized_text.split()
    found = set()

    if "mra" in tokens and not any(tok.startswith("mrage") for tok in tokens):
        found.add("MRA")

    for keyword, label in simplified_keywords.items():
        parts = keyword.lower().split()
        if all(any(part in tok for tok in tokens) for part in parts):
            found.add(label)

    for pattern, label in regex_keywords.items():
        if re.search(pattern, lowered):
            found.add(label)

    if not found:
        return None

    # Group the matched labels under their parent sequence (first word).
    by_parent = {}
    for label in found:
        by_parent.setdefault(label.split()[0], []).append(label)

    final_labels = set()
    for parent, labels in by_parent.items():
        acquisitions = [lab for lab in labels if lab != parent]
        if len(acquisitions) > 1:
            # Conflicting acquisitions: collapse to the parent sequence.
            final_labels.add(parent)
        elif len(acquisitions) == 1 and parent in labels:
            # Bare parent plus one acquisition: keep the more specific label.
            final_labels.add(acquisitions[0])
        else:
            # Only the parent, or only one acquisition label: keep what matched.
            final_labels.update(labels)

    return " ".join(sorted(final_labels)) if final_labels else None

In [None]:
# INTERNAL USE: Check match between parameter-based and keyword-based sequence names
def labels_match_simp(param_label, name_label):
    """Tell whether two sequence labels agree on at least one word.

    Both labels are upper-cased and split on whitespace into word sets.
    Returns ``pd.NA`` when either label is falsy (``None``, empty string, ...).
    Otherwise returns ``True`` when one word set is a subset of the other or
    the two sets share a word, and ``False`` when they do not.
    """
    if not (param_label and name_label):
        return pd.NA

    param_parts = set(param_label.upper().split())
    name_parts = set(name_label.upper().split())

    if param_parts.issubset(name_parts) or name_parts.issubset(param_parts):
        return True
    return not param_parts.isdisjoint(name_parts)

In [None]:
# INTERNAL USE: Overwrite unmatched keyword-based sequence names
def update_seq_name(df):
    """Replace the identified sequence with the keyword label where they disagree.

    Only rows with ``SeqMatchSimp == False`` and a ``SequenceKeywordSimp``
    containing at most one space are considered. For such a row the keyword is
    split into its first word (sequence) and the remaining words (acquisition,
    NaN when there are none), and then:

    * if the keyword contains "DWI", the sequence name is set to "DWI";
    * otherwise, if ``ScanningSequence`` is a tuple other than ``("SE",)``,
      the sequence name is set to the keyword's first word;
    * otherwise the row is left untouched.

    ``df`` is modified in place and returned.
    """
    mismatch = df["SeqMatchSimp"] == False
    short_keyword = df["SequenceKeywordSimp"].str.count(" ") <= 1

    for row_id in df[mismatch & short_keyword].index:
        keyword = df.at[row_id, "SequenceKeywordSimp"]
        scan_seq = df.at[row_id, "ScanningSequence"]

        pieces = keyword.strip().split()
        parent = pieces[0]
        acquisition = " ".join(pieces[1:]) if len(pieces) != 1 else np.nan

        if "DWI" in keyword:
            new_name = "DWI"
        elif isinstance(scan_seq, tuple) and scan_seq != ("SE",):
            new_name = parent
        else:
            continue

        df.at[row_id, "IdentifiedSequenceName"] = new_name
        df.at[row_id, "IdentifiedSequenceAcquisition"] = acquisition

    return df

In [None]:
# INTERNAL USE: Pipeline for updating SWI, PWI, MRA, DWI sequence name based on Sequence keywords
def apply_keywords_mapping_pipeline(df):
    """Run the keyword-based relabelling steps on ``df`` and return it.

    Steps, in order:

    1. ``SequenceKeywordSimp``: label derived from ``Sequence`` by
       ``map_sequence_from_text_simp``.
    2. ``SeqMatchSimp``: result of ``labels_match_simp`` between the current
       "<name> <acquisition>" label and the keyword label.
    3. ``update_seq_name`` overwrites the identified name/acquisition on
       mismatching rows.
    4. ``IdentifiedSequence_new``: the (possibly updated) "<name> <acquisition>"
       label, or just the name when the acquisition is missing.
    """

    def combined_label(row):
        # "<name> <acquisition>", or just the name when the acquisition is missing.
        name = row["IdentifiedSequenceName"]
        acquisition = row["IdentifiedSequenceAcquisition"]
        if pd.isna(acquisition):
            return name
        return f"{name} {acquisition}"

    def row_matches(row):
        return labels_match_simp(combined_label(row), row["SequenceKeywordSimp"])

    df["SequenceKeywordSimp"] = df["Sequence"].apply(map_sequence_from_text_simp)
    df["SeqMatchSimp"] = df.apply(row_matches, axis=1)

    df = update_seq_name(df)

    df["IdentifiedSequence_new"] = df.apply(combined_label, axis=1)

    return df

INTERNAL USE: Example usage

In [None]:
# Placeholder - replace with the folder whose subfolders each contain a
# "<subfolder>.pkl.gz" file (the layout get_load_pkl reads).
path_mgb = "/your/path/to/metadata/MGB"

In [None]:
# Load and concatenate every per-subfolder pickle found under path_mgb.
metadata_mgb = get_load_pkl(path_mgb)

In [None]:
# Rename column labels that are pydicom tags to their DICOM keywords;
# any other label passes through get_tag_name unchanged.
metadata_mgb.columns = [get_tag_name(col) for col in metadata_mgb.columns]

In [None]:
# Element-wise conversion of pydicom values to plain Python types.
# DataFrame.map requires pandas >= 2.1 (older versions call it applymap).
metadata_mgb = metadata_mgb.map(convert_dcm_value)

In [None]:
# Column-specific normalization; columns without a rule are returned unchanged
# by clean_dcm_column.
# NOTE(review): this runs before duplicate column labels are dropped in the
# next cell. For a duplicated label, metadata_mgb[col] is a DataFrame rather
# than a Series - confirm this ordering is intended.
for col in metadata_mgb.columns:
    metadata_mgb[col] = clean_dcm_column(col, metadata_mgb[col])

In [None]:
# Keep only the first column for each repeated column label.
metadata_mgb = metadata_mgb.loc[:, ~metadata_mgb.columns.duplicated()]  # Remove duplicate Modality column

In [None]:
# Drop rows flagged as likely reports/screenshots; metadata_mgb itself is left unchanged.
metadata_mgb_cleaned = remove_report_images(metadata_mgb)

In [None]:
# Keep MR rows only and renumber the index from 0.
mri_mgb = metadata_mgb_cleaned[metadata_mgb_cleaned["Modality"] == "MR"].reset_index(drop=True)

In [None]:
# Strip leading "<digits>_" prefixes from Sequence and lower-case it (modifies mri_mgb in place).
mri_mgb = clean_sequence_column(mri_mgb)

In [None]:
# Main MRI sequence identification function call:
# adds the IdentifiedSequenceName and IdentifiedSequenceAcquisition columns.
mri_mgb = identify_mri_sequence(mri_mgb)

In [None]:
# Update SWI, PWI, MRA, DWI sequence name from keywords in Sequence:
# adds SequenceKeywordSimp, SeqMatchSimp and IdentifiedSequence_new.
mri_mgb = apply_keywords_mapping_pipeline(mri_mgb)