In [None]:
# Generate data required for Hidden Diagnosis of AF study. 
# Involves: identifying qualifying cohort of patients and processing the relevant ECGs.
import os
import sys
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm
import ipyparallel as ipp

# Add the src directory to the import path BEFORE importing project modules,
# so that imports resolving to ../src/ work on a fresh kernel.
sys.path.append('../src/')

# Custom modules
from paths import map_params_to_filename

# Start a local ipyparallel cluster with n_engines engines and wait until all are up.
# `dview` (a direct view over every engine) is used below with map_sync to process
# ECG files in parallel.
n_engines = 8
cluster = ipp.Cluster(n=n_engines)
cluster.start_cluster_sync()
rc = cluster.connect_client_sync()
rc.wait_for_engines(n_engines)
dview = rc[:]

# patient_mrn_to_file.csv is produced at the beginning of step02_process_muse_cache.ipynb.
# It maps patient IDs to the filenames of their ECGs acquired between 2016 and 2019.
# The "mrn" in the filename is a relic of our initial approach, which linked patients
# between MUSE and the EDW by MRN. The current approach no longer uses the MRN; it
# links patients by first name, last name, and date of birth.
muse_cache_df = pd.read_csv('../outputs_intermediate/patient_mrn_to_file.csv', dtype='str')

afib_preprocessing_config = {'max_pred_gap': 90, 'selection_criteria': 'va', 'include_single_ecgs': True, 'mini': False}
unique_id_col = 'UniqueID'
# Number of distinct patients kept when afib_preprocessing_config['mini'] is True (quick debug runs).
mini_n_patients = 20000

np.random.seed(0)
if afib_preprocessing_config['mini']:
    # Sample distinct patient IDs without replacement. This gives every patient the same
    # chance of selection, regardless of how many ECGs they have, and keeps exactly
    # min(mini_n_patients, #patients) patients.
    all_patient_ids = muse_cache_df[unique_id_col].unique()
    patient_id_subsample = np.random.choice(
        all_patient_ids, min(mini_n_patients, len(all_patient_ids)), replace=False
    )
    # .copy() so the column assignments below act on an independent frame rather than a slice.
    muse_cache_df = muse_cache_df[muse_cache_df[unique_id_col].isin(patient_id_subsample)].copy()
muse_cache_df['date'] = pd.to_datetime(muse_cache_df[['year', 'month', 'day']])
muse_cache_df['afib'] = muse_cache_df['afib'].astype(int)

### 1. Construct cohort of patients.

