# Lunit Study Screening Label Assignment v6.2
### Beatrice Brown-Mulry
### 06/16/2025

In [None]:
import pandas as pd
import numpy as np
from tqdm import tqdm
import os
import matplotlib.pyplot as plt
import embed_toolkit  # version 0.2.*
from typing import Optional, Union
import bisect
from dataclasses import dataclass, field

# notebook display limits: up to 50 rows and 500 columns
pd.set_option("display.max_rows", 50, "display.max_columns", 500)

# silence SettingWithCopy warnings for the slice assignments made below
pd.set_option("mode.chained_assignment", None)

# register tqdm's progress_apply / progress_map helpers on pandas objects
tqdm.pandas()

In [None]:
# input locations (environment-specific; adjust when re-running elsewhere)
# anonymized Magview extract for the study exams
MAGVIEW_PATH = "/mnt/NAS3/mammo/lunit/data/standalone_studies_clairity_anon.csv"
# per-accession score table (relative to the notebook's working directory)
SCORE_PATH = "../../data/dbt_cohort_scores_v2.csv"
# anonymized EMBED v2 addendums table
ADDENDUMS_PATH = "/mnt/NAS3/mammo/tables/embedv2_extract_current_anon/Addendums_anon.csv"

In [None]:
# init embed params object; its `level_columns` lookup is used later to pick
# the patient/exam/procedure-level columns copied when merging addendums
embed_params = embed_toolkit.EMBEDParameters()

---

# 0. Data Preparation

## 0.1 Prepare Magview

In [None]:
# load Magview and discard rows without an exam description
mag_df = pd.read_csv(MAGVIEW_PATH).dropna(subset="desc")

# identifiers as numbers, study date as datetime
for key_col in ("empi_anon", "acc_anon"):
    mag_df[key_col] = pd.to_numeric(mag_df[key_col])
mag_df["study_date_anon"] = pd.to_datetime(mag_df["study_date_anon"])

# the finding-type flag columns are re-derived later, so drop the originals
mag_drop_cols = ["arch_distortion", "asymmetry", "calc", "mass"]
mag_df = mag_df.drop(columns=mag_drop_cols)

# 999 is the sentinel for an invalid accession
mag_df = mag_df[mag_df["acc_anon"] != 999]

# screening exams: description contains "scr" in any case
mag_scr = mag_df[mag_df["desc"].str.contains("scr", case=False)]

mag_scr.embed.summarize("Screen Magview")

In [None]:
# diagnostic exams: description mentions "diag" (any case) or "US" (exact case)
mag_diag = mag_df[
    mag_df["desc"].str.contains("diag", case=False)
    | mag_df["desc"].str.contains("US", case=True)
]

mag_diag.embed.summarize("Diagnostic Magview")

## 0.2 Correct Contralaterals
We run the contralateral correction function on the Magview dataframes to create explicit negative entries for the contralateral breast on bilateral exams whose findings are reported on only one side.

In [None]:
# add the negative contralateral entries implied by one-sided findings
mag_scr_contra = embed_toolkit.correct_contralaterals(mag_scr)

# re-apply the key column dtypes on the corrected frame
mag_scr_contra["study_date_anon"] = pd.to_datetime(mag_scr_contra["study_date_anon"])
for num_col in ("acc_anon", "empi_anon"):
    mag_scr_contra[num_col] = pd.to_numeric(mag_scr_contra[num_col])

In [None]:
# split every bilateral ('B') finding into one 'L' row and one 'R' row;
# all other findings pass through unchanged, in their original order
new_rows = []

for _, finding in mag_scr_contra.iterrows():
    if finding["side"] != "B":
        new_rows.append(finding)
        continue
    for split_side in ("L", "R"):
        split_finding = finding.copy()
        split_finding["side"] = split_side
        new_rows.append(split_finding)

mag_scr_contra = pd.DataFrame(new_rows).reset_index(drop=True)

In [None]:
# acc_lat = "<acc_anon>_<side>": one identifier per exam side
mag_scr_contra["acc_lat"] = (
    mag_scr_contra["acc_anon"]
    .astype(str)
    .str.cat(mag_scr_contra["side"].astype(str), sep="_")
)

In [None]:
# add the negative contralateral entries implied by one-sided findings
mag_diag_contra = embed_toolkit.correct_contralaterals(mag_diag)

# re-apply the key column dtypes on the corrected frame
mag_diag_contra["study_date_anon"] = pd.to_datetime(mag_diag_contra["study_date_anon"])
for num_col in ("acc_anon", "empi_anon"):
    mag_diag_contra[num_col] = pd.to_numeric(mag_diag_contra[num_col])

In [None]:
# split every bilateral ('B') finding into one 'L' row and one 'R' row;
# all other findings pass through unchanged, in their original order
new_rows = []

for _, finding in mag_diag_contra.iterrows():
    if finding["side"] != "B":
        new_rows.append(finding)
        continue
    for split_side in ("L", "R"):
        split_finding = finding.copy()
        split_finding["side"] = split_side
        new_rows.append(split_finding)

mag_diag_contra = pd.DataFrame(new_rows).reset_index(drop=True)

In [None]:
# acc_lat = "<acc_anon>_<side>": one identifier per exam side
mag_diag_contra["acc_lat"] = (
    mag_diag_contra["acc_anon"]
    .astype(str)
    .str.cat(mag_diag_contra["side"].astype(str), sep="_")
)

## 0.3 Handle Addended Exam Data

In [None]:
# finding matching algorithm -------------------------------------------------------------
class MatchCandidate:
    """Score how closely a Magview finding row agrees with an addendum row.

    Only columns present in both rows are compared. Values are normalized by
    ``_clean`` first: nulls and blank strings become the string ``"nan"``,
    strings that parse as numbers become floats (so ``"1"`` equals ``1.0``),
    and everything else becomes a whitespace-stripped string.
    """

    def __init__(self, base_row: pd.Series, target_row: pd.Series):
        # columns shared by both rows (set order is arbitrary; only keys matter)
        self.cols: list[str] = list(
            set(base_row.keys()).intersection(set(target_row.keys()))
        )

        # normalized values of the shared columns
        self.base: dict[str, object] = self._clean(base_row[self.cols])
        self.target: dict[str, object] = self._clean(target_row[self.cols])

        # per-column match flags, filled in by evaluate()
        self.matches: dict[str, bool] = dict()

    def evaluate(self) -> float:
        """Return the fraction of shared columns whose normalized values are equal.

        Also (re)populates ``self.matches``. Returns 0.0 when the two rows share
        no columns, instead of dividing by zero.
        """
        self.matches = {k: self.base[k] == self.target[k] for k in self.cols}

        if not self.matches:
            return 0.0

        return sum(self.matches.values()) / len(self.matches)

    def get_mismatches(self) -> dict[str, dict[str, object]]:
        """Return ``{column: {"base": value, "target": value}}`` for unequal columns.

        Calls evaluate() first if ``self.matches`` has not been populated yet.
        """
        if len(self.matches) == 0:
            self.evaluate()

        return {
            k: {"base": self.base[k], "target": self.target[k]}
            for k, is_match in self.matches.items()
            if not is_match
        }

    def _clean(self, data: pd.Series) -> dict[str, object]:
        """Normalize a row's values into a ``{str(column): value}`` dict."""
        clean_data: dict[str, object] = dict()

        for k, v in data.items():
            str_k = str(k)

            # nulls compare equal to each other via the "nan" placeholder
            if pd.isnull(v):
                clean_data[str_k] = "nan"
                continue

            # cast to string (also covers values that already were strings)
            str_v = str(v).strip()

            # treat empty strings like nulls
            if len(str_v) == 0:
                clean_data[str_k] = "nan"
                continue

            # numeric-looking strings compare as floats; note is_str_number
            # can return 0.0, so check the type rather than truthiness
            float_v = is_str_number(str_v)
            clean_data[str_k] = float_v if isinstance(float_v, float) else str_v

        return clean_data


# ------------------------------------------------------------------------------


# matching scores data structure -----------------------------------------------
@dataclass(order=True)
class ScoredItem:
    """A (Magview numfind, addendum row index) pair and its match score.

    ``order=True`` compares fields in declaration order, so items are ranked
    by ``sort_index`` (= ``-score``) first: ascending order puts the highest
    score first. Ties fall back to ``score``, then ``source_num``, then
    ``add_idx``.
    """

    # negative score so ascending sorting yields highest score first;
    # computed in __post_init__, not a constructor argument
    sort_index: float = field(init=False, repr=False)
    # match score (match_slices passes MatchCandidate.evaluate() here)
    score: float
    # Magview numfind the score belongs to
    source_num: int
    # index label of the addendum row
    add_idx: int

    def __post_init__(self):
        # sort by negative score so highest score comes first
        self.sort_index = -self.score


class ScoredIndexStore:
    """Score-ordered collection of ScoredItems, indexed by numfind and addendum row.

    ``items`` stays sorted (highest score first) via ``bisect.insort``; the two
    dicts hold the same item objects, grouped by ``source_num`` and ``add_idx``.
    Removing by one key also purges those items from the other index.
    """

    def __init__(self):
        self.items: list[ScoredItem] = []
        self.by_source_num: dict[int, list[ScoredItem]] = {}
        self.by_add_idx: dict[int, list[ScoredItem]] = {}

    def add(self, source_num: int, add_idx: int, score: float):
        """Register one scored (numfind, addendum row) pair."""
        new_item = ScoredItem(score=score, source_num=source_num, add_idx=add_idx)
        bisect.insort(self.items, new_item)
        self.by_source_num.setdefault(source_num, []).append(new_item)
        self.by_add_idx.setdefault(add_idx, []).append(new_item)

    def filter_by_source_num(self, source_num: int) -> list[ScoredItem]:
        """Return the (live) list of items for a numfind, or an empty list."""
        if source_num in self.by_source_num:
            return self.by_source_num[source_num]
        return []

    def filter_by_add_idx(self, add_idx: int) -> list[ScoredItem]:
        """Return the (live) list of items for an addendum row, or an empty list."""
        if add_idx in self.by_add_idx:
            return self.by_add_idx[add_idx]
        return []

    def _discard(self, item, other_index, other_key):
        # drop one item from the sorted list and from the secondary index,
        # deleting the secondary bucket once it is empty
        self.items.remove(item)
        bucket = other_index[other_key]
        bucket.remove(item)
        if not bucket:
            del other_index[other_key]

    def remove_by_source_num(self, source_num: int):
        """Remove every item belonging to ``source_num`` from all structures."""
        for item in self.by_source_num.pop(source_num, []):
            self._discard(item, self.by_add_idx, item.add_idx)

    def remove_by_add_idx(self, add_idx: int):
        """Remove every item belonging to ``add_idx`` from all structures."""
        for item in self.by_add_idx.pop(add_idx, []):
            self._discard(item, self.by_source_num, item.source_num)

    def get_sorted_items(self) -> list[ScoredItem]:
        """Return a shallow copy of the items, highest score first."""
        return self.items.copy()


