<a href="https://colab.research.google.com/github/clv07/stroke-of-luck/blob/Data-import/MI_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from os import read
import csv
import pandas
import numpy as np

# 12SL statement codes regarded as MI-related, listed per infarct category.
# Some codes are listed under more than one category; building a set keeps
# each code exactly once.
MI_stmnts_12SL = set().union(
    (740, 810, 820),  # anterior infarct
    (700, 810),  # septal infarct
    (760, 820),  # lateral infarct
    (780, 801, 806),  # inferior infarct
    (801, 802, 803),  # posterior infarct
    (826, 827, 963, 964, 965, 966, 967, 968),  # infarct - ST elevation
    (4, 821, 822, 823, 826, 827, 828, 829,
     920, 930, 940, 950, 960, 961, 962, 1361),  # acute MI or injury
)
#Read in Results.csv

#Read in 12SL



# Basic Error Rate for MI
# - Look for physician codes that include:
# -
# Acute MI (STEMI)
# Bayes Theorem
# False Positives

In [11]:
import pandas as pd

###############################################################################
# STEP 1: READ AND PROCESS EACH 12SL CSV
###############################################################################
def read_and_process_12sl(csv_path, dataset_name=None):
    """
    Read a 12SL CSV export and reduce it to one row of statement codes per test.

    Two layouts are recognised by column count:
      - exactly two columns: the second column holds every code in a single
        "|"-delimited string (leading/trailing "|" are dropped and the
        remaining "|" become ",");
      - any other width: every column after the first holds one code and
        empty cells are skipped.

    Args:
        csv_path: Path or file-like object passed straight to ``pandas.read_csv``.
        dataset_name: Optional label stored in a "Source" column. The value
            "Shaoxing" additionally rewrites each TestID as "JS" followed by
            the zero-padded 5-digit number (e.g. 1 -> "JS00001") so the IDs
            line up with the "JS00001"-style IDs used in results.csv.

    Returns:
        pandas.DataFrame with columns "TestID", "Statement_codes" (a
        comma-separated string of codes) and, when ``dataset_name`` is truthy,
        "Source".

    Raises:
        ValueError: If ``dataset_name`` is "Shaoxing" and a TestID cannot be
            converted with ``int``.
    """
    def code_to_text(value):
        # A column of integer codes that also contains empty cells is read as
        # float, so 700 arrives as 700.0. Emit "700" rather than "700.0" so
        # the code can later be parsed with int().
        if isinstance(value, float) and value.is_integer():
            return str(int(value))
        return str(value)

    df_raw = pd.read_csv(csv_path)

    # The first column is the test/patient identifier whatever its header says.
    df_raw = df_raw.rename(columns={df_raw.columns[0]: "TestID"})

    if len(df_raw.columns) == 2:
        # Single pipe-delimited column. It is taken by position so the layout
        # does not depend on the header being spelled "Statements".
        statements = df_raw[df_raw.columns[1]]
        df_raw["Statement_codes"] = (
            statements
            # A missing cell becomes an empty string instead of the text "nan".
            .map(lambda v: "" if pd.isna(v) else code_to_text(v))
            .astype(str)
            .str.strip("|")
            # regex=False: "|" must be replaced literally. As a regular
            # expression it is an alternation of empty patterns, and older
            # pandas versions treat the pattern as a regex by default.
            .str.replace("|", ",", regex=False)
        )
    else:
        # Wide layout: one code per column after TestID.
        code_cols = df_raw.columns[1:]
        df_raw["Statement_codes"] = [
            ",".join(code_to_text(v) for v in row if not pd.isna(v))
            for row in df_raw[code_cols].itertuples(index=False, name=None)
        ]

    # Only the identifier and the combined code string are needed downstream.
    df_processed = df_raw[["TestID", "Statement_codes"]].copy()

    # Dataset-specific ID normalisation; add rules for other datasets here.
    if dataset_name == "Shaoxing":
        df_processed["TestID"] = df_processed["TestID"].apply(
            lambda x: f"JS{int(x):05d}"
        )

    # Record which dataset each row came from.
    if dataset_name:
        df_processed["Source"] = dataset_name

    return df_processed

# Locations of the three 12SL exports (edit these to match the local layout).
ptbxl_csv = r"./PTBXL_12SL.csv"
shaoxing_csv = r"./Shaoxing_12SL.csv"
cpsc_csv = r"./CPSC2018_12SL.csv"