In [None]:
print("# of ECGs in MUSE: ", len(muse_cache_df))
print("# of Patients in MUSE: ", muse_cache_df[unique_id_col].nunique())
print("AFib rate: ", muse_cache_df['afib'].mean())
# Set to False to reuse the cached step-1 cohort CSV when it already exists.
regenerate = True
step1_path = '../processed_data/step1_afib_examples_va_' + map_params_to_filename(afib_preprocessing_config)
file_exists = os.path.exists(step1_path)
if not file_exists or regenerate:
    # Optionally drop patients who appear only once. The VA paper does not do this;
    # it includes patients with a single ECG.
    if not afib_preprocessing_config['include_single_ecgs']:
        patient_id_counts = muse_cache_df[unique_id_col].value_counts()
        muse_cache_df = muse_cache_df[muse_cache_df[unique_id_col].isin(patient_id_counts[patient_id_counts > 1].index)]

    def filter_rows(group):
        """Return one patient's ECGs from the prediction window before their first AFib ECG.

        A row is kept when its 'date' lies in
        [earliest_afib_date - max_pred_gap days, earliest_afib_date), where max_pred_gap
        is read from afib_preprocessing_config. The earliest AFib ECG itself and any later
        ECGs are excluded. Adds 'time_to_earliest_afib': the number of days between each
        kept ECG and the earliest AFib ECG.
        """
        earliest_afib_date = group.loc[group['afib'] == 1, 'date'].min()
        window_start = earliest_afib_date - pd.Timedelta(days=afib_preprocessing_config['max_pred_gap'])
        in_window = group[(group['date'] >= window_start) & (group['date'] < earliest_afib_date)]
        # assign() returns a new frame instead of writing into a filtered slice.
        return in_window.assign(time_to_earliest_afib=(earliest_afib_date - in_window['date']).dt.days)

    if afib_preprocessing_config['selection_criteria'] == 'first_two':
        afib_ecgs_df = muse_cache_df.sort_values('date').groupby(unique_id_col).head(2)

    elif afib_preprocessing_config['selection_criteria'] == 'va':
        patient_id_counts = muse_cache_df[unique_id_col].value_counts()
        patient_ids_with_one_ecg = patient_id_counts[patient_id_counts == 1].index

        # Patients with a single ECG: that ECG is a negative example (label 0) if it is not AFib.
        single_ecgs_df = muse_cache_df[muse_cache_df[unique_id_col].isin(patient_ids_with_one_ecg)]
        single_ecgs_df = single_ecgs_df[single_ecgs_df['afib'] == 0]
        single_ecgs_df = single_ecgs_df.sort_values('date').groupby(unique_id_col).head(1).assign(label=0)
        print("# of Single ECGs contributed: ", len(single_ecgs_df))
        print("# of patients in Single ECGs: ", single_ecgs_df[unique_id_col].nunique())
        print(single_ecgs_df['ecg_location'].value_counts() / len(single_ecgs_df))
        print()

        # Patients with more than one ECG (location mix across all of them).
        multiple_ecgs_df = muse_cache_df[~muse_cache_df[unique_id_col].isin(patient_ids_with_one_ecg)]
        print(multiple_ecgs_df['ecg_location'].value_counts() / len(multiple_ecgs_df))

        # Separate into patients with any AFib ECGs, and patients with none.
        patient_id_to_afib_count = multiple_ecgs_df.groupby(unique_id_col)['afib'].sum()
        patients_ids_with_no_afib = patient_id_to_afib_count[patient_id_to_afib_count == 0].index
        patients_ids_with_afib = patient_id_to_afib_count[patient_id_to_afib_count > 0].index

        # Multiple ECGs, never AFib: the earliest ECG is a negative example (label 0).
        multiple_ecgs_no_afib_df = multiple_ecgs_df[multiple_ecgs_df[unique_id_col].isin(patients_ids_with_no_afib)]
        multiple_ecgs_no_afib_df = multiple_ecgs_no_afib_df.sort_values('date').groupby(unique_id_col).head(1).assign(label=0)
        print("# of Multiple ECGs (-) contributed: ", len(multiple_ecgs_no_afib_df))
        print("# of patients in Multiple ECGs (-): ", multiple_ecgs_no_afib_df[unique_id_col].nunique())
        print()

        # Multiple ECGs, at least one AFib: pre-AFib ECGs inside the window are positives (label 1).
        multiple_ecgs_afib_df = multiple_ecgs_df[multiple_ecgs_df[unique_id_col].isin(patients_ids_with_afib)]
        multiple_ecgs_afib_df = (
            multiple_ecgs_afib_df.groupby(unique_id_col).apply(filter_rows).reset_index(drop=True).assign(label=1)
        )
        print("# of Multiple ECGs (+) contributed: ", len(multiple_ecgs_afib_df))
        print("# of patients in Multiple ECGs (+): ", multiple_ecgs_afib_df[unique_id_col].nunique())
        print()

        afib_ecgs_df = pd.concat([single_ecgs_df, multiple_ecgs_no_afib_df, multiple_ecgs_afib_df])

    else:
        afib_ecgs_df = muse_cache_df.sort_values('date').groupby(unique_id_col).tail(2)

    # Re-index 0..n-1. The frames above keep index labels from muse_cache_df or from
    # groupby.apply, and the concatenation can even contain duplicate labels. That would
    # silently misalign any later index-aligned column assignment.
    afib_ecgs_df = afib_ecgs_df.reset_index(drop=True)
    afib_ecgs_df.to_csv(step1_path)

else:
    # parse_dates keeps 'date' a datetime column, as in the regenerated frame.
    afib_ecgs_df = pd.read_csv(step1_path, parse_dates=['date'])

print("# of ECGs in cohort: ", len(afib_ecgs_df))
print("# of Patients in cohort: ", afib_ecgs_df[unique_id_col].nunique())
print("AFib rate: ", afib_ecgs_df['afib'].mean())  # This should be 0 for the 'va' selection criteria
# NOTE: 'label' is only assigned by the 'va' branch above.
print("Label rate: ", afib_ecgs_df['label'].mean())
ecg_path_to_afib = dict(zip(muse_cache_df.path, muse_cache_df.afib))
print()
print(afib_ecgs_df['ecg_location'].value_counts() / len(afib_ecgs_df))
len(afib_ecgs_df)

### 2. Filter for sinus rhythm ECGs.

In [None]:
from ecg_feature_names import xml_file_to_features, add_binary_demographic_features
from paths import map_params_to_filename

regenerate = False
# One cache path for the existence check, the read, and the write, so they cannot drift apart.
sr_afib_path = '../processed_data/afib_' + map_params_to_filename(afib_preprocessing_config) + '.csv'
if os.path.exists(sr_afib_path) and not regenerate:
    sr_afib_ecgs_df = pd.read_csv(sr_afib_path)
    print(sr_afib_ecgs_df['ecg_location'].value_counts() / len(sr_afib_ecgs_df))
    print()