# ------------------------------------------------------------------------------


# mapping data structures ------------------------------------------------------
@dataclass
class MappingPair:
    """One output pairing produced by match_slices.

    Either index may be None: ``source=None`` means an addendum-only row and
    ``addendum=None`` means a Magview-only row. match_slices sets
    ``append=True`` for both of those unmatched cases.
    """

    source: Optional[int]  # Magview (source) row index label, or None
    addendum: Optional[int]  # addendum row index label, or None
    append: bool = False  # can also be inferred by a None index


@dataclass
class AddendumMapping:
    """All MappingPairs produced for one exam side."""

    acc_lat: str  # exam-side identifier ("<acc_anon>_<side>")
    mappings: list[MappingPair] = field(default_factory=list)

    def append(self, item: MappingPair):
        # add one pairing to the end of the mapping list
        self.mappings.append(item)


# ------------------------------------------------------------------------------


# match breast-level slices ----------------------------------------------------
def match_slices(
    acc_lat: str, mag_slice: pd.DataFrame, add_slice: pd.DataFrame
) -> AddendumMapping:
    """Pair one exam side's Magview findings with its addendum findings.

    Magview rows are grouped by ``numfind``; every row of a numfind group is
    mapped to the same addendum row. Cases:

    * no Magview numfind groups (0:n): every addendum row is emitted as an
      append-only pair (``source=None, append=True``).
    * exactly one numfind group and one addendum row (1:1): every row of the
      group is paired with that addendum row.
    * otherwise: every (Magview row, addendum row) combination is scored with
      ``MatchCandidate.evaluate`` and pairs are assigned greedily, highest
      score first, each numfind and each addendum row being used at most
      once. When one numfind and one addendum row remain they are paired
      directly. Leftover addendum rows become append-only pairs; leftover
      Magview rows become ``addendum=None, append=True`` pairs.

    Args:
        acc_lat: exam-side identifier stored on the returned mapping.
        mag_slice: Magview rows for this exam side.
        add_slice: addendum rows for this exam side.

    Returns:
        AddendumMapping whose pairs hold index labels from the inputs.
    """
    # init struct to track output mappings
    mappings: AddendumMapping = AddendumMapping(acc_lat)

    # get lists to track our indexes

    # numfind -> Magview row index labels with that numfind
    # NOTE(review): groupby drops NaN keys by default, so Magview rows with a
    # NaN numfind never appear in this dict (and are absent from the output
    # mapping); in the scoring branch a NaN numfind could also become the top
    # match and make the .pop() below raise KeyError — confirm numfind is
    # always populated
    mag_numfind_idxs: dict[int, list[int]] = dict()
    for numfind, group in mag_slice.groupby("numfind"):  # type: ignore
        idxs: list[int] = group.index.tolist()
        mag_numfind_idxs[numfind] = idxs  # type: ignore

    # mag_numfinds: list[int] = mag_slice.numfind.astype(int).unique().tolist()  # type: ignore
    # addendum row labels still waiting to be matched (consumed below)
    add_indexes: list[int] = add_slice.index.tolist()

    # handle special cases that don't require matching
    # 0:n
    if len(mag_numfind_idxs) == 0:
        for add_idx in add_indexes:
            mappings.append(MappingPair(source=None, addendum=add_idx, append=True))
        return mappings

    # 1:1
    elif (len(mag_numfind_idxs) == 1) & (len(add_indexes) == 1):
        # get the single numfind and retrieve its associated idxs
        numfind: int = list(mag_numfind_idxs.keys())[0]
        idxs: list[int] = mag_numfind_idxs[numfind]

        # add them to the mappings object
        for source_idx in idxs:
            mappings.append(MappingPair(source=source_idx, addendum=add_indexes[0]))

        return mappings

    # handle cases that require matching
    else:
        # init data struct to track row similarities
        store = ScoredIndexStore()

        # score every Magview row against every addendum row; scores are
        # keyed by the Magview row's numfind, not its index label
        for i, mag_row in mag_slice.iterrows():
            for j, add_row in add_slice.iterrows():
                # init candidate object and get match score
                candidate = MatchCandidate(mag_row, add_row)
                match_score = candidate.evaluate()

                # register the evaluation in the store
                store.add(mag_row.numfind, j, match_score)  # type: ignore

        # get the top match score from the store
        # (greedy: each iteration consumes one numfind and one addendum row)
        while True:
            # if the mag numfind list is empty
            # append all remaining addendums
            if len(mag_numfind_idxs) == 0:
                for add_idx in add_indexes:
                    mappings.append(
                        MappingPair(source=None, addendum=add_idx, append=True)
                    )
                break

            # else if the add index list is empty
            # append all remaining magview rows
            elif len(add_indexes) == 0:
                for numfind, idxs in mag_numfind_idxs.items():
                    for source_idx in idxs:
                        mappings.append(
                            MappingPair(
                                source=source_idx,
                                addendum=None,
                                append=True,
                            )
                        )
                break

            # handle 1:1 matches so we can skip unneeded computation
            elif (len(mag_numfind_idxs) == 1) & (len(add_indexes) == 1):
                # get the single numfind and retrieve its associated idxs
                numfind: int = list(mag_numfind_idxs.keys())[0]
                idxs: list[int] = mag_numfind_idxs[numfind]

                # add them to the mappings object
                for source_idx in idxs:
                    mappings.append(
                        MappingPair(source=source_idx, addendum=add_indexes[0])
                    )

                break

            # get the highest match score
            # (store still holds every remaining numfind x addendum pair here)
            top_match = store.get_sorted_items()[0]

            # extract match indices
            match_mag_num = top_match.source_num
            match_add_idx = top_match.add_idx

            # get the indices associated with the numfind
            idxs: list[int] = mag_numfind_idxs.pop(match_mag_num)

            for source_idx in idxs:
                # append match to mapping obj
                mappings.append(
                    MappingPair(
                        source=source_idx,
                        addendum=match_add_idx,
                    )
                )

            # remove all entries matching the numfind from the store
            store.remove_by_source_num(match_mag_num)

            # remove matched addendum rows by index
            add_indexes.remove(match_add_idx)
            store.remove_by_add_idx(match_add_idx)

        return mappings


# ------------------------------------------------------------------------------


# exam-level addendum matching -------------------------------------------------
def resolve_addendums(
    mag_exam_slice: pd.DataFrame,
    add_exam_slice: pd.DataFrame,
    exam_cols: list[str],
    add_cols: list[str],
) -> pd.DataFrame:
    """Merge one exam's Magview findings with its addendum findings.

    For every exam side (``acc_lat``) present in either input, ``match_slices``
    pairs Magview rows with addendum rows. Each pairing becomes one output row:

    * addendum only -> exam-level columns (``exam_cols``) from the exam's first
      Magview row, plus the addendum's ``add_cols``
    * Magview only  -> the Magview row unchanged
    * matched pair  -> the Magview columns not listed in ``add_cols``, plus the
      addendum's ``add_cols`` (the addendum wins on those columns)

    Args:
        mag_exam_slice: Magview rows for a single accession; must be non-empty
            (its first row supplies the exam-level values).
        add_exam_slice: addendum rows for the same accession.
        exam_cols: exam-level columns copied onto addendum-only rows.
        add_cols: columns whose values are taken from the addendum.

    Returns:
        DataFrame with the same columns (and column order) as ``mag_exam_slice``.
    """
    out_data: list[pd.Series] = []

    # exam-level values shared by every finding of this exam
    mag_exam_data = mag_exam_slice[exam_cols].iloc[0]

    # Magview columns kept on merged rows (everything the addendum doesn't supply)
    add_col_set = set(add_cols)
    mag_slice_cols: list[str] = [
        c for c in mag_exam_slice.columns if c not in add_col_set
    ]

    # sorted so the output row order is reproducible: iterating a set of
    # strings follows hash randomization, which differs between runs
    acc_lat_list: list[str] = sorted(
        set(mag_exam_slice.acc_lat).union(set(add_exam_slice.acc_lat))
    )

    for acc_lat in acc_lat_list:
        add_slice = add_exam_slice[add_exam_slice.acc_lat == acc_lat]
        mag_slice = mag_exam_slice[mag_exam_slice.acc_lat == acc_lat]

        # match mag/add slices and get the output mapping
        mappings: AddendumMapping = match_slices(acc_lat, mag_slice, add_slice)

        for map_obj in mappings.mappings:
            mag_idx: Optional[int] = map_obj.source
            add_idx: Optional[int] = map_obj.addendum

            if map_obj.append and mag_idx is None:
                # addendum only: exam-level Magview values + addendum findings
                out_data.append(
                    pd.concat([mag_exam_data, add_slice.loc[add_idx, add_cols]])
                )

            elif map_obj.append and add_idx is None:
                # Magview only: keep the original row
                out_data.append(mag_slice.loc[mag_idx])

            else:
                # matched: Magview-only columns + addendum columns
                row_data = pd.concat(
                    [
                        mag_slice.loc[mag_idx, mag_slice_cols],
                        add_slice.loc[add_idx, add_cols],
                    ]
                )
                # the finding number always comes from Magview
                row_data["numfind"] = mag_slice.loc[mag_idx, "numfind"]
                out_data.append(row_data)

    return pd.DataFrame(out_data, columns=mag_exam_slice.columns)


# ------------------------------------------------------------------------------