# Run every export through the shared reader, tagging its rows with the
# dataset label. Only the "Shaoxing" label makes the reader rewrite TestIDs;
# the other two files are assumed to carry usable IDs already.
df_ptbxl, df_shaoxing, df_cpsc = [
    read_and_process_12sl(path, dataset_name=label)
    for path, label in (
        (ptbxl_csv, "PTBXL"),
        (shaoxing_csv, "Shaoxing"),
        (cpsc_csv, "CPSC2018"),
    )
]

# Stack the per-dataset frames into a single table with a fresh 0..n-1 index.
df_12sl_combined = pd.concat(
    [df_ptbxl, df_shaoxing, df_cpsc],
    ignore_index=True,
)

###############################################################################
# STEP 2: READ AND PREPARE THE results.csv (PHYSICIAN CODES)
###############################################################################
results_csv = r"./results.csv"

# Load the physician annotations and give the columns the names used for the
# join: "patient_num" becomes "TestID" and "codes" becomes "Statements_Phys".
df_results = pd.read_csv(results_csv).rename(
    columns={"patient_num": "TestID", "codes": "Statements_Phys"}
)

# Force the physician codes to text and trim surrounding whitespace.
df_results["Statements_Phys"] = (
    df_results["Statements_Phys"].astype(str).str.strip()
)

###############################################################################
# STEP 3: MERGE THE 12SL DATA WITH THE PHYSICIAN DATA
###############################################################################
# Inner join on the patient ID: only IDs present in both tables are kept.
df_merged = df_12sl_combined.merge(df_results, how="inner", on="TestID")

###############################################################################
# STEP 4: DEFINE MI CODE SETS AND CREATE BINARY INDICATORS
###############################################################################
# Illustrative MI code sets; replace them with the codes the study actually
# treats as MI.
# NOTE(review): the 12SL codes 4, 802, 803, 806, 821-823 and 1361 are listed
# in MI_code_mapping below but are absent from MI_codes_12SL - confirm whether
# that omission is intended.
MI_codes_12SL = {
    700, 740, 760, 780, 801, 810, 820,
    *range(826, 830),  # 826-829
    920, 930, 940, 950,
    *range(960, 969),  # 960-968
    # ... add others as needed
}

MI_codes_Phys = {
    54329005,
    57054005,
    164861001,
    164865005,
    413444003,
    425419005,
    425623009,
    426434006,
    # ... add others as needed
}

# Groups codes from both coding systems under a common condition name, in the
# form "Condition": [code_one, code_two, ...]. Used to identify diagnoses
# shared between the physicians and 12SL. In each list the short codes come
# first and the long physician codes (the ones also in MI_codes_Phys) last.
MI_code_mapping = {
    "anterior infarct": [740, 810, 820] + [54329005],
    "septal infarct": [700, 810],
    "lateral infarct": [760, 820] + [425623009],
    "inferior infarct": [780, 801, 806] + [425419005],
    "posterior infarct": [801, 802, 803],
    "infarct - ST elevation": [826, 827, 963, 964, 965, 966, 967, 968],
    "acute MI or injury": (
        [4, 821, 822, 823, 826, 827, 828, 829,
         920, 930, 940, 950, 960, 961, 962, 1361]
        + [57054005, 413444003, 426434006, 164865005, 164861001]
    ),
}

def flag_mi(codes_str, mi_set):
    """
    Report whether a comma-separated code string contains any MI code.

    Args:
        codes_str: Codes separated by commas, optionally padded with spaces.
            Missing values (None/NaN) and blank strings count as "no codes".
            A non-string scalar (e.g. a single numeric code) is converted to
            text first.
        mi_set: Collection of integer codes that indicate MI.

    Returns:
        int: 1 if at least one code is in ``mi_set``, otherwise 0.

    Tokens that are not whole numbers are skipped. Float-formatted whole
    numbers such as "700.0" (the form integer codes take when pandas reads
    them from a column with missing cells) are accepted as 700.
    """
    if not isinstance(codes_str, str):
        if pd.isna(codes_str):
            return 0
        codes_str = str(codes_str)

    for token in codes_str.split(","):
        token = token.strip()
        if not token:
            continue
        try:
            code = int(token)
        except ValueError:
            # Not a plain integer literal; accept it only if it is a float
            # spelling of a whole number ("nan"/"inf" are rejected here too).
            try:
                as_float = float(token)
            except ValueError:
                continue
            if not as_float.is_integer():
                continue
            code = int(as_float)
        if code in mi_set:
            return 1
    return 0

