# Magview Screening Label Assignment Workflow 
## *[Public Release]*
### Beatrice Brown-Mulry
### 02/27/2025

<style>
    h4, h5 {
        margin: 0;
        padding: 0.5rem;
        font-weight: normal;
    }
</style>

<a id='contents'></a>

## Contents
- #### [0. Data Preparation](#section-0)
    - ##### [0.1 Prepare Magview](#section-0-1)
    - ##### [0.2 Correct Contralaterals](#section-0-2)
    - ##### [0.3 Derive Exam Laterality](#section-0-3)
    - ##### [0.4 Derive Exam-Level BIRADS and Path.](#section-0-4)
    - ##### [0.5 Aggregate Exam Biopsy Sides](#section-0-5)
    - ##### [0.6 Handle Addended v1 Exam Data](#section-0-6)
    - ##### [0.7 Assign Screen BIRADS Helper Variables](#section-0-7)
    - ##### [0.8 Prepare Target Sample Subset](#section-0-8)
- #### [1. Handle Follow-Up Exam Data](#section-1)
    - ##### [1.1 Define Diagnostic/Ultrasound Dataframes](#section-1-1)
    - ##### [1.2 Get Follow-Up Mappings](#section-1-2)
    - ##### [1.3 Perform Follow-Up Mapping](#section-1-3)
    - ##### [1.4 Summarize Follow-Ups](#section-1-4)
    - ##### [1.5 Assign Follow-Up Helper Variables](#section-1-5)
    - ##### [1.6 Identify Interval Cancers](#section-1-6)
    - ##### [1.7 Evaluate Long-Term Follow-Up Status](#section-1-7)
- #### [2. Data Finalization](#section-2)
    - ##### [2.1 Data Enrichment](#section-2-1)
    - ##### [2.2 Data Cleaning](#section-2-2)
    - ##### [2.3 Finalize and Output Data](#section-2-3)


In [1]:
import pandas as pd
import numpy as np
from tqdm import tqdm
import os
import matplotlib.pyplot as plt
import embed_toolkit # version 0.2.*
from dotenv import load_dotenv # this is only used in this example to load filepaths from a .env file!
from typing import Optional, Union

pd.set_option('display.max_rows', 50)
pd.set_option('display.max_columns', 500)
pd.options.mode.chained_assignment = None

tqdm.pandas() # initialize tqdm wrapper for pandas.apply

In [2]:
# in this example I'll be loading my filepaths from a .env file with the dotenv package
# but these should be changed as needed
load_dotenv()
MAGVIEW_PATH: str = os.environ['MAGVIEW_PATH']
SCORE_PATH: str = os.environ['SCORE_PATH']
OUTPUT_PATH: Optional[str] = os.environ.get('OUTPUT_PATH', None)

---

<a href='#contents'>
    <button style='margin-top: 1rem; padding: 0.5rem; cursor: pointer;'>Back to Top</button>
</a>

<a id='section-0'></a>
# 0. Data Preparation

<a id='section-0-1'></a>
## 0.1 Prepare Magview

In [3]:
# load dataframe
mag_df = pd.read_csv(MAGVIEW_PATH)

# ensure key columns have the correct data types
mag_df['empi_anon'] = pd.to_numeric(mag_df['empi_anon'])
mag_df['acc_anon'] = pd.to_numeric(mag_df['acc_anon'])
mag_df['study_date_anon'] = pd.to_datetime(mag_df['study_date_anon'])

# create a helper column for exam screen-status
mag_df['screen_exam'] = mag_df.desc.str.contains('screen', case=False)

# summarize dataframe contents
mag_df.embed.summarize("Magview")




        Magview         
┌───────────┬──────────┐
│ Feature   │ Count    │
├───────────┼──────────┤
│ Patients  │ 116,597  │
│ Exams     │ 678,655  │
│ Findings  │ 748,803  │
└───────────┴──────────┘


<a href='#contents'>
    <button style='margin-top: 1rem; padding: 0.5rem; cursor: pointer;'>Back to Top</button>
</a>

<a id='section-0-2'></a>
## 0.2 Correct Contralaterals
We need to run the contralateral correction function on our Magview dataframe to create entries for the negative contralateral findings that are implied when a bilateral exam only has findings recorded on one side.
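
As a rough illustration of what this correction means (this is not the `embed_toolkit` implementation), the sketch below adds a negative row for the opposite breast whenever a bilateral exam only has findings on one side. The `side` column, the toy values, and the helper name `add_implied_contralaterals` are assumptions made for this example.

```python
import pandas as pd

# Toy findings table. `side` is a hypothetical finding-side column used only in this sketch.
toy_df = pd.DataFrame({
    "acc_anon": [1, 2, 2, 3],
    "desc": ["MG Screening Bilateral", "MG Screening Bilateral",
             "MG Screening Bilateral", "MG Diagnostic Left"],
    "side": ["L", "L", "R", "L"],
    "asses": ["A", "B", "N", "S"],
})

def add_implied_contralaterals(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: add a negative ('N') finding for the missing side of bilateral exams."""
    is_bilat = df["desc"].str.contains("bilat", case=False)
    new_rows = []
    for _, group in df[is_bilat].groupby("acc_anon"):
        sides = set(group["side"].dropna())
        # only exams whose findings all sit on a single side need an implied row
        if sides in ({"L"}, {"R"}):
            implied = group.iloc[0].to_dict()
            implied["side"] = "R" if sides == {"L"} else "L"
            implied["asses"] = "N"
            new_rows.append(implied)
    if not new_rows:
        return df.copy()
    return pd.concat([df, pd.DataFrame(new_rows)], ignore_index=True)

corrected = add_implied_contralaterals(toy_df)
```

In this toy table only exam 1 gains a row: exam 2 already has findings on both sides, and exam 3 is not a bilateral exam.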

In [4]:
# apply contralateral correction, drop any data for exams with no description
mag_df = mag_df.dropna(subset="desc")
mag_contra_df = embed_toolkit.correct_contralaterals(mag_df)

# correct column dtypes
mag_contra_df['study_date_anon'] = pd.to_datetime(mag_contra_df['study_date_anon'])
mag_contra_df['acc_anon'] = pd.to_numeric(mag_contra_df['acc_anon'])
mag_contra_df['empi_anon'] = pd.to_numeric(mag_contra_df['empi_anon'])

  0%|          | 0/127098 [00:00<?, ?it/s]



<a href='#contents'>
    <button style='margin-top: 1rem; padding: 0.5rem; cursor: pointer;'>Back to Top</button>
</a>

<a id='section-0-3'></a>
## 0.3 Derive Exam Laterality

In [5]:
def get_exam_laterality(row: pd.Series) -> str | None:
    # extract description and lowercase it
    finding_desc = row.desc.lower()
    
    if ("bilat" in finding_desc):
        return "B"
    elif ("left" in finding_desc):
        return "L"
    elif ("right" in finding_desc):
        return "R"
    else:
        return None

# derive exam laterality from their descriptions
mag_contra_df["exam_laterality"] = mag_contra_df.progress_apply(get_exam_laterality, axis=1) # type: ignore

100%|██████████| 893670/893670 [00:07<00:00, 114511.44it/s]
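
The row-wise `progress_apply` above is easy to read, but the same rule can be expressed with vectorized string operations. The sketch below is one possible alternative, assuming the same precedence (bilateral, then left, then right); the function name `get_exam_laterality_vectorized` is hypothetical.

```python
import numpy as np
import pandas as pd

def get_exam_laterality_vectorized(desc: pd.Series) -> pd.Series:
    """Hypothetical vectorized variant: map exam descriptions to 'B', 'L', 'R', or NaN."""
    lowered = desc.fillna("").str.lower()
    conditions = [
        lowered.str.contains("bilat", regex=False),
        lowered.str.contains("left", regex=False),
        lowered.str.contains("right", regex=False),
    ]
    # np.select keeps the first matching condition, mirroring the if/elif order
    values = np.select(conditions, ["B", "L", "R"], default="")
    result = pd.Series(values, index=desc.index, dtype=object)
    # descriptions that match nothing become NaN
    return result.mask(result == "")

example_desc = pd.Series([
    "MG Screening Bilateral", "MG Diagnostic Left", "US BREAST RIGHT", "MG Unknown",
])
example_laterality = get_exam_laterality_vectorized(example_desc)
```

Unlike the row-wise version, this sketch treats a missing description as "no laterality" instead of raising, which only matters if rows with a null `desc` have not been dropped beforehand.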


<a href='#contents'>
    <button style='margin-top: 1rem; padding: 0.5rem; cursor: pointer;'>Back to Top</button>
</a>

<a id='section-0-4'></a>
## 0.4 Derive Exam-Level BIRADS and Path.
Get the most severe BIRADS and path_severity associated with each exam to use as its representative.

In [6]:
# for each exam, take the most severe birads and path severity as the representative
# (lower path_severity values are more severe, so the minimum is the worst)
def get_worst_ps(group):
    return group.path_severity.min()

def get_worst_br(group):
    exam_desc = group.desc.tolist()[0]
    if "screen" in exam_desc.lower():
        br_to_val_dict = {
            'A': 0, # 'A' maps to birads 0
            'B': 1, # 'B' maps to birads 2
            'N': 2  # 'N' maps to birads 1
        }
    else:
        br_to_val_dict = {
            'N': 5, # 'N' maps to birads 1
            'B': 4, # 'B' maps to birads 2
            'P': 3, # 'P' maps to birads 3
            'S': 2, # 'S' maps to birads 4
            'M': 1, # 'M' maps to birads 5
            'K': 0  # 'K' maps to birads 6
        }
        
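    val_to_br_dict = {v:k for k,v in br_to_val_dict.items()}
    # Series.min() skips NaN (missing or unmapped assessments); the builtin min() gives
    # order-dependent results when NaN is present in the list
    worst_br_val = group.asses.map(br_to_val_dict).min()
    return val_to_br_dict.get(worst_br_val, '')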


In [7]:
# apply the 'get_worst_br' function to the data (grouped by exam) and output [exam > birads] mappings as a dict
worst_br_dict = mag_contra_df.groupby('acc_anon').progress_apply(get_worst_br).to_dict() # type: ignore

# map back to magview
mag_contra_df['exam_birads'] = ''
mag_contra_df['exam_birads'] = mag_contra_df['acc_anon'].map(worst_br_dict)

100%|██████████| 678651/678651 [01:48<00:00, 6258.69it/s]
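
To make the ranking idea concrete, here is a small sketch on toy data using only the screening codes. It assigns each assessment a rank where lower means more severe, takes the minimum rank per exam, and maps it back to a code. The toy accession numbers are invented for the example.

```python
import pandas as pd

# Screening ranks as in get_worst_br: lower rank = more severe
SCREEN_RANK = {"A": 0, "B": 1, "N": 2}

toy_findings = pd.DataFrame({
    "acc_anon": [10, 10, 11, 11, 12],
    "asses": ["N", "A", "N", "B", "N"],
})

# rank every finding, then keep the most severe (minimum) rank per exam
finding_rank = toy_findings["asses"].map(SCREEN_RANK)
worst_rank = finding_rank.groupby(toy_findings["acc_anon"]).min()

# invert the mapping to recover the assessment code
rank_to_code = {rank: code for code, rank in SCREEN_RANK.items()}
toy_exam_birads = worst_rank.map(rank_to_code).to_dict()
```

Exam 10 resolves to 'A' because a single abnormal finding outranks its negative finding, while exam 11 resolves to 'B' and exam 12 stays 'N'.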


In [8]:
# apply the 'get_worst_ps' function to the data (grouped by exam) and output [exam > pathology] mappings as a dict
# don't apply it to exam findings with no path severity (since they can't affect results)
worst_path_dict = mag_contra_df[~pd.isnull(mag_contra_df.path_severity)].groupby('acc_anon').progress_apply(get_worst_ps).to_dict() # type: ignore

# map back to magview
mag_contra_df['exam_path_severity'] = np.nan
mag_contra_df['exam_path_severity'] = mag_contra_df['acc_anon'].map(worst_path_dict)

100%|██████████| 31758/31758 [00:01<00:00, 28708.74it/s]


<a href='#contents'>
    <button style='margin-top: 1rem; padding: 0.5rem; cursor: pointer;'>Back to Top</button>
</a>

<a id='section-0-5'></a>
## 0.5 Aggregate Exam Biopsy Sides

In [9]:
def aggregate_bsides(group):
    # applied to exam groups
    bside_list = group.bside.unique().tolist()
    # return the only bside if there is exactly 1 unique value
    # (the list should never be empty since unique() keeps NaN; an empty list would fall through to 'ERROR')
    if len(bside_list) == 1:
        return bside_list[0]

    # otherwise aggregate bilateral bsides
    elif ('B' in bside_list) or (('L' in bside_list) and ('R' in bside_list)):
        return 'B'
    # handle left bsides with no right or 'B' (other is a NaN)
    elif ('L' in bside_list):
        return 'L'
    # handle right bsides with no left or 'B' (other is a NaN)
    elif ('R' in bside_list):
        return 'R'
    else:
        return 'ERROR'

def get_bside_aggregation_dict(df: pd.DataFrame) -> dict[float, str]:
    # we only need to apply this to exam findings with no exam-level pathology registered
    path_na_mask: pd.Series[bool] = pd.isna(df.exam_path_severity)

    # or exam findings where the finding-level path severity matches the exam-level path severity
    path_match_mask: pd.Series[bool] = (
        ~pd.isna(df.exam_path_severity)
        & (df.path_severity == df.exam_path_severity)
    )

    # define a list of columns to consider
    col_list: list[str] = ['acc_anon', 'empi_anon', 'study_date_anon', 'exam_birads', 'exam_path_severity', 'bside']

    # get the relevant subset of the data (rows matching either mask, restricted to col_list)
    df_subset: pd.DataFrame = df.loc[path_na_mask | path_match_mask, col_list]

    # drop any duplicate rows, group by exam, then apply the agg func and output a [exam > bside] mapping dict
    bside_agg_dict: dict[float, str] = (
        df_subset
        .drop_duplicates()
        .groupby('acc_anon')
        .progress_apply(aggregate_bsides) # type: ignore
        .to_dict()
    )
    return bside_agg_dict

# apply the agg function and get a dict of exam mappings
bside_agg_dict: dict[float, str] = get_bside_aggregation_dict(mag_contra_df)

# map the agg dict back to the dataframe
mag_contra_df['exam_bside'] = mag_contra_df['acc_anon'].map(bside_agg_dict)
mag_contra_df['exam_bside'].value_counts(dropna=False)

100%|██████████| 678651/678651 [00:19<00:00, 35272.66it/s]


exam_bside
NaN    826027
L       33193
R       32518
B        1932
Name: count, dtype: int64

<a href='#contents'>
    <button style='margin-top: 1rem; padding: 0.5rem; cursor: pointer;'>Back to Top</button>
</a>

<a id='section-0-6'></a>
## 0.6 Handle Addended v1 Exam Data
In some cases in EMBED v1, exams that were addended at some point in time have incomplete information in our data. This is one example where that's observable: after manually reviewing all of our completely-negative exams with a pathology result linked *directly* to a negative finding in an exam, we found they had all been addended to BIRADS A at a later date. We'll manually correct these to BIRADS A exams for now so they get handled correctly by the rest of the pipeline.

In [10]:
# correct exam birads to 'A' for any 'negative' exams with a pathology assigned directly to the screen findings
# a review of the associated rad. notes indicated these were all addended to 'A's
addended_exam_list = mag_contra_df[
    (mag_contra_df.screen_exam == True) 
    & mag_contra_df.exam_birads.isin(['N', 'B']) 
    & ~pd.isna(mag_contra_df.exam_path_severity)
].acc_anon.unique().tolist()

mag_contra_df.loc[mag_contra_df.acc_anon.isin(addended_exam_list), 'exam_birads'] = 'A'

<a href='#contents'>
    <button style='margin-top: 1rem; padding: 0.5rem; cursor: pointer;'>Back to Top</button>
</a>

<a id='section-0-7'></a>
## 0.7 Assign Screen BIRADS Helper Variables
These are shorthand flags for exams with a screening `exam_birads` of 'A' (abnormal) or 'N'/'B' (negative/benign).

In [11]:
scr_br_0_list = mag_contra_df[
    (mag_contra_df.screen_exam == True)
    & (mag_contra_df.exam_birads.isin(['A']))
].acc_anon.unique().tolist()

scr_br_12_list = mag_contra_df[
    (mag_contra_df.screen_exam == True)
    & (mag_contra_df.exam_birads.isin(['N', 'B']))
].acc_anon.unique().tolist()

mag_contra_df['scr_br_0'] = False
mag_contra_df.loc[mag_contra_df.acc_anon.isin(scr_br_0_list), 'scr_br_0'] = True

mag_contra_df['scr_br_12'] = False
mag_contra_df.loc[mag_contra_df.acc_anon.isin(scr_br_12_list), 'scr_br_12'] = True


<a href='#contents'>
    <button style='margin-top: 1rem; padding: 0.5rem; cursor: pointer;'>Back to Top</button>
</a>

<a id='section-0-8'></a>
## 0.8 Prepare Target Sample Subset

In this workflow, we'll identify a subset of our data which will be the *actual* target for label assignment. Splitting our data in two like this allows us to consider information from *all* exams when processing a subset (for example, we might only want to work with screening images, but we still want to consider all events that occurred for a patient).

Here, we'll define a subset by merging in a dataframe with mappings between the exam ID column, `acc_anon`, and predicted malignancy scores from a model.

In [12]:
def prepare_score_data(data: pd.DataFrame) -> pd.DataFrame:
    # rename columns to match the Magview naming and keep only the columns we need
    rename_dict: dict[str, str] = {
        'accession number': 'acc_anon',
        'Cohort': 'cohort'
    }

    data.rename(columns=rename_dict, inplace=True)
    return data[['acc_anon', 'cohort', 'score']]


# load our score dataframe and prepare it
score_df: pd.DataFrame = pd.read_csv(SCORE_PATH)
score_df = prepare_score_data(score_df)

# ensure key columns have the correct data types
score_df['acc_anon'] = pd.to_numeric(score_df['acc_anon'])

In [13]:
# get the list of accessions used in the study then get the subset of magview corresponding to it
lunit_acc_list = score_df.acc_anon.tolist()
mag_sample_df = mag_contra_df[mag_contra_df.acc_anon.isin(lunit_acc_list)]

# we only want to consider screening exams in our target subset, so we'll drop any diagnostic cases present
mag_sample_df = mag_sample_df[mag_sample_df.desc.str.contains('screen', case=False)]

---

<a href='#contents'>
    <button style='margin-top: 1rem; padding: 0.5rem; cursor: pointer;'>Back to Top</button>
</a>

<a id='section-1'></a>
# 1. Handle Follow-Up Exam Data

<a id='section-1-1'></a>
## 1.1 Define Diagnostic/Ultrasound Dataframes

In [14]:
followup_cols = ['acc_anon', 'empi_anon', 'study_date_anon', 'exam_laterality', 'exam_birads', 'exam_path_severity', 'exam_bside']

In [15]:
# get subset of magview corresponding to diagnostic exams
mag_diag = mag_contra_df.loc[mag_contra_df.desc.str.contains('diag', case=False)]
mag_diag = mag_diag[followup_cols].drop_duplicates()

# ensure we have exactly 1 row for each exam
print('any duplicate exam rows?', mag_diag.acc_anon.nunique() != len(mag_diag))

any duplicate exam rows? False


In [16]:
# get subset of magview corresponding to ultrasound exams
mag_us = mag_contra_df.loc[mag_contra_df.desc.str.contains('US')]
mag_us = mag_us.dropna(subset="asses")
mag_us = mag_us[followup_cols].drop_duplicates()

# ensure we have exactly 1 row for each exam
print('any duplicate exam rows?', mag_us.acc_anon.nunique() != len(mag_us))

any duplicate exam rows? False


<a href='#contents'>
    <button style='margin-top: 1rem; padding: 0.5rem; cursor: pointer;'>Back to Top</button>
</a>

<a id='section-1-2'></a>
## 1.2 Get Follow-Up Mappings

In [17]:
def get_followup_map_dict(df: pd.DataFrame, followup_df: pd.DataFrame, time_delta: Union[int, float] = 180):
    # don't consider followups with an undefined exam_birads (indicates an invalid birads for that stage)
    # followup_df = followup_df[(followup_df.exam_birads != '') & ~pd.isna(followup_df.exam_birads)]
    
    # time delta in days
    # expects df to have been corrected for contralateral findings (and for no NA finding sides to exist)
    # previous versions assumed no 'B' findings but this does not
    merge_df = df.merge(followup_df, on='empi_anon', how='inner', suffixes=(None, "_fu"))
    
    # ensure exam lateralities match: L==L, R==R, or either the original or the followup exam is bilateral
    merge_df = merge_df.loc[
        (merge_df.exam_laterality==merge_df.exam_laterality_fu)
        | (merge_df.exam_laterality=="B")
        | (merge_df.exam_laterality_fu=="B")
    ]

    # exclude followups with an invalid time delta
    merge_df["fu_delta"] = (merge_df.study_date_anon_fu - merge_df.study_date_anon).dt.days
    merge_df = merge_df.loc[(merge_df.fu_delta >= 0) & (merge_df.fu_delta <= time_delta)]

    # get the accession of the first valid followup for each exam and output a dict of mappings
    map_dict = merge_df.sort_values('fu_delta').drop_duplicates('acc_anon', keep='first').set_index('acc_anon')['acc_anon_fu'].to_dict()
    return map_dict
    

#### BIRADS 0 Exams

In [18]:
# mag_br0 = mag_contra_df[mag_contra_df.scr_br_0 == True]
mag_br0 = mag_sample_df[mag_sample_df.scr_br_0 == True]

# get birads 0 diagnostic followup map dict
br0_dx_map_dict = get_followup_map_dict(mag_br0, mag_diag, time_delta=180)
print(f"{len(br0_dx_map_dict)} valid DX followups found for Screen BIRADS 0s")

# get birads 0 ultrasound followup map dict
br0_us_map_dict = get_followup_map_dict(mag_br0, mag_us, time_delta=180)
print(f"{len(br0_us_map_dict)} valid US followups found for Screen BIRADS 0s")

18281 valid DX followups found for Screen BIRADS 0s
3454 valid US followups found for Screen BIRADS 0s


#### BIRADS 1/2 Exams

In [19]:
# mag_br12 = mag_contra_df[mag_contra_df.scr_br_12 == True]
mag_br12 = mag_sample_df[mag_sample_df.scr_br_12 == True]

# get birads 1/2 diagnostic followup map dict
br12_dx_map_dict = get_followup_map_dict(mag_br12, mag_diag, time_delta=365)
print(f"{len(br12_dx_map_dict)} valid DX followups found for Screen BIRADS 1/2s")

# get birads 1/2 ultrasound followup map dict
br12_us_map_dict = get_followup_map_dict(mag_br12, mag_us, time_delta=365)
print(f"{len(br12_us_map_dict)} valid US followups found for Screen BIRADS 1/2s")

2467 valid DX followups found for Screen BIRADS 1/2s
416 valid US followups found for Screen BIRADS 1/2s
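
The mapping logic in `get_followup_map_dict` can be traced on a toy example. The sketch below repeats the same steps (merge on patient, laterality filter, time-window filter, keep the earliest follow-up) with invented accession numbers and dates and a 180-day window.

```python
import pandas as pd

toy_screens = pd.DataFrame({
    "acc_anon": [1, 2],
    "empi_anon": [100, 200],
    "study_date_anon": pd.to_datetime(["2020-01-01", "2020-01-01"]),
    "exam_laterality": ["B", "L"],
})
toy_followups = pd.DataFrame({
    "acc_anon": [11, 12, 21, 22],
    "empi_anon": [100, 100, 200, 200],
    "study_date_anon": pd.to_datetime(["2020-03-01", "2020-02-01", "2020-01-15", "2020-09-01"]),
    "exam_laterality": ["L", "R", "R", "L"],
})

# pair every screen with every follow-up from the same patient
merged = toy_screens.merge(toy_followups, on="empi_anon", how="inner", suffixes=(None, "_fu"))

# keep pairs whose lateralities agree, or where either exam is bilateral
side_ok = (
    (merged.exam_laterality == merged.exam_laterality_fu)
    | (merged.exam_laterality == "B")
    | (merged.exam_laterality_fu == "B")
)
merged = merged[side_ok].copy()

# keep follow-ups that occur 0 to 180 days after the screen
merged["fu_delta"] = (merged.study_date_anon_fu - merged.study_date_anon).dt.days
merged = merged[merged.fu_delta.between(0, 180)]

# the earliest remaining follow-up wins
toy_map = (
    merged.sort_values("fu_delta")
    .drop_duplicates("acc_anon", keep="first")
    .set_index("acc_anon")["acc_anon_fu"]
    .to_dict()
)
```

Screen 1 is bilateral, so both of its follow-ups qualify and the earlier one (accession 12) is kept. Screen 2 gets no mapping: one follow-up is on the opposite side and the other falls outside the 180-day window.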


<a href='#contents'>
    <button style='margin-top: 1rem; padding: 0.5rem; cursor: pointer;'>Back to Top</button>
</a>

<a id='section-1-3'></a>
## 1.3 Perform Follow-Up Mapping

#### Diagnostic Follow-Ups

In [20]:
# combine both dx map dicts and use it to derive a 'earliest_dx_acc' column in mag_sample_df
# which contains any valid followup dx acc and exam-level dx birads/path severity
mag_sample_df["earliest_dx_acc"] = mag_sample_df["acc_anon"].map({**br0_dx_map_dict, **br12_dx_map_dict})

mag_sample_df = mag_sample_df.merge(
    mag_diag[["acc_anon", "exam_birads", "exam_path_severity", "exam_bside"]],
    how="left",
    left_on=["earliest_dx_acc"],
    right_on=["acc_anon"],
    suffixes=(None, "_dx")
)


#### Ultrasound Follow-Ups

In [21]:
# combine both us map dicts and use it to derive a 'earliest_us_acc' column in mag_sample_df
# which contains any valid followup us acc and exam-level us birads/path severity
mag_sample_df["earliest_us_acc"] = mag_sample_df["acc_anon"].map({**br0_us_map_dict, **br12_us_map_dict})

mag_sample_df = mag_sample_df.merge(
    mag_us[["acc_anon", "exam_birads", "exam_path_severity", "exam_bside"]].drop_duplicates(),
    how="left",
    left_on=["earliest_us_acc"],
    right_on=["acc_anon"],
    suffixes=(None, "_us")
)


<a href='#contents'>
    <button style='margin-top: 1rem; padding: 0.5rem; cursor: pointer;'>Back to Top</button>
</a>

<a id='section-1-4'></a>
## 1.4 Summarize Follow-Ups
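Prioritize diagnostic follow-up results when they exist. The one exception is when a valid ultrasound follow-up also exists and has a more severe (lower) pathology severity, in which case the ultrasound results are used instead. If there is no diagnostic follow-up, use any valid ultrasound follow-up results.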

In [22]:
mag_sample_df["followup_type"] = ""

# present dx (prioritized if both dx and us present)
mag_sample_df.loc[
    ~pd.isna(mag_sample_df.earliest_dx_acc), "followup_type"
] = "DX"

# missing dx, present us
mag_sample_df.loc[
    pd.isna(mag_sample_df.earliest_dx_acc) 
    & ~pd.isna(mag_sample_df.earliest_us_acc), "followup_type"
] = "US"

# present dx + us, us path_severity more severe so it overrides dx
mag_sample_df.loc[
    ~pd.isna(mag_sample_df.earliest_dx_acc) 
    & ~pd.isna(mag_sample_df.earliest_us_acc)
    & (mag_sample_df.exam_path_severity_us < mag_sample_df.exam_path_severity_dx), "followup_type"
] = "US"


In [23]:
mag_sample_df["followup_path_severity"] = np.nan
mag_sample_df.loc[mag_sample_df.followup_type == "DX", "followup_path_severity"] = mag_sample_df.loc[mag_sample_df.followup_type == "DX", "exam_path_severity_dx"]
mag_sample_df.loc[mag_sample_df.followup_type == "US", "followup_path_severity"] = mag_sample_df.loc[mag_sample_df.followup_type == "US", "exam_path_severity_us"]

mag_sample_df["followup_bside"] = ""
mag_sample_df.loc[mag_sample_df.followup_type == "DX", "followup_bside"] = mag_sample_df.loc[mag_sample_df.followup_type == "DX", "exam_bside_dx"]
mag_sample_df.loc[mag_sample_df.followup_type == "US", "followup_bside"] = mag_sample_df.loc[mag_sample_df.followup_type == "US", "exam_bside_us"]

mag_sample_df["followup_birads"] = ""
mag_sample_df.loc[mag_sample_df.followup_type == "DX", "followup_birads"] = mag_sample_df.loc[mag_sample_df.followup_type == "DX", "exam_birads_dx"]
mag_sample_df.loc[mag_sample_df.followup_type == "US", "followup_birads"] = mag_sample_df.loc[mag_sample_df.followup_type == "US", "exam_birads_us"]
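
The priority rule above can be checked on a few toy rows. This sketch rebuilds `followup_type` and `followup_path_severity` with `np.select` and `np.where` instead of sequential `.loc` assignments; the toy values are invented, and the result should match the assignments above for these cases.

```python
import numpy as np
import pandas as pd

toy = pd.DataFrame({
    "earliest_dx_acc":       [1.0, np.nan, 3.0, np.nan, 5.0],
    "earliest_us_acc":       [np.nan, 2.0, 4.0, np.nan, 6.0],
    "exam_path_severity_dx": [4.0, np.nan, 4.0, np.nan, np.nan],
    "exam_path_severity_us": [np.nan, 2.0, 0.0, np.nan, 1.0],
})

has_dx = toy.earliest_dx_acc.notna()
has_us = toy.earliest_us_acc.notna()
# comparisons against NaN evaluate to False, so US only overrides DX when both severities exist
us_overrides = has_dx & has_us & (toy.exam_path_severity_us < toy.exam_path_severity_dx)

toy["followup_type"] = np.select(
    [us_overrides | (~has_dx & has_us), has_dx],
    ["US", "DX"],
    default="",
)
toy["followup_path_severity"] = np.where(
    toy.followup_type == "US",
    toy.exam_path_severity_us,
    np.where(toy.followup_type == "DX", toy.exam_path_severity_dx, np.nan),
)
```

The last toy row shows a consequence of the NaN comparison: when both follow-ups exist but only the ultrasound has a pathology result, the row is still labeled "DX" and its follow-up severity stays missing.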

<a href='#contents'>
    <button style='margin-top: 1rem; padding: 0.5rem; cursor: pointer;'>Back to Top</button>
</a>

<a id='section-1-5'></a>
## 1.5 Assign Follow-Up Helper Variables

#### BIRADS 1/2/3

In [24]:
mag_sample_df['dx_br_123'] = ''
mag_sample_df.loc[(mag_sample_df.scr_br_0 == True), 'dx_br_123'] = False
mag_sample_df.loc[(mag_sample_df.scr_br_0 == True) & (mag_sample_df.followup_birads.isin(["N", "B", "P"])), 'dx_br_123'] = True

#### Pathology

In [25]:
for col_name, path_levels in zip(['ps_01', 'ps_234', 'ps_5'], [[0.0, 1.0], [2.0, 3.0, 4.0], [5.0]]):
    mag_sample_df[col_name] = ''
    mag_sample_df.loc[(mag_sample_df.scr_br_0 == True), col_name] = False
    mag_sample_df.loc[(mag_sample_df.scr_br_0 == True) & (mag_sample_df.exam_path_severity.isin(path_levels) | mag_sample_df.followup_path_severity.isin(path_levels)), col_name] = True

mag_sample_df.loc[mag_sample_df.ps_01 == True, ["ps_234", "ps_5"]] = False
mag_sample_df.loc[mag_sample_df.ps_234 == True, ["ps_5"]] = False


<a href='#contents'>
    <button style='margin-top: 1rem; padding: 0.5rem; cursor: pointer;'>Back to Top</button>
</a>

<a id='section-1-6'></a>
## 1.6 Identify Interval Cancers

In [26]:
# only consider scr br1/2 exams. 
# allow bilateral exams with followup path severity in [0.0, 1.0]
# allow unilateral exams with followup path severity in [0.0, 1.0] IF the exam laterality and biopsy side match
bilat_interval_mask = (mag_sample_df.scr_br_12 == True) & (mag_sample_df.exam_laterality == "B") & mag_sample_df.followup_path_severity.isin([0.0, 1.0])
unilat_interval_mask = (mag_sample_df.scr_br_12 == True) & (mag_sample_df.exam_laterality != "B") & mag_sample_df.followup_path_severity.isin([0.0, 1.0]) & (mag_sample_df.exam_laterality == mag_sample_df.followup_bside)

mag_sample_df["interval_cancer"] = ''
mag_sample_df.loc[(mag_sample_df.scr_br_12 == True), "interval_cancer"] = False
mag_sample_df.loc[bilat_interval_mask | unilat_interval_mask, "interval_cancer"] = True
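
A short sketch on invented rows shows how the two masks behave. It uses plain boolean columns for simplicity, whereas the notebook columns also hold empty strings.

```python
import pandas as pd

toy = pd.DataFrame({
    "scr_br_12": [True, True, True, False],
    "exam_laterality": ["B", "L", "L", "B"],
    "followup_path_severity": [0.0, 1.0, 1.0, 0.0],
    "followup_bside": ["R", "R", "L", "L"],
})

is_cancer = toy.followup_path_severity.isin([0.0, 1.0])

# bilateral screens count regardless of which side was biopsied
bilat_mask = toy.scr_br_12 & (toy.exam_laterality == "B") & is_cancer
# unilateral screens only count when the biopsy side matches the screened side
unilat_mask = (
    toy.scr_br_12
    & (toy.exam_laterality != "B")
    & is_cancer
    & (toy.exam_laterality == toy.followup_bside)
)

toy["interval_cancer"] = bilat_mask | unilat_mask
```

The second row is a left-only screen followed by a right-sided cancer, so it is not counted as an interval cancer; the fourth row is excluded because it is not a BIRADS 1/2 screen.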

<a href='#contents'>
    <button style='margin-top: 1rem; padding: 0.5rem; cursor: pointer;'>Back to Top</button>
</a>

<a id='section-1-7'></a>
## 1.7 Evaluate Long-Term Follow-Up Status

In [27]:
def verify_condition(df, acc_list, span_list, date_col: str = 'study_date_anon'):
    # ensure key columns on the dataframe that was passed in have the correct dtypes
    df[date_col] = pd.to_datetime(df[date_col])
    df['empi_anon'] = pd.to_numeric(df['empi_anon'])
    df['acc_anon'] = pd.to_numeric(df['acc_anon'])
    condition_dict = dict()
    
    # # get a list of all qualifying conditions to do the first pass elim
    # qual_list = [span['qualifying'] for span in span_list if span['qualifying'] != ""]

    # # concatenate all qual strings into an or condition for an initial pass
    # init_qual_str = " | ".join([f"({qual_str})" for qual_str in qual_list])
    df_acc_list = df.acc_anon.unique().tolist()
    acc_list = list(set(acc_list).intersection(set(df_acc_list)))
    
    # iterate over exams
    for target_acc in tqdm(acc_list):
        # init bool to track whether the acc has been accepted/rejected
        acc_is_valid = True
        
        # is there a faster way to do this?
        acc_mask = df.acc_anon == target_acc
        date_i = df.loc[acc_mask, date_col].mode()[0]
        empi = df.loc[acc_mask, "empi_anon"].mode()[0]

        patient_df = df[df.empi_anon == empi]

        # iterate over spans
        for span in span_list:
            # determine span parameters
            span_length = span['length'] # throw an error if no span length was given
            span_qual = span.get('qualifying', '')
            span_disqual = span.get('disqualifying', '')
            span_n = span.get('required_n', 0 if not span_qual else 1) # default to 1 if there's a qualifying condition

            # get span end date
            date_f = (date_i + pd.Timedelta(days=span_length))

            # find subset of patient df between the span start/end dates
            span_df = patient_df[
                (patient_df[date_col] >= date_i)
                & (patient_df[date_col] <= date_f)
            ]

            # evaluate qual condition if it exists, else keep span_df
            span_df = span_df.query(span_qual) if span_qual else span_df
            
            # if span_df < n cases, reject the acc
            if span_df.acc_anon.nunique() < span_n:
                acc_is_valid = False
                break
                
            # otherwise if we have a qual condition w/ sufficient exams, + a disqual condition to eval
            elif span_qual and span_disqual:
                # get the study date of the earliest qualifying exam, and filter span_df to exclude cases later than it
                # so we only check the disqual condition up to this point
                new_date_f = span_df[date_col].min()

                # reformat the span_disqual string to include the new cutoff date condition
                span_disqual += f" & ({date_col} <= '{new_date_f}')"
                

            if span_disqual:
                # if there are any disqualifying cases, reject the acc
                if span_df.query(span_disqual).acc_anon.nunique():
                    acc_is_valid = False
                    break
            
            # increment date_i before evaluating the next span
            # we only need to do this if the last span passes
            date_i = (date_f + pd.Timedelta(days=1))

        # if all spans pass, accept the acc
        if acc_is_valid:
            condition_dict[target_acc] = True
        else:
            condition_dict[target_acc] = False

    # return the condition dict after evaluating all exams
    return condition_dict


In [28]:
# only evaluate followup status for negative exams
scr_neg_acc_list = mag_sample_df.loc[(mag_sample_df.scr_br_12 == True) & (mag_sample_df.interval_cancer == False), 'acc_anon'].unique().tolist()
print('screen negatives accs:', len(scr_neg_acc_list))

neg_acc_list = mag_sample_df.loc[~((mag_sample_df.scr_br_0 == True) & (mag_sample_df.ps_01 == True)), 'acc_anon'].unique().tolist()
print('all negatives accs:', len(neg_acc_list))

screen negatives accs: 171422
all negatives accs: 201547


In [29]:
span_list = [
    { # the first span has no conditions
        'length': 365,
        'qualifying': "",
        'disqualifying': "",
        'required_n': 0
    },
    { # the second span accepts accs with >=1 of any exam type
        'length': 365*3,
        'qualifying': "",
        'disqualifying': "",
        'required_n': 1 # required_n applies to the qualifying condition, must be >=1 exams of any kind during this period
    },
]

any_followup_dict = verify_condition(mag_contra_df, scr_neg_acc_list, span_list)

mag_sample_df['any_followup'] = mag_sample_df.acc_anon.map(any_followup_dict)

100%|██████████| 171422/171422 [06:13<00:00, 458.66it/s]
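
With the `span_list` used here, `verify_condition` reduces to a single question per exam: does the same patient have at least one exam of any type between 366 and 1,461 days after the index exam? The sketch below expresses that window check for one patient; the function name `has_exam_in_window` and its parameters are hypothetical and only cover this two-span configuration.

```python
import pandas as pd

def has_exam_in_window(
    exam_dates: pd.Series,
    index_date: pd.Timestamp,
    skip_days: int = 365,
    window_days: int = 365 * 3,
) -> bool:
    """Hypothetical helper: True if any exam falls in the second span after the index exam."""
    # the second span starts the day after the first span ends
    start = index_date + pd.Timedelta(days=skip_days + 1)
    end = start + pd.Timedelta(days=window_days)
    return bool(((exam_dates >= start) & (exam_dates <= end)).any())

index_date = pd.Timestamp("2015-01-01")
early_only = pd.Series(pd.to_datetime(["2015-01-01", "2015-06-01"]))
with_later_exam = pd.Series(pd.to_datetime(["2015-01-01", "2016-06-01"]))

early_result = has_exam_in_window(early_only, index_date)
later_result = has_exam_in_window(with_later_exam, index_date)
```

A patient whose only other exam falls within the first year does not pass, while an exam roughly 17 months after the index exam does. The general function above remains necessary when spans carry qualifying or disqualifying conditions.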


---

<a href='#contents'>
    <button style='margin-top: 1rem; padding: 0.5rem; cursor: pointer;'>Back to Top</button>
</a>

<a id='section-2'></a>
# 2. Data Finalization

<a id='section-2-1'></a>
## 2.1 Data Enrichment

In [30]:
embed_params = embed_toolkit.EMBEDParameters()


#### Major Exam Class Labels

In [31]:
mag_sample_df["label"] = "INVALID"
mag_sample_df["label"] = mag_sample_df["label"].case_when([
    (mag_sample_df.eval("(scr_br_12 == True) & (interval_cancer == False)"), "Screen Negative"), 
    (mag_sample_df.eval("(scr_br_12 == True) & (interval_cancer == True)"), "Interval Cancer"),
    (mag_sample_df.eval("(scr_br_0 == True) & (ps_01 == True)"), "Screen Detected Cancer"),
    (mag_sample_df.eval("(scr_br_0 == True) & (ps_234 == True)"), "Biopsy Proven Benign"),
    (mag_sample_df.eval("(scr_br_0 == True) & (dx_br_123 == True)"), "Diagnostic Negative"),
    (mag_sample_df.eval("(scr_br_0 == True) & (ps_5 == True)"), "Other Cancer"),
])  # type: ignore
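
`Series.case_when` is only available in pandas 2.2 and later, and it keeps the first matching condition for each row. For older environments, `np.select` offers the same first-match behavior. The sketch below applies the same condition order to invented rows with plain boolean helper columns (the notebook's helper columns also contain empty strings, which is why the cell above compares against `True` explicitly).

```python
import numpy as np
import pandas as pd

toy = pd.DataFrame({
    "scr_br_12":       [True,  True,  False, False, False, False],
    "scr_br_0":        [False, False, True,  True,  True,  True],
    "interval_cancer": [False, True,  False, False, False, False],
    "ps_01":           [False, False, True,  False, False, False],
    "ps_234":          [False, False, False, True,  False, False],
    "dx_br_123":       [False, False, False, True,  True,  False],
    "ps_5":            [False, False, False, False, False, False],
})

# conditions are listed in priority order; the first True condition decides the label
conditions = [
    toy.scr_br_12 & ~toy.interval_cancer,
    toy.scr_br_12 & toy.interval_cancer,
    toy.scr_br_0 & toy.ps_01,
    toy.scr_br_0 & toy.ps_234,
    toy.scr_br_0 & toy.dx_br_123,
    toy.scr_br_0 & toy.ps_5,
]
labels = [
    "Screen Negative",
    "Interval Cancer",
    "Screen Detected Cancer",
    "Biopsy Proven Benign",
    "Diagnostic Negative",
    "Other Cancer",
]
toy["label"] = np.select(conditions, labels, default="INVALID")
```

The fourth row satisfies both the biopsy-proven-benign and diagnostic-negative conditions and receives the earlier label, which illustrates why the order of the conditions matters.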

#### Finding Characteristics
First, we'll derive characteristics for all findings in the sample. Then we'll aggregate these upwards so exam-level characteristics only consider abnormal findings in abnormal exams, and negative/benign findings in negative/benign exams.

In [32]:
# derive findings-level characteristics
mag_sample_df[['mass', 'asymmetry', 'arch_distortion', 'calcification']] = mag_sample_df.progress_apply(
    embed_params.extract_characteristics, 
    axis='columns', 
    result_type='expand'
) # type: ignore

# finding characteristics should only be present on benign/abnormal findings
mag_sample_df.loc[mag_sample_df.asses == "N", ['mass', 'asymmetry', 'arch_distortion', 'calcification']] = 0

100%|██████████| 249675/249675 [00:05<00:00, 45074.05it/s] 


In [33]:
# get all exams with a birads A finding and filter them so they only have birads A findings
br0_exams = mag_sample_df[mag_sample_df.exam_birads.isin(["A"]) & mag_sample_df.asses.isin(["A"])]
br12_exams = mag_sample_df[mag_sample_df.exam_birads.isin(["N", "B"])]

# generalize to exam-level
br0_exam_chars_dict = br0_exams.groupby("acc_anon").progress_apply(embed_params.aggregate_characteristics).to_dict() # type: ignore
br12_exam_chars_dict = br12_exams.groupby("acc_anon").progress_apply(embed_params.aggregate_characteristics).to_dict() # type: ignore

100%|██████████| 31346/31346 [00:02<00:00, 14385.56it/s]
100%|██████████| 171527/171527 [00:12<00:00, 13889.56it/s]


In [34]:
for char_var in ['exam_mass', 'exam_asymmetry', 'exam_arch_distortion', 'exam_calcification']:
    br12_char_dict = {k:v[char_var] for k,v in br12_exam_chars_dict.items()}
    br0_char_dict = {k:v[char_var] for k,v in br0_exam_chars_dict.items()}

    mag_sample_df[char_var] = mag_sample_df.acc_anon.map({**br12_char_dict, **br0_char_dict})
    

In [35]:
# we need to handle some edge cases in the data that left NaNs in the exam-level characteristics

# nan observations are present on cases with invalid BIRADS, and those with adjusted BIRADS N/B 
# exams that were corrected to BIRADS A exams after inspection for addendums
# we'll manually derive the exam-level characteristics for the latter group and set others to False
addended_exam_chars_dict = (
    mag_sample_df[
        pd.isna(mag_sample_df.exam_mass) 
        & (mag_sample_df.exam_birads == 'A')
    ]
    .groupby("acc_anon")
    .progress_apply(embed_params.aggregate_characteristics) # type: ignore
    .to_dict()
)

# map our addended exam characteristics back to the data
for char_var in ['exam_mass', 'exam_asymmetry', 'exam_arch_distortion', 'exam_calcification']:
    br12_char_dict = {k:v[char_var] for k,v in br12_exam_chars_dict.items()}
    br0_char_dict = {k:v[char_var] for k,v in br0_exam_chars_dict.items()}
    addended_char_dict = {k:v[char_var] for k,v in addended_exam_chars_dict.items()}

    mag_sample_df[char_var] = mag_sample_df.acc_anon.map({**addended_char_dict, **br12_char_dict, **br0_char_dict})


100%|██████████| 33/33 [00:00<00:00, 5986.42it/s]




In [36]:
# now the only exams with any missing values have invalid BIRADS and will 
# be dropped, so they can be considered false
mag_sample_df = mag_sample_df.fillna(
    {
        'exam_mass': False, 
        'exam_asymmetry': False, 
        'exam_arch_distortion': False, 
        'exam_calcification': False
    }
)


#### Screen Detected Pathology

In [37]:
mag_sample_df["scr_detected_path"] = "No Pathology"
mag_sample_df.loc[mag_sample_df.scr_br_0 == True, "scr_detected_path"] = mag_sample_df.loc[mag_sample_df.scr_br_0 == True, "scr_detected_path"].case_when([
    (mag_sample_df.eval("(exam_path_severity == 0.0) | ((exam_path_severity.isna() | (followup_path_severity < exam_path_severity)) & (followup_path_severity == 0.0))"), "Invasive Cancer"), 
    (mag_sample_df.eval("(exam_path_severity == 1.0) | ((exam_path_severity.isna() | (followup_path_severity < exam_path_severity)) & (followup_path_severity == 1.0))"), "Noninvasive Cancer"),
    (mag_sample_df.eval("(exam_path_severity == 2.0) | ((exam_path_severity.isna() | (followup_path_severity < exam_path_severity)) & (followup_path_severity == 2.0))"), "High Risk Lesion"),
    (mag_sample_df.eval("(exam_path_severity == 3.0) | ((exam_path_severity.isna() | (followup_path_severity < exam_path_severity)) & (followup_path_severity == 3.0))"), "Borderline Lesion"),
    (mag_sample_df.eval("(exam_path_severity == 4.0) | ((exam_path_severity.isna() | (followup_path_severity < exam_path_severity)) & (followup_path_severity == 4.0))"), "Benign Lesion"),
    (mag_sample_df.eval("(exam_path_severity == 5.0) | ((exam_path_severity.isna() | (followup_path_severity < exam_path_severity)) & (followup_path_severity == 5.0))"), "Other Cancer"),
]) # type: ignore
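
Because `case_when` keeps the first matching condition, the six conditions above appear to reduce to "take the lower of the two severity codes, ignoring missing values". The sketch below illustrates that reading on toy data. It assumes the severity codes are exactly 0.0 through 5.0; the toy rows are hypothetical, and this is an illustration rather than a replacement for the cell above.

```python
import numpy as np
import pandas as pd

# code-to-label mapping copied from the case_when conditions
SEVERITY_LABELS = {
    0.0: "Invasive Cancer",
    1.0: "Noninvasive Cancer",
    2.0: "High Risk Lesion",
    3.0: "Borderline Lesion",
    4.0: "Benign Lesion",
    5.0: "Other Cancer",
}

# hypothetical toy exams
toy_df = pd.DataFrame({
    "scr_br_0": [True, True, True, True, False],
    "exam_path_severity": [2.0, np.nan, 1.0, np.nan, 0.0],
    "followup_path_severity": [0.0, 3.0, 4.0, np.nan, np.nan],
})

# row-wise minimum skips NaN by default; rows with both values missing stay NaN
lowest_code = toy_df[["exam_path_severity", "followup_path_severity"]].min(axis=1)

toy_df["scr_detected_path"] = "No Pathology"
is_br0 = toy_df["scr_br_0"]
toy_df.loc[is_br0, "scr_detected_path"] = (
    lowest_code[is_br0].map(SEVERITY_LABELS).fillna("No Pathology")
)
print(toy_df["scr_detected_path"].tolist())
```

The last toy row has a severity code but `scr_br_0` is False, so it keeps the default label, matching the `.loc` restriction in the original cell.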

<a href='#contents'>
    <button style='margin-top: 1rem; padding: 0.5rem; cursor: pointer;'>Back to Top</button>
</a>

<a id='section-2-2'></a>
## 2.2 Data Cleaning

#### Patient Race

In [38]:
# standardize whitespace
mag_sample_df['ETHNICITY_DESC'] = mag_sample_df['ETHNICITY_DESC'].str.strip()
mag_sample_df['ETHNICITY_DESC'] = mag_sample_df['ETHNICITY_DESC'].str.replace(r'\s+', ' ', regex=True)

mag_sample_df["race"] = "Other"
mag_sample_df["race"] = mag_sample_df["race"].case_when([
    (mag_sample_df.eval("ETHNICITY_DESC == 'African American or Black'"), "Black"),
    (mag_sample_df.eval("ETHNICITY_DESC == 'Caucasian or White'"), "White"),
    (mag_sample_df.eval("ETHNICITY_DESC == 'Asian'"), "Asian"),
    (mag_sample_df.eval("ETHNICITY_DESC == 'Unknown, Unavailable or Unreported'"), "Unknown"),
    (mag_sample_df.eval("ETHNICITY_DESC.isna()"), "Unknown")
]) # type: ignore
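
To show why whitespace is standardized before matching, here is a small sketch on toy strings. It uses a plain dictionary lookup instead of `case_when` (a swapped-in technique that also runs on older pandas versions), and the value "Some Other Category" is a hypothetical stand-in for any description not listed in the conditions above.

```python
import pandas as pd

# hypothetical raw descriptions with stray whitespace and a missing value
raw = pd.Series([
    "  African American  or Black",
    "Caucasian or White ",
    "Asian",
    None,
    "Unknown, Unavailable or Unreported",
    "Some Other Category",
])

# trim the ends and collapse internal runs of whitespace to single spaces
clean = raw.str.strip().str.replace(r"\s+", " ", regex=True)

RACE_MAP = {
    "African American or Black": "Black",
    "Caucasian or White": "White",
    "Asian": "Asian",
    "Unknown, Unavailable or Unreported": "Unknown",
}

# unmatched descriptions fall back to "Other"; missing descriptions become "Unknown"
race = clean.map(RACE_MAP).fillna("Other")
race[clean.isna()] = "Unknown"
print(race.tolist())
```

Without the cleaning step, the first two toy strings would not match their dictionary keys and would fall through to "Other".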


#### Patient Ethnicity

In [39]:
# standardize whitespace
mag_sample_df['ETHNIC_GROUP_DESC'] = mag_sample_df['ETHNIC_GROUP_DESC'].str.strip()
mag_sample_df['ETHNIC_GROUP_DESC'] = mag_sample_df['ETHNIC_GROUP_DESC'].str.replace(r'\s+', ' ', regex=True)

hispanic_list = ["Hispanic or Latino", "Unknown~Hispanic"]
not_hispanic_list = ["Non-Hispanic or Latino", "Non-Hispanic~Unknown", "Unknown~Non-Hispanic"]

mag_sample_df["ethnicity"] = "Unknown"
mag_sample_df["ethnicity"] = mag_sample_df["ethnicity"].case_when([
    (mag_sample_df.eval("ETHNIC_GROUP_DESC.isin(@hispanic_list)"), "Hispanic or Latino"),
    (mag_sample_df.eval("ETHNIC_GROUP_DESC.isin(@not_hispanic_list)"), "Not Hispanic or Latino"),
]) # type: ignore


#### Patient Age

In [40]:
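# note: pd.cut closes intervals on the right by default, so these bins are 
# (0, 50], (50, 75], (75, 120]: age 50 falls in "<50" and age 75 falls in "50-75"
mag_sample_df['age_binned'] = pd.cut(mag_sample_df['age_at_study'], bins=[0, 50, 75, 120], labels=["<50", "50-75", ">=75"])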
mag_sample_df.drop_duplicates('acc_anon')['age_binned'].value_counts(dropna=False)

age_binned
50-75    133966
<50       49988
>=75      18969
Name: count, dtype: int64
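
The bin edges deserve a quick check, since `pd.cut` uses right-closed intervals unless `right=False` is passed. The sketch below compares both settings on a few hypothetical boundary ages; it does not reproduce the counts above.

```python
import pandas as pd

# hypothetical ages chosen to sit on and around the bin edges
ages = pd.Series([49, 50, 51, 75, 76])
bins = [0, 50, 75, 120]
labels = ["<50", "50-75", ">=75"]

# default (right=True): intervals are (0, 50], (50, 75], (75, 120]
right_closed = pd.cut(ages, bins=bins, labels=labels)

# right=False: intervals are [0, 50), [50, 75), [75, 120)
left_closed = pd.cut(ages, bins=bins, labels=labels, right=False)

print(right_closed.astype(str).tolist())  # ['<50', '<50', '50-75', '50-75', '>=75']
print(left_closed.astype(str).tolist())   # ['<50', '50-75', '50-75', '>=75', '>=75']
```

Only ages exactly equal to 50 or 75 differ between the two settings, so the choice matters only if the labels need to match the intervals exactly.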

<a href='#contents'>
    <button style='margin-top: 1rem; padding: 0.5rem; cursor: pointer;'>Back to Top</button>
</a>

<a id='section-2-3'></a>
## 2.3 Finalize and Output Data

In [41]:
# ensure tissueden is numeric
mag_sample_df['tissueden'] = pd.to_numeric(mag_sample_df['tissueden'], errors="coerce")

# get the accessions of any exams with invalid screening-stage BIRADS
dx_only_asses = ["P", "S", "M", "K"]
invalid_exam_list = (
    mag_sample_df.loc[
        mag_sample_df.desc.str.contains("screen", case=False) 
        & mag_sample_df.asses.isin(dx_only_asses), 
        'acc_anon'
    ]
    .unique()
    .tolist()
)

In [42]:
# drop any cases where tissueden is 5.0, with invalid screening-stage BIRADS, with no valid 
# major label, with other cancers present, or where any_followup is False
final_df = mag_sample_df[
    ~((mag_sample_df.tissueden == 5.0)
      | (mag_sample_df.acc_anon.isin(invalid_exam_list))
      | (mag_sample_df.label == "INVALID") 
      | (mag_sample_df.label == "Other Cancer") 
      | (mag_sample_df.scr_detected_path == "Other Cancer")
      | (mag_sample_df.any_followup == False))
]
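
The invalid-exam exclusion works at the exam level: one offending row is enough to drop every row that shares its accession. A minimal sketch on toy rows follows; the `desc` strings and accession numbers are hypothetical.

```python
import pandas as pd

# hypothetical finding-level rows; exam 1 has one valid and one invalid row
toy_df = pd.DataFrame({
    "acc_anon": [1, 1, 2, 3],
    "desc": [
        "MG Screening Bilateral",
        "MG Screening Bilateral",
        "MG Screening Bilateral",
        "MG Diagnostic Left",
    ],
    "asses": ["N", "S", "B", "S"],
})

dx_only_asses = ["P", "S", "M", "K"]

# a row is invalid when a screening exam carries a diagnostic-only assessment
is_invalid_row = (
    toy_df["desc"].str.contains("screen", case=False)
    & toy_df["asses"].isin(dx_only_asses)
)
invalid_exam_list = toy_df.loc[is_invalid_row, "acc_anon"].unique().tolist()

# filtering on the accession removes both rows of exam 1, not just the flagged one
kept_df = toy_df[~toy_df["acc_anon"].isin(invalid_exam_list)]
print(kept_df["acc_anon"].tolist())  # [2, 3]
```

Exam 3 keeps its "S" assessment because its description does not contain "screen", which is the distinction the `dx_only_asses` check relies on.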


In [43]:
if OUTPUT_PATH is not None:
    final_df.to_csv(OUTPUT_PATH, index=False)
    print(f"data saved to: '{OUTPUT_PATH}'")