# utility funcs ----------------------------------------------------------------
def is_str_number(v: str) -> Union[float, bool]:
    """Parse ``v`` as a float, returning ``False`` if it is not numeric.

    A successful parse may return ``0.0``, which is falsy, so callers should
    test the result with ``isinstance(result, float)`` rather than truthiness.
    Strings accepted by ``float()`` such as ``"nan"`` and ``"inf"`` parse to
    the corresponding float values. Non-string inputs that ``float()``
    rejects (e.g. ``None``) also yield ``False``.
    """
    try:
        return float(v)
    except (TypeError, ValueError):
        return False


# ------------------------------------------------------------------------------

In [None]:
# map Addendums column names onto the Magview naming scheme
add_rename_dict = {
    "accession_anon": "acc_anon",
    "EMPI_anon": "empi_anon",
    "studydate_ANON": "study_date_anon",
    "dt_final_ANON": "dt_final_anon",
    "dt_rel_ANON": "dt_rel_anon",
    "linkedaccession_anon": "linked_acc_anon",
    "procdate_ANON": "procdate_anon",
    "save_date_ANON": "save_date_anon",
    "pdate_ANON": "pdate_anon",
    "patientage_anon": "age_at_study",
    "depth.1": "bdepth",
    "distance.1": "bdistance",
    "side.1": "bside",
}

add_df = pd.read_csv(ADDENDUMS_PATH).rename(columns=add_rename_dict)
add_df["acc_anon"] = pd.to_numeric(add_df["acc_anon"])

# columns not carried into addendum resolution (matched rows later take
# numfind from Magview instead)
add_drop_cols = [
    "comment",
    "path_dr",
    "init.1",
    "mdelayed.1",
    "save_date_anon",
    "secondaryADDENDUM",
    "surgeon",
    "numfind",
]
add_df = add_df.drop(columns=add_drop_cols)

add_df.embed.summarize("Addended Exams")
print(len(add_df))

In [None]:
# an exam may have several addendums: keep only rows from its highest anum
max_anum_dict = add_df.groupby("acc_anon")["anum"].max().to_dict()

add_df["max_anum"] = add_df["acc_anon"].map(max_anum_dict)
add_df = add_df[add_df["anum"] == add_df["max_anum"]]

add_df.embed.summarize("Filtered Addended Exams")
print(len(add_df))

In [None]:
# convert any 'B' findings to separate 'L'/'R' findings; findings whose side
# is missing (NaN) or blank are split into both sides as well
new_rows = []

for _, row in add_df.iterrows():
    raw_side = row["side"]
    # non-string sides (e.g. NaN) have no .strip(); treat them as blank
    # instead of raising AttributeError
    side = raw_side.strip() if isinstance(raw_side, str) else ""

    if side in ("B", ""):
        row_L = row.copy()
        row_L["side"] = "L"
        row_R = row.copy()
        row_R["side"] = "R"
        new_rows.extend([row_L, row_R])
    else:
        # keep the whitespace-stripped side so acc_lat below lines up with
        # the Magview "<acc_anon>_<side>" identifiers
        row_kept = row.copy()
        row_kept["side"] = side
        new_rows.append(row_kept)

# create a new DataFrame from the expanded rows
add_df = pd.DataFrame(new_rows).reset_index(drop=True)

# create a new acc_lat identifier to track labels per exam side
add_df["acc_lat"] = add_df["acc_anon"].astype(str) + "_" + add_df["side"].astype(str)

add_df.side.value_counts(dropna=False)

In [None]:
# get the addendum subsets for scr/diag exams, using the same description
# filters as the Magview subsets: screen = "scr" (any case); diagnostic =
# "diag" (any case) or "US" (exact case), so ultrasound addendums are resolved too.
# na=False: addendum descriptions were never NaN-filtered, and a NaN in a
# boolean mask makes the row selection raise
scr_add_df = add_df[add_df.desc.str.contains("scr", case=False, na=False)]
diag_add_df = add_df[
    add_df.desc.str.contains("diag", case=False, na=False)
    | add_df.desc.str.contains("US", na=False)
]

# summarize dataframe contents
scr_add_df.embed.summarize("Addended Screen Exams")
diag_add_df.embed.summarize("Addended Diagnostic Exams")

### 0.3.1 Handle Screen Addendums

In [None]:
# addended screening accessions (numeric) and exam sides ("<acc>_<side>" strings)
scr_accs: list[int] = scr_add_df.acc_anon.unique().tolist()  # type: ignore
scr_acc_lats: list[str] = scr_add_df.acc_lat.unique().tolist()  # type: ignore

print(f"{len(scr_accs):,} addended exams")
print(f"{len(scr_acc_lats):,} addended exams-sides")

In [None]:
# get exam level source columns to extract when merging
exam_cols = (
    embed_params.level_columns["patient"]
    + [
        "height",
        "weight",
        "livebirths",
        "menopauseage",
        "pregnancyage",
        "menarcheage",
        "ASHKENAZI",
    ]
    + embed_params.level_columns["exam"]
    + embed_params.level_columns["procedure"]
)
exam_cols = [c for c in exam_cols if c in mag_scr_contra.columns]

# get addendum columns to extract when merging (shared with Magview, not exam-level)
add_cols = [
    c for c in add_df.columns if (c in mag_scr_contra.columns) and (c not in exam_cols)
]

resolved_data: list[pd.DataFrame] = []
scr_acc_list = scr_add_df.acc_anon.unique().tolist()

# split both tables by accession once, instead of boolean-masking the full
# tables for every accession (a full scan per accession); Magview is first
# restricted to the addended accessions to keep the lookup small
mag_scr_by_acc = {
    acc_key: acc_rows
    for acc_key, acc_rows in mag_scr_contra[
        mag_scr_contra.acc_anon.isin(scr_acc_list)
    ].groupby("acc_anon")
}
scr_add_by_acc = {
    acc_key: acc_rows for acc_key, acc_rows in scr_add_df.groupby("acc_anon")
}

for acc in tqdm(scr_acc_list):
    mag_exam_slice = mag_scr_by_acc.get(acc)

    # skip resolution of addendums if the acc_anon doesn't exist in the magview sample at all
    if mag_exam_slice is None:
        continue

    add_exam_slice = scr_add_by_acc[acc]

    # append resolved data to the output list
    resolved_data.append(
        resolve_addendums(mag_exam_slice, add_exam_slice, exam_cols, add_cols)
    )

# pd.concat raises on an empty list; fall back to an empty frame with Magview's columns
if resolved_data:
    scr_resolved_df = pd.concat(resolved_data, axis=0).reset_index(drop=True)
else:
    scr_resolved_df = pd.DataFrame(columns=mag_scr_contra.columns)

In [None]:
# findings without a calcification count get 0
scr_resolved_df["calcnumber"] = scr_resolved_df["calcnumber"].fillna(0)
scr_resolved_df["calcnumber"].value_counts(dropna=False)

In [None]:
# replace the rows of every addended exam with its resolved rows
addended_accs = scr_resolved_df.acc_anon.unique().tolist()

mag_scr_add = (
    pd.concat(
        [
            mag_scr_contra[~mag_scr_contra.acc_anon.isin(addended_accs)],
            scr_resolved_df,
        ],
        axis=0,
    )
    .sort_values(["acc_anon", "side"])
    .reset_index(drop=True)
)

print(len(mag_scr_add))
mag_scr_add.embed.summarize("Magview Screen Post-Addendum")

In [None]:
display(mag_scr_add["asses"].value_counts(dropna=False))

# keep only screen findings that have a BI-RADS assessment
mag_scr_add = mag_scr_add[mag_scr_add["asses"].notna()]

display(mag_scr_add["asses"].value_counts(dropna=False))

### 0.3.2 Handle Diagnostic Addendums

In [None]:
# addended diagnostic accessions (numeric) and exam sides ("<acc>_<side>" strings)
dx_accs: list[int] = diag_add_df.acc_anon.unique().tolist()  # type: ignore
dx_acc_lats: list[str] = diag_add_df.acc_lat.unique().tolist()  # type: ignore

In [None]:
# get exam level source columns to extract when merging
exam_cols = (
    embed_params.level_columns["patient"]
    + [
        "height",
        "weight",
        "livebirths",
        "menopauseage",
        "pregnancyage",
        "menarcheage",
        "ASHKENAZI",
    ]
    + embed_params.level_columns["exam"]
    + embed_params.level_columns["procedure"]
)
exam_cols = [c for c in exam_cols if c in mag_diag_contra.columns]

# get addendum columns to extract when merging (shared with Magview, not exam-level)
add_cols = [
    c for c in add_df.columns if (c in mag_diag_contra.columns) and (c not in exam_cols)
]

resolved_data: list[pd.DataFrame] = []
diag_acc_list = diag_add_df.acc_anon.unique().tolist()

# split both tables by accession once, instead of boolean-masking the full
# tables for every accession (a full scan per accession); Magview is first
# restricted to the addended accessions to keep the lookup small
mag_diag_by_acc = {
    acc_key: acc_rows
    for acc_key, acc_rows in mag_diag_contra[
        mag_diag_contra.acc_anon.isin(diag_acc_list)
    ].groupby("acc_anon")
}
diag_add_by_acc = {
    acc_key: acc_rows for acc_key, acc_rows in diag_add_df.groupby("acc_anon")
}

for acc in tqdm(diag_acc_list):
    mag_exam_slice = mag_diag_by_acc.get(acc)

    # skip resolution of addendums if the acc_anon doesn't exist in the magview sample at all
    if mag_exam_slice is None:
        continue

    add_exam_slice = diag_add_by_acc[acc]

    # append resolved data to the output list
    resolved_data.append(
        resolve_addendums(mag_exam_slice, add_exam_slice, exam_cols, add_cols)
    )

# pd.concat raises on an empty list; fall back to an empty frame with Magview's columns
if resolved_data:
    diag_resolved_df = pd.concat(resolved_data, axis=0).reset_index(drop=True)
else:
    diag_resolved_df = pd.DataFrame(columns=mag_diag_contra.columns)

In [None]:
# replace the rows of every addended exam with its resolved rows
addended_accs = diag_resolved_df.acc_anon.unique().tolist()

mag_diag_add = (
    pd.concat(
        [
            mag_diag_contra[~mag_diag_contra.acc_anon.isin(addended_accs)],
            diag_resolved_df,
        ],
        axis=0,
    )
    .sort_values(["acc_anon", "side"])
    .reset_index(drop=True)
)