def GivenXInspection(df, codes_str, mi_column, mi_code_mapping):
    """
    Estimate how often 12SL flags a condition among physician-confirmed cases.

    Rows are selected when (a) the physician statements contain at least one
    code listed for the given condition category and (b) "MI_Phys" equals 1.
    The result is the fraction of those rows whose ``mi_column`` is flagged.

    Args:
        df (pd.DataFrame): Merged dataframe with the columns "Statements_Phys"
            (comma-separated codes), "MI_Phys" and ``mi_column``. It is not
            modified.
        codes_str (str): Condition category used as the "given" signal, i.e. a
            key of ``mi_code_mapping`` (e.g. "anterior infarct").
        mi_column (str): Name of the 12SL flag column (e.g. "MI_12SL").
        mi_code_mapping (dict): Maps condition categories to lists of codes.

    Returns:
        float: ``mi_column`` sum divided by the number of selected rows, or 0
        when no row is selected (including an unknown category).

    NOTE(review): matching is done against the physician statements only, so a
    category whose list contains no physician codes selects no rows and
    yields 0.
    """
    def parse_codes(raw):
        # Turn "164889003, 59118001" into {164889003, 59118001}. Missing
        # values and tokens that are not integers (blank tokens, the text
        # "nan") are skipped instead of raising.
        parsed = set()
        if not isinstance(raw, str):
            if pd.isna(raw):
                return parsed
            raw = str(raw)
        for token in raw.split(","):
            try:
                parsed.add(int(token))
            except ValueError:
                continue
        return parsed

    # Codes that define the "given" condition; empty for an unknown category.
    condition_codes = set(mi_code_mapping.get(codes_str, []))

    # Row mask built without adding a helper column to the caller's dataframe.
    has_signal = pd.Series(
        [
            not condition_codes.isdisjoint(parse_codes(raw))
            for raw in df["Statements_Phys"]
        ],
        index=df.index,
        dtype=bool,
    )

    # Keep only the rows the physician also flagged as MI.
    selected = df[has_signal & (df["MI_Phys"] == 1)]

    if selected.empty:
        return 0  # Avoid division by zero if no cases exist

    # Numerator and denominator come from the same rows, so the ratio stays
    # within [0, 1] even when a TestID occurs on several rows.
    return selected[mi_column].sum() / len(selected)

def LabelMapping(df_merged, mi_code_mapping):
    """
    Break down which physician codes occur in missed and false-flag cases.

    "Missed" rows are those with MI_Phys == 1 and MI_12SL == 0; "false flag"
    rows are those with MI_12SL == 1 and MI_Phys == 0. Every percentage is the
    share of rows in the respective group, so it always lies between 0 and
    100: a row counts once per code (or per condition) even if its statement
    string repeats a code or holds several codes of the same condition.

    Args:
        df_merged (pd.DataFrame): Merged dataframe with the columns
            "Statements_Phys" (comma-separated codes), "MI_Phys" and "MI_12SL".
        mi_code_mapping (dict): Maps condition categories to lists of codes.

    Returns:
        tuple: Two DataFrames -
            - df_code_percentages: columns "Code", "Missed_Percentage",
              "False_Flag_Percentage"; one row per code seen in either group,
              sorted by code. The columns are present even with no rows.
            - df_signal_percentages: columns "Condition", "Missed_Percentage",
              "False_Flag_Percentage"; one row per key of ``mi_code_mapping``,
              in mapping order.
    """
    def parse_codes(raw):
        # Turn "164889003, 59118001" into {164889003, 59118001}. Missing
        # values and tokens that are not integers (blank tokens, the text
        # "nan") are skipped instead of raising.
        parsed = set()
        if not isinstance(raw, str):
            if pd.isna(raw):
                return parsed
            raw = str(raw)
        for token in raw.split(","):
            try:
                parsed.add(int(token))
            except ValueError:
                continue
        return parsed

    def share_of_records(code_sets, wanted):
        # Percentage of records holding at least one of the wanted codes.
        if not code_sets:
            return 0
        hits = sum(1 for codes in code_sets if not wanted.isdisjoint(codes))
        return hits / len(code_sets) * 100

    # Cases flagged by physicians but missed by 12SL.
    missed_mask = (df_merged["MI_Phys"] == 1) & (df_merged["MI_12SL"] == 0)
    # Cases flagged by 12SL but not confirmed by physicians.
    false_mask = (df_merged["MI_12SL"] == 1) & (df_merged["MI_Phys"] == 0)

    # One set of physician codes per record in each group. Records without
    # usable codes contribute an empty set and still count in the denominator.
    missed_sets = [
        parse_codes(raw) for raw in df_merged.loc[missed_mask, "Statements_Phys"]
    ]
    false_sets = [
        parse_codes(raw) for raw in df_merged.loc[false_mask, "Statements_Phys"]
    ]

    # Per-code breakdown; sorting keeps the row order reproducible.
    all_codes = sorted(set().union(*missed_sets, *false_sets))
    df_code_percentages = pd.DataFrame(
        [
            {
                "Code": code,
                "Missed_Percentage": share_of_records(missed_sets, {code}),
                "False_Flag_Percentage": share_of_records(false_sets, {code}),
            }
            for code in all_codes
        ],
        columns=["Code", "Missed_Percentage", "False_Flag_Percentage"],
    )

    # Per-condition breakdown: a record counts for a condition when it holds
    # any of that condition's codes.
    df_signal_percentages = pd.DataFrame(
        [
            {
                "Condition": condition,
                "Missed_Percentage": share_of_records(missed_sets, set(codes)),
                "False_Flag_Percentage": share_of_records(false_sets, set(codes)),
            }
            for condition, codes in mi_code_mapping.items()
        ],
        columns=["Condition", "Missed_Percentage", "False_Flag_Percentage"],
    )

    return df_code_percentages, df_signal_percentages
