# In [1]:
import pandas as pd
import numpy as np
import wfdb
import ast

def load_raw_data(df, sampling_rate, path):
    """Read the waveform record of every row in *df* and return the signals.

    Args:
        df: Table exposing ``filename_lr`` and ``filename_hr`` columns whose
            values are record names relative to *path*.
        sampling_rate: ``100`` selects the ``filename_lr`` column; any other
            value selects ``filename_hr``.
        path: Prefix that is concatenated directly in front of each record
            name (no separator is inserted).

    Returns:
        A NumPy array built from the signal part of each record, in the
        same order as the rows of *df*.
    """
    record_names = df.filename_lr if sampling_rate == 100 else df.filename_hr

    signals = []
    for record_name in record_names:
        # rdsamp yields a (signal, metadata) pair; only the signal is kept.
        signal, _meta = wfdb.rdsamp(path + record_name)
        signals.append(signal)

    return np.array(signals)

# Dataset location. load_raw_data picks the `filename_lr` records when
# sampling_rate is 100 and the `filename_hr` records otherwise.
sampling_rate = 100
path = '/Users/rohanmotanavar/datasets/PTB_XL/'

# Annotation table indexed by ecg_id. The scp_codes cells are stored as text
# in the CSV, so each one is parsed back into a Python literal.
Y = pd.read_csv(path + 'ptbxl_database.csv', index_col='ecg_id')
Y['scp_codes'] = Y['scp_codes'].apply(ast.literal_eval)

# Raw signals for every annotated record, in the row order of Y.
X = load_raw_data(df=Y, sampling_rate=sampling_rate, path=path)

# SCP statement table (first CSV column as index), restricted to the rows
# flagged as diagnostic; used to map SCP codes onto diagnostic classes.
agg_df = pd.read_csv(path + 'scp_statements.csv', index_col=0)
agg_df = agg_df.loc[agg_df['diagnostic'] == 1]

def aggregate_diagnostic_single(y_dic):
    """Return the diagnostic class of the highest-scoring diagnostic SCP code.

    Args:
        y_dic: Mapping of SCP code to its score for one record.

    Returns:
        The ``diagnostic_class`` value that ``agg_df`` holds for the code with
        the largest score among those present in ``agg_df.index`` (the first
        such code wins on ties), or ``None`` when no code of *y_dic* appears
        in ``agg_df.index``.
    """
    # Keep only the codes that the diagnostic statement table knows about.
    diagnostic_scores = {
        code: score for code, score in y_dic.items() if code in agg_df.index
    }
    if not diagnostic_scores:
        return None

    # max() keeps the first maximal entry, so insertion order breaks ties.
    top_code = max(diagnostic_scores, key=diagnostic_scores.get)
    statement = agg_df.loc[top_code]
    return statement.diagnostic_class

# Label every record with one diagnostic superclass; records without any
# diagnostic SCP code get None from aggregate_diagnostic_single.
Y['diagnostic_superclass'] = Y['scp_codes'].apply(aggregate_diagnostic_single)

# Persist the annotated table (written to the current working directory).
Y.to_csv('ptbxl_database_with_single_superclass.csv')

# Hold out one stratification fold for testing; every other fold is training.
test_fold = 10
in_train = Y['strat_fold'] != test_fold
in_test = Y['strat_fold'] == test_fold

# X is indexed by row position, so the boolean masks are turned into
# positional indices with np.where before selecting signals.
X_train = X[np.where(in_train)]
y_train = Y.loc[in_train, 'diagnostic_superclass']

X_test = X[np.where(in_test)]
y_test = Y.loc[in_test, 'diagnostic_superclass']