print(len(mag_diag_add))
mag_diag_add.embed.summarize("Magview Diagnostic Post-Addendum")

## 0.4 Derive Exam-Type Laterality

In [None]:
def get_exam_laterality(row: pd.Series) -> str | None:
    # extract description and lowercase it
    finding_desc = row.desc.lower()

    if "bilat" in finding_desc:
        return "B"
    elif "left" in finding_desc:
        return "L"
    elif "right" in finding_desc:
        return "R"
    else:
        return None


# tag every finding row with its exam's laterality, derived from `desc`
for laterality_df in (mag_scr_add, mag_diag_add):
    laterality_df["exam_laterality"] = laterality_df.progress_apply(  # type: ignore
        get_exam_laterality, axis=1
    )

## 0.5 Derive Exam-Level BIRADS and Pathology Severity
For each exam, take the most severe BIRADS assessment and the most severe `path_severity` across its findings.

In [None]:
# per-exam severity helpers; each is applied to one exam's group of finding rows
def get_worst_ps(group):
    """Return the lowest (most severe) path_severity in the exam group.

    NaN values are skipped by ``Series.min()``; an all-NaN group yields NaN.
    """
    return group.path_severity.min()


def get_worst_br(group):
    """Return the most severe BI-RADS assessment code in the exam group.

    The first row's description picks the ranking. If it contains "screen"
    (case-insensitive), the screening order A > B > N is used (most to least
    severe). Otherwise the diagnostic order K > M > S > P > B > N > A is used.

    Assessments that are missing or absent from the chosen ranking are
    ignored. Returns "" when no row has a rankable assessment.
    """
    exam_desc = group.desc.tolist()[0]
    # NOTE(review): screening exams are selected upstream with "scr", but this
    # check looks for "screen"; a screening description containing "scr" but
    # not "screen" would be ranked with the diagnostic table — confirm
    if "screen" in exam_desc.lower():
        br_to_val_dict = {
            "A": 0,  # 'A' maps to birads 0
            "B": 1,  # 'B' maps to birads 2
            "N": 2,  # 'N' maps to birads 1
        }
    else:
        br_to_val_dict = {
            "A": 6,  # 'A' shouldn't exist here, but it might (ranked least severe)
            "N": 5,  # 'N' maps to birads 1
            "B": 4,  # 'B' maps to birads 2
            "P": 3,  # 'P' maps to birads 3
            "S": 2,  # 'S' maps to birads 4
            "M": 1,  # 'M' maps to birads 5
            "K": 0,  # 'K' maps to birads 6
        }

    val_to_br_dict = {v: k for k, v in br_to_val_dict.items()}

    # Series.min() skips NaN (missing / unranked codes). The builtin min() is
    # order-dependent when NaN is present: it can return NaN, and therefore
    # "", for an exam that does have a valid assessment
    worst_br_val = group.asses.map(br_to_val_dict).min()
    if pd.isna(worst_br_val):
        return ""
    return val_to_br_dict.get(int(worst_br_val), "")

In [None]:
# exam-level BI-RADS: the most severe assessment among each exam's findings
worst_scr_br_dict = (
    mag_scr_add.groupby("acc_anon").progress_apply(get_worst_br).to_dict()  # type: ignore
)
worst_diag_br_dict = (
    mag_diag_add.groupby("acc_anon").progress_apply(get_worst_br).to_dict()  # type: ignore
)

# broadcast the exam-level value back onto every finding row
mag_scr_add["exam_birads"] = mag_scr_add["acc_anon"].map(worst_scr_br_dict)
mag_diag_add["exam_birads"] = mag_diag_add["acc_anon"].map(worst_diag_br_dict)

In [None]:
# exam-level pathology: the most severe path_severity among each exam's findings.
# Findings without a path_severity are left out of the grouping; exams with
# no path_severity at all get NaN from the map below.
worst_scr_path_dict = (
    mag_scr_add[mag_scr_add.path_severity.notna()]
    .groupby("acc_anon")
    .progress_apply(get_worst_ps)  # type: ignore
    .to_dict()
)
worst_diag_path_dict = (
    mag_diag_add[mag_diag_add.path_severity.notna()]
    .groupby("acc_anon")
    .progress_apply(get_worst_ps)  # type: ignore
    .to_dict()
)

# broadcast the exam-level value back onto every finding row
mag_scr_add["exam_path_severity"] = mag_scr_add["acc_anon"].map(worst_scr_path_dict)
mag_diag_add["exam_path_severity"] = mag_diag_add["acc_anon"].map(worst_diag_path_dict)

## 0.6 Aggregate Exam Biopsy Sides

In [None]:
def aggregate_bsides(group):
    """Collapse one exam's biopsy sides (``bside``) into a single value.

    * exactly one distinct value (NaN counts as a value) -> that value
    * "B" present, or both "L" and "R" present -> "B"
    * otherwise "L" present -> "L"; otherwise "R" present -> "R"
    * anything else (including an empty group) -> "ERROR"
    """
    sides = group.bside.unique().tolist()
    if len(sides) == 1:
        return sides[0]

    has_left = "L" in sides
    has_right = "R" in sides
    if "B" in sides or (has_left and has_right):
        return "B"
    if has_left:
        return "L"
    if has_right:
        return "R"
    return "ERROR"


def get_bside_aggregation_dict(df: pd.DataFrame) -> dict[float, str]:
    """Build an ``acc_anon -> aggregated biopsy side`` mapping for ``df``.

    Only finding rows that can explain the exam-level pathology are used: rows
    of exams with no exam-level path severity, and rows whose own
    path_severity equals the exam-level value. The selected columns are
    de-duplicated and each exam is reduced with ``aggregate_bsides``.
    """
    exam_ps = df.exam_path_severity
    no_exam_path = exam_ps.isna()
    matches_exam_path = exam_ps.notna() & (df.path_severity == exam_ps)

    subset_cols: list[str] = [
        "acc_anon",
        "empi_anon",
        "study_date_anon",
        "exam_birads",
        "exam_path_severity",
        "bside",
    ]
    per_exam = (
        df.loc[no_exam_path | matches_exam_path, subset_cols]
        .drop_duplicates()
        .groupby("acc_anon")
    )
    return per_exam.progress_apply(aggregate_bsides).to_dict()  # type: ignore


# exam -> aggregated biopsy side, for screening and diagnostic exams
scr_bside_agg_dict: dict[float, str] = get_bside_aggregation_dict(mag_scr_add)
diag_bside_agg_dict: dict[float, str] = get_bside_aggregation_dict(mag_diag_add)

# broadcast onto every finding row and show the distribution
for bside_df, bside_map in (
    (mag_scr_add, scr_bside_agg_dict),
    (mag_diag_add, diag_bside_agg_dict),
):
    bside_df["exam_bside"] = bside_df["acc_anon"].map(bside_map)
    display(bside_df["exam_bside"].value_counts(dropna=False))

## 0.7 Assign Screen BIRADS Helper Variables
These boolean flags mark screening exams whose exam-level `exam_birads` is 'A' (abnormal, BIRADS 0) or 'N'/'B' (negative/benign, BIRADS 1/2).

In [None]:
# accessions whose exam-level screening BI-RADS is 'A' (0) or 'N'/'B' (1/2)
scr_br_0_list = (
    mag_scr_add.loc[mag_scr_add.exam_birads.isin(["A"]), "acc_anon"].unique().tolist()
)
scr_br_12_list = (
    mag_scr_add.loc[mag_scr_add.exam_birads.isin(["N", "B"]), "acc_anon"]
    .unique()
    .tolist()
)

# boolean flag columns on every finding row of those exams
mag_scr_add["scr_br_0"] = mag_scr_add.acc_anon.isin(scr_br_0_list)
mag_scr_add["scr_br_12"] = mag_scr_add.acc_anon.isin(scr_br_12_list)

## 0.8 Prepare Target Sample Subset

In [None]:
def prepare_score_data(data: pd.DataFrame) -> pd.DataFrame:
    """Return the score table's accession, cohort and score columns.

    "accession number" is renamed to "acc_anon" and "Cohort" to "cohort".
    The input frame is left untouched: the rename happens on a new frame
    rather than in place on the caller's DataFrame.

    Returns:
        A new DataFrame with columns ["acc_anon", "cohort", "score"].
    """
    rename_dict: dict[str, str] = {"accession number": "acc_anon", "Cohort": "cohort"}

    renamed = data.rename(columns=rename_dict)
    # .copy() so later column assignments act on an independent frame
    return renamed[["acc_anon", "cohort", "score"]].copy()


# load the score table, keeping only the study columns
score_df: pd.DataFrame = prepare_score_data(pd.read_csv(SCORE_PATH))

# accession numbers as numbers
score_df["acc_anon"] = pd.to_numeric(score_df["acc_anon"])
print(len(score_df))

# drop results for invalid accessions (sentinel 999)
score_df = score_df[score_df["acc_anon"] != 999]
print(len(score_df))

In [None]:
# restrict the post-addendum screening table to the study's accessions
lunit_acc_list = score_df["acc_anon"].tolist()
mag_sample = mag_scr_add[mag_scr_add["acc_anon"].isin(lunit_acc_list)]

mag_sample.embed.summarize("Study Sample")

In [None]:
# accession -> does its (first) Magview description mention "screen"?
first_desc_by_acc = mag_df.drop_duplicates("acc_anon").set_index("acc_anon")["desc"]
scr_dict = first_desc_by_acc.str.contains("screen", case=False).to_dict()

# counts of study accessions that are screening (True), other (False),
# or not found in Magview (NaN)
score_df["acc_anon"].map(scr_dict).value_counts(dropna=False)

---

# 1. Handle Follow-Up Exam Data

## 1.1 Define Diagnostic/Ultrasound Dataframes

In [None]:
# exam-level columns kept for each follow-up (diagnostic / ultrasound) exam
followup_cols = (
    "acc_anon empi_anon study_date_anon exam_laterality "
    "exam_birads exam_path_severity exam_bside"
).split()

In [None]:
# one row per diagnostic ("diag") exam with only the follow-up columns
diag_rows = mag_diag_add["desc"].str.contains("diag", case=False)
mag_diag_merge = mag_diag_add.loc[diag_rows, followup_cols].drop_duplicates()