else:
    # Parse every selected ECG file in parallel on the ipyparallel engines.
    all_paths = list(afib_ecgs_df['path'])
    print("# of Paths: ", len(all_paths))
    xml_outputs = dview.map_sync(xml_file_to_features, all_paths)
    print("# of xml_outputs: ", len(xml_outputs))
    assert len(xml_outputs) == len(afib_ecgs_df), "expected one xml_file_to_features output per ECG row"

    # Per the usage below, o[0] is a dict of features and o[1] is the path to the lead data.
    feature_dicts = [o[0] for o in xml_outputs]
    demographic_feature_df = pd.DataFrame(feature_dicts)
    lead_data_paths = [o[1] for o in xml_outputs]

    # map_sync returns results in the order of all_paths, i.e. the row order of afib_ecgs_df.
    # Assign by position (.to_numpy()). Assigning the Series directly would align on index
    # labels, and afib_ecgs_df's labels are not guaranteed to be 0..n-1.
    for key in demographic_feature_df.columns:
        afib_ecgs_df[key] = demographic_feature_df[key].to_numpy()
    afib_ecgs_df['path_to_lead_data'] = lead_data_paths
    afib_ecgs_df = add_binary_demographic_features(afib_ecgs_df)
    print("# of ECGs: ", len(afib_ecgs_df))
    print("# of Patients: ", len(set(afib_ecgs_df[unique_id_col])))
    print("Eventual AFib rate: ", afib_ecgs_df['label'].mean())

    print(afib_ecgs_df['ecg_location'].value_counts() / len(afib_ecgs_df))
    print()

    # `== True` (rather than a truthiness test) also treats missing flags as non-sinus.
    sr_afib_ecgs_df = afib_ecgs_df[afib_ecgs_df['sinus_rhythm_flag'] == True]
    print("After dropping non-sinus rhythms")
    print("# of ECGs: ", len(sr_afib_ecgs_df))
    print("# of Patients: ", len(set(sr_afib_ecgs_df[unique_id_col])))
    print("Eventual AFib rate: ", sr_afib_ecgs_df['label'].mean())

    print(sr_afib_ecgs_df['ecg_location'].value_counts() / len(sr_afib_ecgs_df))
    print()

    sr_afib_ecgs_df.to_csv(sr_afib_path)
len(sr_afib_ecgs_df)


### 3. For each sinus rhythm ECG, remove baseline wander, truncate, and normalize.

In [None]:
# Remove baseline wander
from ecg_preprocessing_fs import ecg_remove_baseline_wander_path, ecg_truncate_mean_normalize_path

# Keep only ECGs that have a lead-data path. .copy() makes this an independent frame,
# so the columns added below are not written into a slice of sr_afib_ecgs_df.
pr_sr_afib_ecgs_df = sr_afib_ecgs_df[sr_afib_ecgs_df['path_to_lead_data'].notna()].copy()
raw_ecg_paths = list(pr_sr_afib_ecgs_df['path_to_lead_data'])
bwr_ecg_paths = dview.map_sync(ecg_remove_baseline_wander_path, raw_ecg_paths)
pr_sr_afib_ecgs_df['path_to_bwr_data'] = bwr_ecg_paths

# Truncate and normalize.
# Drop ECGs whose baseline-wander output path is missing or contains 'nan'
# (presumably how ecg_remove_baseline_wander_path reports a failure; TODO confirm).
# Checking isna() explicitly avoids the NaN that .str.contains yields for
# None/NaN entries, which would make the `~` below raise.
bwr_path_col = pr_sr_afib_ecgs_df['path_to_bwr_data']
bwr_failed = bwr_path_col.isna() | bwr_path_col.astype(str).str.contains('nan')
pr_sr_afib_ecgs_df = pr_sr_afib_ecgs_df[~bwr_failed].copy()
bwr_ecg_paths = list(pr_sr_afib_ecgs_df['path_to_bwr_data'])
bwr_trunc_norm_paths = dview.map_sync(ecg_truncate_mean_normalize_path, bwr_ecg_paths)
pr_sr_afib_ecgs_df['path_to_bwr_trunc_norm_data'] = bwr_trunc_norm_paths
pr_sr_afib_ecgs_df.to_csv('../processed_data/processed_afib_' + map_params_to_filename(afib_preprocessing_config) + '.csv')

In [None]:
# 487381 patients total
# 454367 patients with SR ECGs 
# 453030 patients with valid ECG files
# 427482 patients mappable to records in the EDW using patient first name, last name, and date of birth
# 423026 patients with demographics in the EDW
# 406337 patients with no established AF diagnosis
# 406274 patients after filtering out UniqueIDs associated with both a positive and a negative label