# Derive the binary MI indicators from the 12SL and physician code strings.
df_merged["MI_12SL"] = df_merged["Statement_codes"].apply(flag_mi, args=(MI_codes_12SL,))
df_merged["MI_Phys"] = df_merged["Statements_Phys"].apply(flag_mi, args=(MI_codes_Phys,))

###############################################################################
# STEP 5: INSPECTION / ANALYSIS
###############################################################################
# Overview of the merged table and the raw flag totals.
print("Merged Data Sample:")
print(df_merged.head(10))
print(df_merged.columns)
print(f"Total merged records: {len(df_merged)}")
print(f"Total MI flagged by 12SL: {df_merged['MI_12SL'].sum()}")
print(f"Total MI flagged by Physician: {df_merged['MI_Phys'].sum()}")

# Case 1) False negatives: the physician flagged MI, 12SL did not.
df_missed_by_12sl = df_merged.loc[
    df_merged["MI_Phys"].eq(1) & df_merged["MI_12SL"].eq(0)
]
print(f"Records where Physician flagged MI but 12SL did not: {len(df_missed_by_12sl)}")

# Case 2) False positives: 12SL flagged MI, the physician did not.
df_false_id = df_merged.loc[
    df_merged["MI_12SL"].eq(1) & df_merged["MI_Phys"].eq(0)
]
print(f"Records where 12SL flagged MI but Physician did not: {len(df_false_id)}")

# Case 3) Conditional detection rates.
# a) Given the "infarct - ST elevation" category, how often does 12SL flag MI?
prob_given_STElevation = GivenXInspection(
    df_merged, "infarct - ST elevation", "MI_12SL", MI_code_mapping
)
print(f"Probability of 12SL identifying MI given ST Elevation: {prob_given_STElevation}")
# b) Given the "acute MI or injury" category, how often does 12SL flag MI?
#    (The variable is named "lateral" but holds the acute-MI-or-injury result.)
prob_given_lateral = GivenXInspection(
    df_merged, "acute MI or injury", "MI_12SL", MI_code_mapping
)
print(f"Probability of 12SL identifying MI given MI or Injury: {prob_given_lateral}")

# Per-code (index 0) and per-condition (index 1) breakdown of the missed and
# false-flag cases.
breakdown = LabelMapping(df_merged, MI_code_mapping)
print(breakdown[0].head(10))
print(breakdown[1].head(10))

# Persist the merged table, without the row index, for later inspection.
df_merged.to_csv("merged_output.csv", index=False)


Merged Data Sample:
    TestID                 Statement_codes    Source  \
0  JS00001        161,171,440,700,831,1699  Shaoxing   
1  JS00002                    21,1140,1699  Shaoxing   
2  JS00004  23,542,1665,533,1666,1141,1699  Shaoxing   
3  JS00005                         21,1687  Shaoxing   
4  JS00006                   161,1140,1699  Shaoxing   
5  JS00007                         22,1684  Shaoxing   
6  JS00008                         21,1687  Shaoxing   
7  JS00009                         21,1687  Shaoxing   
8  JS00010       21,542,1665,531,1666,1693  Shaoxing   
9  JS00011                         23,1687  Shaoxing   

                   Statements_Phys  MI_12SL  MI_Phys  
0   164889003, 59118001, 164934002        1        0  
1             426177001, 164934002        0        0  
2                        426177001        0        0  
3  164890007, 429622005, 428750005        0        0  
4                        426177001        0        0  
5             164889003, 16493400