# sanity check: exactly one row per exam
print(
    "any duplicate exam rows?", mag_diag_merge.acc_anon.nunique() != len(mag_diag_merge)
)

In [None]:
# one row per ultrasound ("US") exam that has an assessment
mag_us = (
    mag_diag_add.loc[mag_diag_add["desc"].str.contains("US")]
    .dropna(subset="asses")[followup_cols]
    .drop_duplicates()
)

# sanity check: exactly one row per exam
print("any duplicate exam rows?", mag_us.acc_anon.nunique() != len(mag_us))

## 1.2 Get Follow-Up Mappings

In [None]:
def get_followup_map_dict(
    df: pd.DataFrame, followup_df: pd.DataFrame, time_delta: Union[int, float] = 180
) -> dict:
    """Map each exam accession to the accession of its earliest valid follow-up.

    A follow-up is valid when it belongs to the same patient (``empi_anon``),
    its laterality is compatible (L==L, R==R, or either exam is bilateral "B"),
    and it occurs between 0 and ``time_delta`` days (inclusive) after the exam.

    Args:
        df: exams to find follow-ups for, with ``acc_anon``, ``empi_anon``,
            ``study_date_anon`` (datetime) and ``exam_laterality``. Several rows
            per exam are allowed. It is expected to have been corrected for
            contralateral findings; bilateral ("B") exams are supported.
        followup_df: candidate follow-up exams with the same columns.
        time_delta: maximum delay in days between the exam and its follow-up.

    Returns:
        dict mapping ``acc_anon`` -> ``acc_anon`` of the earliest valid follow-up.
        Exams without a valid follow-up are absent. When several follow-ups share
        the smallest delay, the smallest follow-up accession is chosen, so the
        result is deterministic.
    """
    merge_df = df.merge(
        followup_df, on="empi_anon", how="inner", suffixes=(None, "_fu")
    )

    # laterality must match unless either the exam or the follow-up is bilateral
    laterality_ok = (
        (merge_df.exam_laterality == merge_df.exam_laterality_fu)
        | (merge_df.exam_laterality == "B")
        | (merge_df.exam_laterality_fu == "B")
    )
    merge_df = merge_df.loc[laterality_ok].assign(
        fu_delta=lambda d: (d.study_date_anon_fu - d.study_date_anon).dt.days
    )

    # keep follow-ups on/after the exam date and within the allowed window
    merge_df = merge_df.loc[merge_df.fu_delta.between(0, time_delta)]

    # earliest follow-up per exam; the accession tie-break makes ties deterministic
    earliest = merge_df.sort_values(["fu_delta", "acc_anon_fu"]).drop_duplicates(
        "acc_anon", keep="first"
    )
    return earliest.set_index("acc_anon")["acc_anon_fu"].to_dict()

### 1.2.1 BIRADS 0 Exams

In [None]:
# screen BIRADS 0 exams in the study sample; valid follow-ups must occur within 180 days
mag_br0 = mag_sample[mag_sample.scr_br_0 == True]

# map each BIRADS 0 screen to its earliest valid diagnostic mammography follow-up
br0_dx_map_dict = get_followup_map_dict(mag_br0, mag_diag_merge, time_delta=180)
print(f"{len(br0_dx_map_dict)} valid DX followups found for Screen BIRADS 0s")

# map each BIRADS 0 screen to its earliest valid ultrasound follow-up
br0_us_map_dict = get_followup_map_dict(mag_br0, mag_us, time_delta=180)
print(f"{len(br0_us_map_dict)} valid US followups found for Screen BIRADS 0s")

### 1.2.2 BIRADS 1/2 Exams

In [None]:
# screen BIRADS 1/2 exams in the study sample; valid follow-ups must occur within 365 days
mag_br12 = mag_sample[mag_sample.scr_br_12 == True]

# map each BIRADS 1/2 screen to its earliest valid diagnostic mammography follow-up
br12_dx_map_dict = get_followup_map_dict(mag_br12, mag_diag_merge, time_delta=365)
print(f"{len(br12_dx_map_dict)} valid DX followups found for Screen BIRADS 1/2s")

# map each BIRADS 1/2 screen to its earliest valid ultrasound follow-up
br12_us_map_dict = get_followup_map_dict(mag_br12, mag_us, time_delta=365)
print(f"{len(br12_us_map_dict)} valid US followups found for Screen BIRADS 1/2s")

## 1.3 Perform Follow-Up Mapping

### 1.3.1 Diagnostic Follow-Ups

In [None]:
# combine both DX map dicts (BIRADS 0 and BIRADS 1/2 screens) into an 'earliest_dx_acc'
# column; screens without a valid DX follow-up get NaN
mag_sample["earliest_dx_acc"] = mag_sample["acc_anon"].map(
    {**br0_dx_map_dict, **br12_dx_map_dict}
)

# attach the DX follow-up's exam-level birads / path severity / biopsy side as *_dx
# columns (the follow-up accession itself arrives as acc_anon_dx). The merge returns
# a new frame with a fresh RangeIndex.
mag_sample = mag_sample.merge(
    mag_diag_merge[["acc_anon", "exam_birads", "exam_path_severity", "exam_bside"]],
    how="left",
    left_on=["earliest_dx_acc"],
    right_on=["acc_anon"],
    suffixes=(None, "_dx"),
)

### 1.3.2 Ultrasound Follow-Ups

In [None]:
# combine both US map dicts (BIRADS 0 and BIRADS 1/2 screens) into an 'earliest_us_acc'
# column; screens without a valid US follow-up get NaN
mag_sample["earliest_us_acc"] = mag_sample["acc_anon"].map(
    {**br0_us_map_dict, **br12_us_map_dict}
)

# attach the US follow-up's exam-level birads / path severity / biopsy side as *_us
# columns (the follow-up accession itself arrives as acc_anon_us)
mag_sample = mag_sample.merge(
    mag_us[
        ["acc_anon", "exam_birads", "exam_path_severity", "exam_bside"]
    ].drop_duplicates(),
    how="left",
    left_on=["earliest_us_acc"],
    right_on=["acc_anon"],
    suffixes=(None, "_us"),
)

## 1.4 Summarize Follow-Ups

Prioritize diagnostic follow-up results when a valid diagnostic follow-up exists, unless a valid ultrasound follow-up has a more severe pathology result. Otherwise, use any valid ultrasound follow-up results.

In [None]:
# which follow-up ("DX", "US" or "" for none) supplies each screen's follow-up results
mag_sample["followup_type"] = ""

has_dx_followup = ~pd.isna(mag_sample.earliest_dx_acc)
has_us_followup = ~pd.isna(mag_sample.earliest_us_acc)

# present dx (prioritized if both dx and us present)
mag_sample.loc[has_dx_followup, "followup_type"] = "DX"

# missing dx, present us
mag_sample.loc[~has_dx_followup & has_us_followup, "followup_type"] = "US"

# present dx + us, us pathology more severe so it overrides dx.
# Lower path severity codes are treated as more severe (0.0 = invasive cancer).
# A DX follow-up with no path severity at all counts as less severe than any recorded
# US path severity; a plain `<` comparison against NaN would always be False here.
us_more_severe = (
    mag_sample.exam_path_severity_us < mag_sample.exam_path_severity_dx
) | (
    pd.isna(mag_sample.exam_path_severity_dx)
    & ~pd.isna(mag_sample.exam_path_severity_us)
)
mag_sample.loc[
    has_dx_followup & has_us_followup & us_more_severe, "followup_type"
] = "US"

In [None]:
# copy each follow-up attribute from whichever follow-up type was selected; screens
# without a follow-up keep the default (NaN for path severity, "" otherwise)
for out_col, src_col, fill_value in [
    ("followup_path_severity", "exam_path_severity", np.nan),
    ("followup_bside", "exam_bside", ""),
    ("followup_birads", "exam_birads", ""),
]:
    mag_sample[out_col] = fill_value
    for fu_type, suffix in (("DX", "_dx"), ("US", "_us")):
        type_mask = mag_sample.followup_type == fu_type
        mag_sample.loc[type_mask, out_col] = mag_sample.loc[type_mask, src_col + suffix]

## 1.5 Assign Follow-Up Helper Variables

### 1.5.1 BIRADS 1/2/3

In [None]:
# dx_br_123: for screen BIRADS 0 exams, whether the selected follow-up was assessed
# BIRADS 1/2/3 (codes "N", "B", "P"); "" marks exams that are not BIRADS 0 screens
mag_sample["dx_br_123"] = ""
mag_sample.loc[(mag_sample.scr_br_0 == True), "dx_br_123"] = False
mag_sample.loc[
    (mag_sample.scr_br_0 == True) & (mag_sample.followup_birads.isin(["N", "B", "P"])),
    "dx_br_123",
] = True

### 1.5.2 Pathology

In [None]:
# pathology-severity flags for screen BIRADS 0 exams ("" = not a BIRADS 0 screen):
#   ps_01  -> path severity 0.0 / 1.0 (invasive / noninvasive cancer)
#   ps_234 -> path severity 2.0 - 4.0 (high-risk / borderline / benign lesion)
#   ps_5   -> path severity 5.0 (other cancer)
# a flag is True when either the screen exam's own path severity or the selected
# follow-up's path severity falls in that group
for col_name, path_levels in zip(
    ["ps_01", "ps_234", "ps_5"], [[0.0, 1.0], [2.0, 3.0, 4.0], [5.0]]
):
    mag_sample[col_name] = ""
    mag_sample.loc[(mag_sample.scr_br_0 == True), col_name] = False
    mag_sample.loc[
        (mag_sample.scr_br_0 == True)
        & (
            mag_sample.exam_path_severity.isin(path_levels)
            | mag_sample.followup_path_severity.isin(path_levels)
        ),
        col_name,
    ] = True

# make the flags mutually exclusive: ps_01 clears ps_234/ps_5, and ps_234 clears ps_5
mag_sample.loc[mag_sample.ps_01 == True, ["ps_234", "ps_5"]] = False
mag_sample.loc[mag_sample.ps_234 == True, ["ps_5"]] = False

## 1.6 Identify Interval Cancers

In [None]:
# Interval cancers: screen BIRADS 1/2 exams whose follow-up path severity is 0.0 or 1.0
# (invasive / noninvasive cancer).
# - bilateral screens count regardless of the follow-up biopsy side
# - unilateral screens count only if the follow-up biopsy side equals the exam laterality
#   NOTE(review): a bilateral ("B") follow-up biopsy side never equals a unilateral
#   laterality here -- confirm that exclusion is intended
bilat_interval_mask = (
    (mag_sample.scr_br_12 == True)
    & (mag_sample.exam_laterality == "B")
    & mag_sample.followup_path_severity.isin([0.0, 1.0])
)
unilat_interval_mask = (
    (mag_sample.scr_br_12 == True)
    & (mag_sample.exam_laterality != "B")
    & mag_sample.followup_path_severity.isin([0.0, 1.0])
    & (mag_sample.exam_laterality == mag_sample.followup_bside)
)

# "" = not applicable (not a BIRADS 1/2 screen); False/True for BIRADS 1/2 screens
mag_sample["interval_cancer"] = ""
mag_sample.loc[(mag_sample.scr_br_12 == True), "interval_cancer"] = False
mag_sample.loc[bilat_interval_mask | unilat_interval_mask, "interval_cancer"] = True

## 1.7 Evaluate Long-Term Follow-Up Status

In [None]:
def _passes_all_spans(patient_df, date_i, span_list, date_col):
    """Return True if one patient's exams satisfy every span, starting at ``date_i``.

    ``patient_df`` holds all exams of a single patient with ``date_col`` already
    converted to datetimes. Spans are consecutive: each one starts the day after
    the previous one ends, and both span ends are inclusive.
    """
    for span in span_list:
        span_length = span["length"]  # required; a missing length raises KeyError
        span_qual = span.get("qualifying", "")
        span_disqual = span.get("disqualifying", "")
        # default to 1 required exam if there's a qualifying condition, else 0
        span_n = span.get("required_n", 1 if span_qual else 0)

        # exams inside this span
        date_f = date_i + pd.Timedelta(days=span_length)
        in_span = (patient_df[date_col] >= date_i) & (patient_df[date_col] <= date_f)
        span_df = patient_df[in_span]

        # too few (distinct) qualifying exams -> reject
        qual_df = span_df.query(span_qual) if span_qual else span_df
        if qual_df.acc_anon.nunique() < span_n:
            return False

        if span_disqual:
            # Disqualifying exams are searched among *all* exams in the span, not just
            # the qualifying ones. With a qualifying condition, only exams up to the
            # date of the nearest qualifying exam are checked. The cutoff is applied
            # as a mask so it cannot interact with operator precedence in the query.
            disqual_pool = span_df
            if span_qual and not qual_df.empty:
                cutoff = qual_df[date_col].min()
                disqual_pool = span_df[span_df[date_col] <= cutoff]
            if disqual_pool.query(span_disqual).acc_anon.nunique():
                return False

        # the next span starts the day after this one ends
        date_i = date_f + pd.Timedelta(days=1)

    return True


def verify_condition(df, acc_list, span_list, date_col: str = "study_date_anon"):
    """Check each accession against a sequence of consecutive follow-up time spans.

    For every accession in ``acc_list`` that also appears in ``df``, the spans in
    ``span_list`` are evaluated in order over that patient's exams. The first span
    starts on the accession's (most frequent) study date. Span dicts may contain:

    * ``"length"`` (required): span length in days.
    * ``"qualifying"``: ``DataFrame.query`` string; matching exams count toward
      ``required_n``.
    * ``"disqualifying"``: ``DataFrame.query`` string; any matching exam in the span
      rejects the accession. With a qualifying condition, only exams up to the
      nearest qualifying exam's date are checked.
    * ``"required_n"``: minimum number of distinct qualifying accessions (defaults
      to 1 when a qualifying condition is given, otherwise 0).

    Args:
        df: exam table with ``acc_anon``, ``empi_anon`` and ``date_col``. It is not
            modified; type conversions are applied to a copy.
        acc_list: accessions to evaluate; those absent from ``df`` are skipped.
        span_list: list of span dicts as described above.
        date_col: name of the study-date column.

    Returns:
        dict mapping each evaluated accession to True (all spans passed) or False.

    Raises:
        KeyError: if a span has no ``"length"``.
    """
    acc_ser = pd.to_numeric(df["acc_anon"])
    empi_ser = pd.to_numeric(df["empi_anon"])

    # only evaluate accessions that actually appear in df
    target_accs = list(set(acc_list).intersection(set(acc_ser.unique().tolist())))
    if not target_accs:
        return {}

    # converted copy holding only the patients we need (the caller's df is untouched)
    target_empis = empi_ser[acc_ser.isin(target_accs)].unique()
    work = df.loc[empi_ser.isin(target_empis)].copy()
    work[date_col] = pd.to_datetime(work[date_col])
    work["empi_anon"] = pd.to_numeric(work["empi_anon"])
    work["acc_anon"] = pd.to_numeric(work["acc_anon"])

    # anchor study date and patient (most frequent value) of each target accession,
    # computed in one grouped pass instead of one full-table scan per accession
    anchors = (
        work.loc[work["acc_anon"].isin(target_accs)]
        .groupby("acc_anon")
        .agg(
            date_i=(date_col, lambda s: s.mode()[0]),
            empi=("empi_anon", lambda s: s.mode()[0]),
        )
    )
    patient_groups = {empi: grp for empi, grp in work.groupby("empi_anon")}

    condition_dict = {}
    for target_acc in tqdm(target_accs):
        patient_df = patient_groups[anchors.at[target_acc, "empi"]]
        condition_dict[target_acc] = _passes_all_spans(
            patient_df, anchors.at[target_acc, "date_i"], span_list, date_col
        )

    # return the condition dict after evaluating all exams
    return condition_dict

In [None]:
# accessions whose long-term follow-up is evaluated: screen BIRADS 1/2 exams that
# were not identified as interval cancers
scr_neg_acc_list = (
    mag_sample.loc[
        (mag_sample.scr_br_12 == True) & (mag_sample.interval_cancer == False),
        "acc_anon",
    ]
    .unique()
    .tolist()
)
print("screen negatives accs:", len(scr_neg_acc_list))

# every accession except BIRADS 0 screens flagged ps_01 (cancer pathology)
# NOTE(review): despite the name this also includes interval cancers and benign/other
# outcomes, and it appears unused below -- confirm whether it is still needed
neg_acc_list = (
    mag_sample.loc[
        ~((mag_sample.scr_br_0 == True) & (mag_sample.ps_01 == True)), "acc_anon"
    ]
    .unique()
    .tolist()
)
print("all negatives accs:", len(neg_acc_list))

In [None]:
# pool of all screening + diagnostic exams searched for long-term follow-up
# (pd.concat keeps the original row labels, so the index is not unique)
mag_contra_merge = pd.concat([mag_scr_add, mag_diag_add])

In [None]:
# long-term follow-up requirement for screen negatives: ignore the first 365 days
# after the exam, then require at least one exam of any kind in the next 3 * 365 days
span_list = [
    {  # the first span has no conditions, so any exams here are ignored
        "length": 365,
        "qualifying": "",
        "disqualifying": "",
        "required_n": 0,
    },
    {  # the second span accepts accs with >=1 of *any* exam type
        "length": 365 * 3,
        "qualifying": "",
        "disqualifying": "",
        "required_n": 1,  # with no qualifying condition: >=1 exam of any kind must fall in this period
    },
]

# True/False per evaluated screen-negative accession
any_followup_dict = verify_condition(mag_contra_merge, scr_neg_acc_list, span_list)

# accessions that were not evaluated map to NaN
mag_sample["any_followup"] = mag_sample.acc_anon.map(any_followup_dict)

---

# 2. Data Finalization

## 2.1 Data Enrichment


### 2.1.1 Major Exam Class Labels

In [None]:
# major exam class label; rows matching none of the conditions stay "INVALID".
# case_when gives earlier entries precedence when several match, so the order matters
# (e.g. a BIRADS 0 screen with both ps_01 and dx_br_123 is "Screen Detected Cancer")
mag_sample["label"] = "INVALID"
mag_sample["label"] = mag_sample["label"].case_when([
    (mag_sample.eval("(scr_br_12 == True) & (interval_cancer == False)"), "Screen Negative"), 
    (mag_sample.eval("(scr_br_12 == True) & (interval_cancer == True)"), "Interval Cancer"),
    (mag_sample.eval("(scr_br_0 == True) & (ps_01 == True)"), "Screen Detected Cancer"),
    (mag_sample.eval("(scr_br_0 == True) & (ps_234 == True)"), "Biopsy Proven Benign"),
    (mag_sample.eval("(scr_br_0 == True) & (dx_br_123 == True)"), "Diagnostic Negative"),
    (mag_sample.eval("(scr_br_0 == True) & (ps_5 == True)"), "Other Cancer"),
])  # type: ignore

### 2.1.2 Finding Characteristics
First, we'll derive characteristics for all findings in the sample. Then we'll aggregate these upwards so exam-level characteristics only consider abnormal findings in abnormal exams, and negative/benign findings in negative/benign exams.

In [None]:
# calcification distribution: missing -> "", surrounding whitespace removed
mag_sample["calcdistri"] = mag_sample["calcdistri"].fillna("").str.strip().astype(str)
mag_sample["calcdistri"].value_counts(dropna=False)

In [None]:
# calcification finding type: missing -> "", surrounding whitespace removed
mag_sample["calcfind"] = mag_sample["calcfind"].fillna("").str.strip().astype(str)
mag_sample["calcfind"].value_counts(dropna=False)

In [None]:
# calcification count: missing -> 0.0, stored as float
mag_sample["calcnumber"] = mag_sample["calcnumber"].fillna(0.0).astype(float)
mag_sample["calcnumber"].value_counts(dropna=False)

In [None]:
# derive findings-level characteristics (mass / asymmetry / architectural distortion /
# calcification) for every row with the toolkit's extractor, one output column each
mag_sample[["mass", "asymmetry", "arch_distortion", "calcification"]] = mag_sample.progress_apply(
    embed_params.extract_characteristics, axis="columns", result_type="expand"
)  # type: ignore

# finding characteristics should only be present on benign/abnormal findings,
# so zero them on findings assessed "N"
mag_sample.loc[
    mag_sample.asses == "N", ["mass", "asymmetry", "arch_distortion", "calcification"]
] = 0

In [None]:
# BIRADS 0 ("A") exams: keep only their BIRADS A findings
br0_exams = mag_sample[
    mag_sample.exam_birads.isin(["A"]) & mag_sample.asses.isin(["A"])
]
# BIRADS 1/2 ("N"/"B") exams: keep all of their findings
br12_exams = mag_sample[mag_sample.exam_birads.isin(["N", "B"])]

# aggregate finding characteristics to the exam level: {acc_anon: aggregated result}
br0_exam_chars_dict = br0_exams.groupby("acc_anon").progress_apply(embed_params.aggregate_characteristics).to_dict()  # type: ignore
br12_exam_chars_dict = br12_exams.groupby("acc_anon").progress_apply(embed_params.aggregate_characteristics).to_dict()  # type: ignore

In [None]:
# copy each exam-level characteristic onto every row of its exam; exams in neither
# dict get NaN, and BIRADS 0 values win if an accession appears in both
for char_var in ["exam_mass", "exam_asymmetry", "exam_arch_distortion", "exam_calcification"]:
    br12_char_dict = {k: v[char_var] for k, v in br12_exam_chars_dict.items()}
    br0_char_dict = {k: v[char_var] for k, v in br0_exam_chars_dict.items()}

    mag_sample[char_var] = mag_sample.acc_anon.map({**br12_char_dict, **br0_char_dict})

In [None]:
# exams still missing exam-level characteristics have an invalid BIRADS and are
# dropped later, so their characteristic flags can safely default to False
mag_sample = mag_sample.fillna(
    dict.fromkeys(
        ["exam_mass", "exam_asymmetry", "exam_arch_distortion", "exam_calcification"],
        False,
    )
)

### 2.1.3 Screen Detected Pathology

In [None]:
# pathology category for screen BIRADS 0 exams: uses the screen exam's own path
# severity unless it is missing or the follow-up's path severity is lower (more
# severe). Other exams keep "No Pathology".
# NOTE(review): conditions are computed on the full frame but applied to the BIRADS 0
# subset; this presumably relies on index alignment -- confirm
mag_sample["scr_detected_path"] = "No Pathology"
mag_sample.loc[mag_sample.scr_br_0 == True, "scr_detected_path"] = mag_sample.loc[mag_sample.scr_br_0 == True, "scr_detected_path"].case_when([
    (mag_sample.eval("(exam_path_severity == 0.0) | ((exam_path_severity.isna() | (followup_path_severity < exam_path_severity)) & (followup_path_severity == 0.0))"), "Invasive Cancer"), 
    (mag_sample.eval("(exam_path_severity == 1.0) | ((exam_path_severity.isna() | (followup_path_severity < exam_path_severity)) & (followup_path_severity == 1.0))"), "Noninvasive Cancer"),
    (mag_sample.eval("(exam_path_severity == 2.0) | ((exam_path_severity.isna() | (followup_path_severity < exam_path_severity)) & (followup_path_severity == 2.0))"), "High Risk Lesion"),
    (mag_sample.eval("(exam_path_severity == 3.0) | ((exam_path_severity.isna() | (followup_path_severity < exam_path_severity)) & (followup_path_severity == 3.0))"), "Borderline Lesion"),
    (mag_sample.eval("(exam_path_severity == 4.0) | ((exam_path_severity.isna() | (followup_path_severity < exam_path_severity)) & (followup_path_severity == 4.0))"), "Benign Lesion"),
    (mag_sample.eval("(exam_path_severity == 5.0) | ((exam_path_severity.isna() | (followup_path_severity < exam_path_severity)) & (followup_path_severity == 5.0))"), "Other Cancer"),
]) # type: ignore

## 2.2 Data Cleaning

### 2.2.1 Patient Race

In [None]:
# standardize whitespace (strip ends, collapse internal runs to a single space)
mag_sample['ETHNICITY_DESC'] = mag_sample['ETHNICITY_DESC'].str.strip()
mag_sample['ETHNICITY_DESC'] = mag_sample['ETHNICITY_DESC'].str.replace(r'\s+', ' ', regex=True)

# map raw race descriptions to reporting groups; unlisted values become "Other",
# missing or explicitly unknown values become "Unknown"
mag_sample["race"] = "Other"
mag_sample["race"] = mag_sample["race"].case_when([
    (mag_sample.eval("ETHNICITY_DESC == 'African American or Black'"), "Black"),
    (mag_sample.eval("ETHNICITY_DESC == 'Caucasian or White'"), "White"),
    (mag_sample.eval("ETHNICITY_DESC == 'Asian'"), "Asian"),
    (mag_sample.eval("ETHNICITY_DESC == 'Unknown, Unavailable or Unreported'"), "Unknown"),
    (mag_sample.eval("ETHNICITY_DESC.isna()"), "Unknown")
]) # type: ignore


### 2.2.2 Patient Ethnicity

In [None]:
# standardize whitespace (strip ends, collapse internal runs to a single space)
mag_sample["ETHNIC_GROUP_DESC"] = mag_sample["ETHNIC_GROUP_DESC"].str.strip()
mag_sample["ETHNIC_GROUP_DESC"] = mag_sample["ETHNIC_GROUP_DESC"].str.replace(r"\s+", " ", regex=True)

# raw descriptions counted as Hispanic / not Hispanic; everything else is "Unknown"
hispanic_list = ["Hispanic or Latino", "Unknown~Hispanic"]
not_hispanic_list = ["Non-Hispanic or Latino", "Non-Hispanic~Unknown", "Unknown~Non-Hispanic"]

mag_sample["ethnicity"] = "Unknown"
mag_sample["ethnicity"] = mag_sample["ethnicity"].case_when([
    (mag_sample.eval("ETHNIC_GROUP_DESC.isin(@hispanic_list)"), "Hispanic or Latino"),
    (mag_sample.eval("ETHNIC_GROUP_DESC.isin(@not_hispanic_list)"), "Not Hispanic or Latino"),
]) # type: ignore

### 2.2.3 Patient Age

In [None]:
# bin age into reporting groups. right=False makes the bins half-open [lo, hi), so an
# age of exactly 50 falls in "50-75" and exactly 75 in ">=75", matching the labels
# (pd.cut's default right-closed bins would put 50 in "<50" and 75 in "50-75").
# The open upper edge keeps every age >= 75 in the last bin.
mag_sample["age_binned"] = pd.cut(
    mag_sample["age_at_study"],
    bins=[0, 50, 75, np.inf],
    labels=["<50", "50-75", ">=75"],
    right=False,
)
mag_sample.drop_duplicates("acc_anon")["age_binned"].value_counts(dropna=False)

## 2.3 Finalize and Output Data

In [None]:
# ensure tissueden is numeric (unparseable values become NaN)
mag_sample["tissueden"] = pd.to_numeric(mag_sample["tissueden"], errors="coerce")

# screening exams with any finding assessed with a diagnostic-only BIRADS code are
# invalid at the screening stage; collect their accessions
dx_only_asses = ["P", "S", "M", "K"]
invalid_exam_list = (
    mag_sample.loc[
        mag_sample.desc.str.contains("screen", case=False)
        & mag_sample.asses.isin(dx_only_asses),
        "acc_anon",
    ]
    .unique()
    .tolist()
)

In [None]:
# drop rows where: tissueden is 5.0, the exam has an invalid screening-stage BIRADS,
# there is no valid major label, other cancers are present, or a screen negative has
# no long-term follow-up (any_followup == False; NaN, i.e. not evaluated, is kept)
final_df = mag_sample[
    ~(
        (mag_sample.tissueden == 5.0)
        | (mag_sample.acc_anon.isin(invalid_exam_list))
        | (mag_sample.label == "INVALID")
        | (mag_sample.label == "Other Cancer")
        | (mag_sample.scr_detected_path == "Other Cancer")
        | (mag_sample.any_followup == False)
    )
]

In [None]:
# write the labeled sample (all remaining rows, without the index)
OUTPUT_PATH: str = "../../data/6.2_labeled_mag_sample.csv"

final_df.to_csv(OUTPUT_PATH, index=False)
print(f"data saved to: '{OUTPUT_PATH}'")

In [None]:
# renumber rows 0..n-1 after filtering
final_df = final_df.reset_index(drop=True)

---

# 3. Output Exam- and Finding-Level Data

In [None]:
# set to False to run the section without writing any output files
SAVE_FILES = True

In [None]:
# format dtypes so join keys are numeric and BIRADS codes compare as strings
final_df["empi_anon"] = pd.to_numeric(final_df["empi_anon"])
final_df["acc_anon"] = pd.to_numeric(final_df["acc_anon"])
final_df["exam_birads"] = final_df["exam_birads"].astype(str)
final_df["asses"] = final_df["asses"].astype(str)
score_df["acc_anon"] = pd.to_numeric(score_df["acc_anon"])

# merge exam scores onto the sample data (inner join: rows without a score are dropped)
output_df = final_df.merge(score_df, how="inner", on="acc_anon")
output_df.embed.summarize()

## 3.1 Prepare Finding-Level Data

In [None]:
# finding-level output: file label and the function used to count unique findings
FILE_LEVEL = "finding"
level_unique_func = embed_params.count_findings

In [None]:
df = output_df

# validate whether dataframe has duplicates: the row count should equal the number
# of unique units at this level
len_df = len(df)
level_unique = level_unique_func(df)
rows_duplicated = level_unique != len_df

print(f"unique {FILE_LEVEL}s:  {level_unique}")
print(f"dataframe rows:  {len_df}")
print(f"duplicate {FILE_LEVEL} rows?:  {rows_duplicated}")

In [None]:
# we have some findings that were added by our addendums (but have no registered finding num), since these are so few we know each of these
# is not bilateral, so we'll just manually increment their finding number by the exam max + 1
# (an exam whose findings are all unnumbered starts at 1 instead of staying NaN)
# NOTE(review): only the first unnumbered row of each exam is numbered; any further
# unnumbered rows in the same exam keep NaN -- confirm that is intended
for acc, group in output_df[pd.isna(output_df.numfind)].groupby("acc_anon"):
    max_numfind = output_df.loc[output_df.acc_anon == acc, "numfind"].max()
    if pd.isna(max_numfind):
        max_numfind = 0
    output_df.loc[group.index[0], "numfind"] = max_numfind + 1

In [None]:
# finding-level copy of the scored sample
finding_df = output_df.copy()

drop_cols = ["acc_lat"]
finding_df = finding_df.drop(columns=drop_cols)

# unique finding key: "<accession>_<finding number>"
finding_df["acc_find"] = (
    finding_df["acc_anon"].astype(str) + "_" + finding_df["numfind"].astype(str)
)

# patient-level columns, excluding raw demographic descriptions and cohort_num
patient_feature_list = embed_params.list_columns(["patient"])
patient_feature_list = [
    s
    for s in patient_feature_list
    if s not in ["GENDER_DESC", "ETHNICITY_DESC", "ETHNIC_GROUP_DESC", "cohort_num"]
]

# exam-level columns, excluding a few administrative fields
exam_feature_list = embed_params.list_columns(["exam"])
exam_feature_list = [
    s
    for s in exam_feature_list
    if s not in ["tech_init", "init", "proccode", "case", "sdate_anon"]
]

findings_feature_list = embed_params.list_columns(["finding"])

# columns derived in this notebook
additional_feature_list = [
    "mass",
    "asymmetry",
    "arch_distortion",
    "calcification",
    "age_binned",
    "race",
    "ethnicity",
    "score",
    "label",
    "exam_laterality",
    "exam_birads",
    "scr_detected_path",
    "followup_type",
    "followup_birads",
    "followup_path_severity",
    "exam_mass",
    "exam_asymmetry",
    "exam_arch_distortion",
    "exam_calcification",
]
total_feature_list = [
    *patient_feature_list,
    *exam_feature_list,
    *findings_feature_list,
    *additional_feature_list,
]

# raw pathology columns and the matching find_path* column names
path_cols = ["path1", "path2", "path3", "path4", "path5"]
find_path_cols = [f"find_{s}" for s in path_cols]

# drop rows that are exact duplicates across the output features and pathology columns
finding_df = finding_df.drop_duplicates(subset=total_feature_list + path_cols)

In [None]:
df = finding_df

# validate whether dataframe has duplicates: the row count should equal the number
# of unique findings
len_df = len(df)
level_unique = level_unique_func(df)
rows_duplicated = level_unique != len_df

print(f"unique {FILE_LEVEL}s:  {level_unique}")
print(f"dataframe rows:  {len_df}")
print(f"duplicate {FILE_LEVEL} rows?:  {rows_duplicated}")

In [None]:
# unique finding key: "<accession>_<finding number>"
finding_df["acc_find"] = (
    finding_df["acc_anon"].astype(str) + "_" + finding_df["numfind"].astype(str)
)

# collect up to 5 distinct pathology results per finding, in order of first appearance
acc_find_path_dict = dict()

for group_acc_find, group_df in tqdm(
    finding_df.groupby("acc_find"), total=level_unique
):
    # findings without any pathology keep empty find_path* columns
    if group_df.path_severity.isna().all():
        continue

    acc_find_path_list = []
    for col in path_cols:
        acc_find_path_list.extend(group_df[col].dropna().unique().tolist())

    # dict.fromkeys dedupes while preserving order; a set's iteration order depends on
    # string hashing, which made column order (and what survives [:5]) vary between runs
    acc_find_path_dict[group_acc_find] = list(dict.fromkeys(acc_find_path_list))[:5]

# spread each finding's pathology list across find_path1..find_path5 ("" when absent),
# one vectorized map per column instead of a full-frame mask per finding
for i, col in enumerate(find_path_cols):
    col_dict = {k: v[i] for k, v in acc_find_path_dict.items() if i < len(v)}
    finding_df[col] = finding_df.acc_find.map(col_dict).fillna("")

finding_df = finding_df.drop_duplicates(subset=total_feature_list + find_path_cols)

In [None]:
# accessions of BIRADS 0 ("A") exams, and those among them with >=1 finding assessed A
br0_acc_set = set(finding_df[finding_df.exam_birads == "A"].acc_anon.unique())
br0_find_acc_set = set(
    finding_df[
        (finding_df.exam_birads == "A") & (finding_df.asses == "A")
    ].acc_anon.unique()
)

# BIRADS 0 exams that have no BIRADS A finding at all
nofind_acc_list = list(br0_acc_set.difference(br0_find_acc_set))

In [None]:
# drop non-BIRADS A findings in exams with any BIRADS A findings
# (BIRADS 0 exams without any BIRADS A finding keep all of their findings)
finding_df = finding_df[
    finding_df.acc_anon.isin(nofind_acc_list)
    | ~((finding_df.exam_birads == "A") & (finding_df.asses != "A"))
]

In [None]:
def get_exam_finding_sides(group: pd.DataFrame) -> str:
    """Concatenate the ``side`` values of one finding's rows in row order (e.g. "LR").

    Assumes ``side`` holds strings with no missing values -- TODO confirm
    (``str.join`` raises on non-string entries).
    """
    sides = "".join(group.side.tolist())
    return sides


# per-finding concatenated sides, used to spot findings recorded on several sides/rows
acc_find_sides_dict = (
    finding_df.groupby("acc_find").progress_apply(get_exam_finding_sides).to_dict()  # type: ignore
)
finding_df["acc_find_sides"] = finding_df.acc_find.map(acc_find_sides_dict)
finding_df["acc_find_sides"].value_counts(dropna=False)

In [None]:
# inspection aid: display every finding recorded twice on the right side ("RR")
for acc_find, group in finding_df[finding_df["acc_find_sides"] == "RR"].groupby("acc_find"):
    display(acc_find)
    display(group)

In [None]:
# a finding recorded on both the left and the right side is bilateral: relabel it "B"
# whatever the row order ("LR", "RL", ...) and collapse the now-duplicate rows
has_left_side = finding_df.acc_find_sides.str.contains("L", na=False)
has_right_side = finding_df.acc_find_sides.str.contains("R", na=False)
finding_df.loc[has_left_side & has_right_side, "side"] = "B"
finding_df = finding_df.drop_duplicates(subset=["acc_find", "side", "path_severity"])

finding_df["acc_find_sides"].value_counts(dropna=False)

In [None]:
df = finding_df

# final duplicate check: the row count should equal the number of unique findings
# (rows_duplicated gates the file save below)
len_df = len(df)
level_unique = level_unique_func(df)
rows_duplicated = level_unique != len_df

print(f"unique {FILE_LEVEL}s:  {level_unique}")
print(f"dataframe rows:  {len_df}")
print(f"duplicate {FILE_LEVEL} rows?:  {rows_duplicated}")

In [None]:
# save the file only if saving is enabled and the duplicate check passed
# (logical `and`, not bitwise `&`, so non-bool truthy flags behave correctly)
if SAVE_FILES and rows_duplicated:
    print("File saving enabled, but duplicates detected -- skipping")
elif SAVE_FILES:
    save_path = f"../../data/6.2_{FILE_LEVEL}_level.csv"
    df.to_csv(save_path, index=False)
    print(f"File saving enabled, {FILE_LEVEL.title()}-level data saved to: {save_path}")
else:
    print("File saving disabled -- skipping")

## 3.2 Prepare Exam-Level Data

In [None]:
# exam-level output: file label and the function used to count unique exams
FILE_LEVEL = "exam"
level_unique_func = embed_params.count_exams

In [None]:
df = output_df

# validate whether dataframe has duplicates: before reduction there are typically
# several rows (findings) per exam
len_df = len(df)
level_unique = level_unique_func(df)
rows_duplicated = level_unique != len_df

print(f"unique {FILE_LEVEL}s:  {level_unique}")
print(f"dataframe rows:  {len_df}")
print(f"duplicate {FILE_LEVEL} rows?:  {rows_duplicated}")

In [None]:
# define a list of patient/exam-level features
patient_feature_list = embed_params.list_columns(["patient"])
patient_feature_list = [
    s
    for s in patient_feature_list
    if s not in ["GENDER_DESC", "ETHNICITY_DESC", "ETHNIC_GROUP_DESC", "cohort_num"]
]

exam_feature_list = embed_params.list_columns(["exam"])
exam_feature_list = [
    s
    for s in exam_feature_list
    if s not in ["tech_init", "init", "proccode", "case", "sdate_anon"]
]

# exam-level columns derived in this notebook (no finding-level characteristics)
additional_feature_list = [
    "age_binned",
    "race",
    "ethnicity",
    "score",
    "label",
    "exam_laterality",
    "exam_birads",
    "scr_detected_path",
    "followup_type",
    "followup_birads",
    "followup_path_severity",
    "exam_mass",
    "exam_asymmetry",
    "exam_arch_distortion",
    "exam_calcification",
]
total_feature_list = [
    *patient_feature_list,
    *exam_feature_list,
    *additional_feature_list,
]

# zip prefixes as floats (unparseable -> NaN) so equal values deduplicate consistently
output_df["first_3_zip"] = pd.to_numeric(
    output_df["first_3_zip"], errors="coerce"
).astype(float)
exam_df = output_df[total_feature_list].drop_duplicates()

# drop any remaining duplicates, keeping the alphabetically-first exam_birads code per
# exam; a stable sort makes the kept row deterministic among ties
# NOTE(review): confirm alphabetical order is the intended BIRADS priority
exam_df = exam_df.sort_values("exam_birads", kind="stable").drop_duplicates(
    "acc_anon", keep="first"
)

In [None]:
df = exam_df

# final duplicate check: the row count should equal the number of unique exams
# (rows_duplicated gates the file save below)
len_df = len(df)
level_unique = level_unique_func(df)
rows_duplicated = level_unique != len_df

print(f"unique {FILE_LEVEL}s:  {level_unique}")
print(f"dataframe rows:  {len_df}")
print(f"duplicate {FILE_LEVEL} rows?:  {rows_duplicated}")

In [None]:
# save the file only if saving is enabled and the duplicate check passed
# (logical `and`, not bitwise `&`, so non-bool truthy flags behave correctly)
if SAVE_FILES and rows_duplicated:
    print("File saving enabled, but duplicates detected -- skipping")
elif SAVE_FILES:
    save_path = f"../../data/6.2_{FILE_LEVEL}_level.csv"
    df.to_csv(save_path, index=False)
    print(f"File saving enabled, {FILE_LEVEL.title()}-level data saved to: {save_path}")
else:
    print("File saving disabled -- skipping")