In [1]:
import pandas as pd
import numpy as np
import wfdb
import ast
from tqdm import tqdm
import warnings; warnings.filterwarnings('ignore')
from IPython.display import display

import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

In [2]:
def load_raw_data(df, sampling_rate, path):
    """Read every WFDB record referenced by ``df`` and stack the signals.

    Parameters
    ----------
    df : object exposing ``filename_lr`` and ``filename_hr`` (record names).
    sampling_rate : 100 selects ``filename_lr``; any other value selects
        ``filename_hr``.
    path : string prefix concatenated in front of each record name.

    Returns
    -------
    numpy.ndarray built from the signal part of each ``wfdb.rdsamp`` result
    (the metadata part is discarded).
    """
    record_names = df.filename_lr if sampling_rate == 100 else df.filename_hr
    signals = []
    for record_name in tqdm(record_names):
        signal, _meta = wfdb.rdsamp(path + record_name)
        signals.append(signal)
    return np.array(signals)

In [3]:
path = '../ptb-xl-a-large-publicly-available-electrocardiography-dataset-1.0.1/'
sampling_rate = 100

# Load the annotation table. scp_codes is stored as the text of a Python
# literal, so parse each cell back into a real object.
Y = pd.read_csv(path + 'ptbxl_database.csv', index_col='ecg_id')
Y['scp_codes'] = Y['scp_codes'].apply(ast.literal_eval)

# Load the raw signal data (one wfdb.rdsamp call per row of Y)
X = load_raw_data(Y, sampling_rate, path)

print('data shape:', X.shape)
Y

 27%|█████████████████████▏                                                       | 5998/21837 [02:44<06:18, 41.87it/s]

KeyboardInterrupt: 

In [None]:
#Load Data for Heart Beat Diagnostic

In [None]:
# Read scp_statements.csv and keep only the rows flagged diagnostic == 1;
# the functions below look codes up in this table's index.
agg_df = (
    pd.read_csv(path + 'scp_statements.csv', index_col=0)
    .loc[lambda statements: statements['diagnostic'] == 1]
)
print(agg_df.shape)
agg_df.head()

In [None]:
def aggregate_supclass_diagnostic(y_dic):
    """Map the SCP codes of one record to its distinct diagnostic superclasses.

    ``y_dic`` is a dict keyed by SCP code. Codes missing from the index of the
    notebook-level ``agg_df`` are ignored. Returns a list of the unique
    ``diagnostic_class`` values; it is built from a set, so the order of the
    elements is not guaranteed.
    """
    known_codes = (code for code in y_dic if code in agg_df.index)
    return list({agg_df.loc[code].diagnostic_class for code in known_codes})
    
# Apply diagnostic superclass
Y['diagnostic_superclass'] = Y.scp_codes.apply(aggregate_supclass_diagnostic)
Y['diagnostic_superclass_len'] = Y['diagnostic_superclass'].apply(len)
Y.loc[Y.diagnostic_superclass_len > 1, 'diagnostic_superclass']

In [None]:
vc = Y['diagnostic_superclass_len'].value_counts()
share_pct = vc.values / vc.values.sum() * 100.

sns.set_style("whitegrid")
bar, ax = plt.subplots(figsize=(10, 6))
ax = sns.barplot(x=share_pct, y=vc.index, ci=None, palette="muted", orient='h', ax=ax)
ax.set_title("Diagnostic Superclass Len Distribution", fontsize=20)
ax.set_xlabel("percentage over all samples")
ax.set_ylabel("diagnostic_superclass_len")
# Write each bar's percentage at the tip of the bar.
for patch in ax.patches:
    bar_width = patch.get_width()
    bar_middle = patch.get_y() + patch.get_height() / 2
    ax.text(bar_width, bar_middle, "%.1f%%" % bar_width, weight='bold')

In [None]:
#Number of Subclass per ECG record

In [None]:
def aggregate_subclass_diagnostic(y_dic):
    """Map the SCP codes of one record to its distinct diagnostic subclasses.

    ``y_dic`` is a dict keyed by SCP code. Codes missing from the index of the
    notebook-level ``agg_df`` are ignored. Returns a list of the unique
    ``diagnostic_subclass`` values, each prefixed with ``'sub_'``; the order
    of the elements is not guaranteed because they pass through a set.
    """
    distinct = {
        agg_df.loc[code].diagnostic_subclass
        for code in y_dic
        if code in agg_df.index
    }
    # The prefix keeps subclass column names apart from superclass ones.
    return ['sub_' + name for name in distinct]

# Apply diagnostic subclass
Y['diagnostic_subclass'] = Y.scp_codes.apply(aggregate_subclass_diagnostic)
Y['diagnostic_subclass_len'] = Y['diagnostic_subclass'].apply(len)
Y.loc[Y.diagnostic_subclass_len > 1, 'diagnostic_subclass']

In [None]:
vc = Y['diagnostic_subclass_len'].value_counts()
share_pct = vc.values / vc.values.sum() * 100.

sns.set_style("whitegrid")
bar, ax = plt.subplots(figsize=(10, 6))
ax = sns.barplot(x=share_pct, y=vc.index, ci=None, palette="muted", orient='h', ax=ax)
ax.set_title("Diagnostic Subclass Length Distribution", fontsize=20)
ax.set_xlabel("percentage over all samples")
ax.set_ylabel("diagnostic_subclass_len")
# Write each bar's percentage at the tip of the bar.
for patch in ax.patches:
    bar_width = patch.get_width()
    bar_middle = patch.get_y() + patch.get_height() / 2
    ax.text(bar_width, bar_middle, "%.1f%%" % bar_width, weight='bold')


In [None]:
# Pool the per-record label lists and collect the distinct names at each level.
all_superclass = pd.Series(np.concatenate(Y['diagnostic_superclass'].values))
all_subclass = pd.Series(np.concatenate(Y['diagnostic_subclass'].values))
superclass_cols, subclass_cols = all_superclass.unique(), all_subclass.unique()
# Every label name, superclasses first and then subclasses.
update_cols = np.concatenate((superclass_cols, subclass_cols))
# Record metadata columns; more could be added here as features.
meta_cols = ['age', 'sex', 'height', 'weight', 'nurse', 'site', 'device']

In [None]:
#Reformat Data

In [None]:
class ClassUpdate():
    """Callable that switches on the indicator entries for one record's labels.

    ``cols`` is stored on the instance but is not consulted by ``__call__``.
    """

    def __init__(self, cols):
        self.cols = cols

    def __call__(self, row):
        """Set ``row[label] = 1`` for every label listed under the row's
        ``diagnostic_superclass`` and ``diagnostic_subclass`` entries, then
        return the same (mutated) row."""
        for label_field in ('diagnostic_superclass', 'diagnostic_subclass'):
            for label in row[label_field]:
                row[label] = 1
        return row

def get_data_by_folds(folds, x, y, update_cols, feature_cols):
    """Select the records belonging to ``folds`` and attach 0/1 label columns.

    Parameters
    ----------
    folds : non-empty sequence of ``strat_fold`` values to keep.
    x : array filtered with the same boolean mask as ``y``, so its rows must
        be in the same order as the rows of ``y``.
    y : DataFrame with ``strat_fold``, ``diagnostic_superclass`` and
        ``diagnostic_subclass`` columns plus every name in ``feature_cols``.
    update_cols : label names that become indicator columns.
    feature_cols : columns of ``y`` carried over to the result.

    Returns
    -------
    (x_selected, y_selected), where ``y_selected`` holds ``feature_cols``,
    ``update_cols`` and ``strat_fold``, in that order.

    Raises
    ------
    AssertionError if ``folds`` is empty.
    """
    assert len(folds) > 0, 'at least one fold must be provided'
    filt = np.isin(y.strat_fold.values, folds)
    x_selected = x[filt]
    # Work on an explicit copy so adding the indicator columns never writes
    # into (or triggers a chained-assignment warning about) the caller's frame.
    y_selected = y[filt].copy()

    # Start every indicator at 0 ...
    for sc in update_cols:
        y_selected[sc] = 0

    # ... then flip to 1 the labels that are present on each row.
    y_selected = y_selected.apply(ClassUpdate(update_cols), axis=1)

    return x_selected, y_selected[list(feature_cols) + list(update_cols) + ['strat_fold']]

In [None]:
x_all, y_all = get_data_by_folds(np.arange(1, 11), X, Y, update_cols, meta_cols)

In [None]:
y_all

In [None]:
vc = y_all[superclass_cols].sum(axis=0)
share_pct = vc.values / y_all.shape[0] * 100.

sns.set_style("whitegrid")
bar, ax = plt.subplots(figsize=(10, 6))
ax = sns.barplot(x=share_pct, y=vc.index, ci=None, palette="muted", orient='h', ax=ax)
ax.set_title("Diagnositic Superclass Distribution", fontsize=20)
ax.set_xlabel("percentage over all samples")
ax.set_ylabel("diagnositic superclass")
# Write each bar's percentage at the tip of the bar.
for patch in ax.patches:
    bar_width = patch.get_width()
    bar_middle = patch.get_y() + patch.get_height() / 2
    ax.text(bar_width, bar_middle, "%.1f%%" % bar_width, weight='bold')

In [None]:
#ECG Example

In [None]:
y_all['NORM']

In [None]:
sample_num = 1

for superclass in superclass_cols:
    # Records whose indicator column for this superclass is 1
    filt = y_all[superclass] == 1
    y_selected = y_all.loc[filt]
    x_selected = x_all[filt]

    for i in range(sample_num):
        y_, x_ = y_selected.iloc[i], x_selected[i]
        n_points, n_channels = x_.shape
        sample_axis = np.arange(n_points)

        bar, axes = plt.subplots(n_channels, 1, figsize=(10, 10))
        title = "Superclass = {}, Age = {}, Height = {}, Weight = {}, Sex = {}, Nurse = {}, Site = {}, Device = {}".format(
            superclass,
            y_['age'], y_['height'], y_['weight'],
            y_['sex'], y_['nurse'], y_['site'], y_['device'],
        )
        axes[0].set_title(title, fontsize=15)

        # One stacked panel per signal column
        for c in range(n_channels):
            sns.lineplot(x=sample_axis, y=x_[:, c], ax=axes[c])

        plt.tight_layout()
        plt.show()
        #assert False

In [None]:
#Extracted Features
extfea_path = '../ptb-xl-a-comprehensive-electrocardiographic-feature-dataset-1.0.1/features/'
extfea_data = pd.read_csv(extfea_path+'ecgdeli_features.csv', index_col='ecg_id')


In [None]:
extfea_data.shape

In [None]:
# Keep only the records that have a row in the extracted-feature table,
# matched by ecg_id (both frames are indexed by it). Slicing the first N rows
# by position would pick records without features whenever the two tables do
# not list exactly the same ids in the same order.
Y_new = Y.loc[Y.index.isin(extfea_data.index)]

In [None]:
Y_new.shape

In [None]:
X_new = X[:extfea_data.shape[0]]

In [None]:
X_new.shape

In [None]:
# raw_signal = X[:extfea_data.shape[0]]
# raw_signal = raw_signal[Y_new.diagnostic_superclass_len == 1]
# raw_signal.shape

In [None]:
# leads_I = []
# leads_II = []
# leads_III = []
# leads_aVR = []
# leads_aVL = []
# leads_aVF = []
# leads_V1 = []
# leads_V2 = []
# leads_V3 = []
# leads_V4 = []
# leads_V5 = []
# leads_V6 = []

# for leads in raw_signal :
#     temp = []
#     lead_I = leads[:, 0]
#     lead_II = leads[:, 1]
#     lead_III = leads[:, 2]
#     lead_aVR = leads[:, 3]
#     lead_aVL = leads[:, 4]
#     lead_aVF = leads[:, 5]
#     lead_V1 = leads[:, 6]
#     lead_V2 = leads[:, 7]
#     lead_V3 = leads[:, 8]
#     lead_V4 = leads[:, 9]
#     lead_V5 = leads[:, 10]
#     lead_V6 = leads[:, 11]
    
#     leads_I.append(lead_I)
#     leads_II.append(lead_II)
#     leads_III.append(lead_III)
#     leads_aVR.append(lead_aVR)
#     leads_aVL.append(lead_aVL)
#     leads_aVF.append(lead_aVF)
#     leads_V1.append(lead_V1)
#     leads_V2.append(lead_V2)
#     leads_V3.append(lead_V3)
#     leads_V4.append(lead_V4)
#     leads_V5.append(lead_V5)
#     leads_V6.append(lead_V6)

In [None]:
# Attach the chosen extracted-feature columns to the label table. Each column
# is passed to `assign` as a Series taken from extfea_data, in the order of
# _feature_names below.
_base_leads = ['I', 'II', 'III', 'V1', 'V2', 'V3', 'V4', 'V5', 'V6']
_leads = _base_leads + ['aVF', 'aVL', 'aVR']

_feature_names = (
    ['RR_Mean_Global']                                                 # RR mean
    + ['ST_Elev_' + s for s in _base_leads + ['aVF', 'aVR', 'aVL']]    # ST elevation
    + ['PR_Int_Global'] + ['PR_Int_' + s for s in _leads]              # PR interval
    + ['PQ_Int_Global'] + ['PQ_Int_' + s for s in _leads]              # PQ interval
    + ['QRS_Dur_' + s for s in _leads]                                 # QRS duration
    + ['QT_Int_' + s for s in _leads]                                  # QT interval
    + ['R_Amp_' + s for s in _leads]                                   # R amplitude
    + ['Q_Amp_' + s for s in _leads]                                   # Q amplitude
    + ['P_Amp_' + s for s in _leads]                                   # P amplitude
)

X_new = Y_new.assign(**{name: extfea_data[name] for name in _feature_names})

        

In [None]:
#Select Record with single superclass label
X_selected = X_new[Y_new.diagnostic_superclass_len == 1]
Y_selected = Y_new.loc[Y_new.diagnostic_superclass_len == 1]

In [None]:
# Feature columns fed to the model, generated group by group. Each group is a
# name prefix followed by the same twelve lead suffixes; PQ_Int additionally
# starts with its '_Global' column.
# NOTE(review): 'PR_Int_Global' is attached to X_new but is not listed here —
# confirm that leaving it out is intentional.
_lead_suffixes = ['I', 'II', 'III',
                  'V1', 'V2', 'V3', 'V4', 'V5', 'V6',
                  'aVF', 'aVL', 'aVR']
_feature_groups = [
    ('ST_Elev', False),
    ('PR_Int', False),
    ('PQ_Int', True),
    ('QRS_Dur', False),
    ('QT_Int', False),
    ('R_Amp', False),
    ('Q_Amp', False),
    ('P_Amp', False),
]

selected_feature = ['RR_Mean_Global']
for _prefix, _has_global in _feature_groups:
    if _has_global:
        selected_feature.append(_prefix + '_Global')
    selected_feature.extend(_prefix + '_' + _lead for _lead in _lead_suffixes)

X_selected_feature = X_selected[selected_feature]

In [None]:
# X_selected_feature['lead_I'] = leads_I
# X_selected_feature['lead_II'] = leads_II
# X_selected_feature['lead_III'] = leads_III
# X_selected_feature['lead_aVR'] = leads_aVR
# X_selected_feature['lead_aVL'] = leads_aVL
# X_selected_feature['lead_aVF'] = leads_aVF
# X_selected_feature['lead_V1'] = leads_V1
# X_selected_feature['lead_V2'] = leads_V2
# X_selected_feature['lead_V3'] = leads_V3
# X_selected_feature['lead_V4'] = leads_V4
# X_selected_feature['lead_V5'] = leads_V5
# X_selected_feature['lead_V6'] = leads_V6

In [None]:
# Y_selected only holds records with exactly one superclass, so the label is
# the single element of each list. Taking it directly (rather than matching
# against a fixed set of names) guarantees one label per row, keeping
# `labels` aligned with Y_selected even if an unexpected class name appears.
labels = [classes[0] for classes in Y_selected['diagnostic_superclass']]

In [None]:
df_labels = pd.DataFrame(labels, columns = ['labels'])

In [None]:
class_distribution_before = df_labels['labels'].value_counts()
# Generate a pie chart for class distribution before oversampling
plt.figure(figsize=(8, 6))
plt.pie(class_distribution_before, labels=class_distribution_before.index, autopct='%1.1f%%', shadow=True)
plt.title('Class Distribution')
plt.axis('equal')  # Equal aspect ratio ensures the pie chart is circular.
plt.show()

In [None]:
X_selected.shape

In [None]:
df_labels['strat_fold'] = Y_selected['strat_fold'].tolist()

In [None]:
# Work on a copy so df_labels keeps the un-merged labels and this cell can be
# re-run without changing what earlier cells computed.
df_labels_filter = df_labels.copy()
# Merge STTC, HYP and CD into a single "OTHER" class.
merged_classes = ["STTC", "HYP", "CD"]
df_labels_filter.loc[df_labels_filter["labels"].isin(merged_classes), "labels"] = "OTHER"
print(df_labels_filter['labels'].value_counts())

In [None]:
class_distribution_after = df_labels_filter['labels'].value_counts()
# Pie chart of the class shares in df_labels_filter (STTC/HYP/CD already
# merged into OTHER; no oversampling has been applied yet).
fig, ax = plt.subplots(figsize=(8, 6))
ax.pie(class_distribution_after, labels=class_distribution_after.index, autopct='%1.1f%%', shadow=True)
ax.set_title('Class Distribution')
ax.axis('equal')  # Equal aspect ratio keeps the pie circular.
plt.show()

In [None]:
X_selected_feature = X_selected_feature.fillna(0)
X_selected_feature 

In [None]:
Y_selected.shape

In [None]:
# Build the combined features+labels table on a copy, so X_selected_feature
# itself keeps only feature columns.
df_features_label = X_selected_feature.copy()
# Labels and folds are attached as plain lists, i.e. by row position.
df_features_label['labels'] = df_labels_filter['labels'].tolist()
df_features_label['strat_fold'] = df_labels_filter['strat_fold'].tolist()
df_features_label.iloc[:,60:76]

In [None]:
# Separate minority and majority class samples for SMOTE-NC
class_0 = df_features_label[df_features_label['labels'] == 'MI']  
class_1 = df_features_label[df_features_label['labels'] == 'NORM']
class_2 = df_features_label[df_features_label['labels'] == 'OTHER'] 

In [None]:
class_0.shape

In [None]:
# Integer class ids: 0 = MI, 1 = NORM, 2 = OTHER. `assign` returns new frames
# instead of writing a column into the filtered slices of df_features_label.
class_0 = class_0.assign(labels_int=0)
class_1 = class_1.assign(labels_int=1)
class_2 = class_2.assign(labels_int=2)

In [None]:
# Number of leading feature columns. The feature columns come first (they were
# selected with `selected_feature`), so counting that list does not depend on
# how many label/helper columns were appended afterwards.
feature_end = len(selected_feature)

In [None]:
df_features_label.iloc[:, 0:feature_end]

In [None]:
# Convert the features to a numpy array (SMOTENC requires numpy arrays)
class_0_features = class_0.iloc[:, 0:feature_end].to_numpy()
class_1_features = class_1.iloc[:, 0:feature_end].to_numpy()
class_2_features = class_2.iloc[:, 0:feature_end].to_numpy()

In [None]:
print("class_0 shape ", class_0_features.shape)
print("class_1 shape ", class_1_features.shape)
print("class_2 shape ", class_2_features.shape)
print(class_0_features.shape[0]+class_1_features.shape[0]+class_2_features.shape[0])

In [None]:
# Combine the features and labels for SMOTE-NC
X_for_smt = np.concatenate((class_0_features, class_1_features, class_2_features), axis=0)
y_for_smt = np.concatenate((class_0['labels_int'].values, class_1['labels_int'].values, class_2['labels_int'].values), axis=0)

In [None]:
# Synthetic "categorical" column holding the integer class id (0, 1, 2) of
# each sample.
# NOTE(review): this column duplicates the target, so the resampler sees the
# label as an input feature — confirm this is intended.
num_samples = len(y_for_smt)
categorical_feature = np.zeros(num_samples, dtype=int)
for class_id in (0, 1, 2):
    categorical_feature[y_for_smt == class_id] = class_id

In [None]:
# Add the synthetic categorical feature to the feature matrix
X_with_categorical = np.column_stack((X_for_smt, categorical_feature))

In [None]:
# Rebalance the classes with imbalanced-learn's SMOTETomek. Note that the
# resampler used here is SMOTETomek, not SMOTE-NC; random_state is fixed so
# repeated runs use the same seed.
from imblearn.combine import SMOTETomek
smote_tomek = SMOTETomek(random_state=42)
X_oversampled, y_oversampled = smote_tomek.fit_resample(X_with_categorical, y_for_smt)

In [None]:
# Pass the names as a flat list: wrapping them in another list makes pandas
# build a MultiIndex header instead of ordinary column labels.
balanced_features_df = pd.DataFrame(columns=selected_feature)

In [None]:
balanced_features_df

In [None]:
# Fill the frame in one step from the resampled matrix, keeping only the first
# `feature_end` columns (this drops the trailing synthetic categorical
# column). Building it at once avoids growing the frame one row at a time and
# yields a plain 0-based index.
balanced_features_df = pd.DataFrame(
    X_oversampled[:, :feature_end],
    columns=balanced_features_df.columns[:feature_end],
)

In [None]:
balanced_features_df['labels'] = y_oversampled.tolist()

In [None]:
# Check the class distribution after oversampling
print("Class distribution after oversampling:")
class_distribution_after = balanced_features_df['labels'].value_counts()
print(class_distribution_after)

In [None]:
# Generate a pie chart for class distribution after oversampling
plt.figure(figsize=(8, 6))
plt.pie(class_distribution_after, labels=class_distribution_after.index, autopct='%1.1f%%', shadow=True)
plt.title('Class Distribution After Oversampling')
plt.axis('equal')  # Equal aspect ratio ensures the pie chart is circular.
plt.show()

In [None]:
X_balanced_features = balanced_features_df.iloc[:, 0:feature_end].values
y_balanced_features = balanced_features_df['labels'].values

In [None]:
X_balanced_features

In [None]:
X_balanced_features.shape

In [None]:
from sklearn.model_selection import train_test_split
# Hold out 20% of the balanced samples as the test set; random_state fixes the shuffle.
# NOTE(review): the split is made after resampling, so resampler-generated
# rows can end up in the test set — confirm this is acceptable.
(X_train_features, X_test_features,
 y_train_features, y_test_features) = train_test_split(
    X_balanced_features,
    y_balanced_features,
    test_size=0.2,
    random_state=42,
)

In [None]:
import numpy as np
from keras.utils import to_categorical
import pandas as pd
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix, classification_report
import tensorflow as tf
from keras.models import Sequential
from keras.layers import Dense, Conv1D, MaxPooling1D, Flatten, BatchNormalization, Dropout
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from keras.callbacks import EarlyStopping
import keras

In [None]:
# Standardize the input features: fit the scaler on the training split only,
# then apply the same transform to both splits.
scaler = StandardScaler().fit(X_train_features)
X_train_scaled = scaler.transform(X_train_features)
X_test_scaled = scaler.transform(X_test_features)

In [None]:
# Add a trailing axis of length 1 so each sample has shape (n_features, 1),
# the input shape the Conv1D model below is built with.
X_train_reshaped = X_train_scaled[:, :, np.newaxis]
X_test_reshaped = X_test_scaled[:, :, np.newaxis]

In [None]:
X_train_reshaped.shape

In [None]:
X_test_reshaped.shape

In [None]:
named_labels = ['MI', 'NORM', 'OTHER']
named_labels = np.array(named_labels)
named_labels

In [None]:
# Define the CNN model (First)
def create_model():
    """Build and compile a fresh 1-D CNN classifier.

    The input length and the size of the hidden Dense layer come from
    ``X_train_reshaped.shape[1]``; the softmax output has one unit per entry
    of ``named_labels``. Both are read from notebook-level variables.
    Returns the compiled Keras model.
    """
    n_features = X_train_reshaped.shape[1]
    model = Sequential([
        Conv1D(64, kernel_size=3, activation='relu', input_shape=(n_features, 1)),
        Dropout(0.5),
        BatchNormalization(),
        Conv1D(64, kernel_size=3, activation='relu'),
        Dropout(0.5),
        BatchNormalization(),
        MaxPooling1D(pool_size=2),
        Flatten(),
        Dense(n_features, activation='relu'),
        Dense(len(named_labels), activation='softmax'),
    ])

    # Early stopping is currently disabled:
    # early_stopping = EarlyStopping(monitor='val_loss', patience=30)

    # No optimizer is passed, so the framework default is used.
    model.compile(loss='categorical_crossentropy', metrics=['accuracy'])
    return model

In [None]:
# Convert multiclass labels to one-hot encoding
y_train_onehot = to_categorical(y_train_features, num_classes=len(named_labels))
y_train_onehot

In [None]:
y_train_onehot.shape

In [None]:
# Convert multiclass labels to one-hot encoding
y_test_onehot = to_categorical(y_test_features, num_classes=len(named_labels))
y_test_onehot

In [None]:
# Perform stratified k-fold cross-validation on the training split
kfold = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)
accuracy_scores = []
confusion_matrices = []
train_losses = []
val_losses = []
fold_models = []
train_accuracies = []
val_accuracies = []
# Validation labels/predictions of every fold, pooled so that the final
# classification report covers all folds instead of only the last one.
oof_true_labels = []
oof_pred_labels = []
class_ids = np.arange(len(named_labels))

for train_index, val_index in kfold.split(X_train_reshaped, y_train_features):
    train_X, val_X = X_train_reshaped[train_index], X_train_reshaped[val_index]
    train_y, val_y = y_train_onehot[train_index], y_train_onehot[val_index]

    # Each fold trains its own freshly built model
    model = create_model()

    # Train the model
    history = model.fit(train_X, train_y, validation_data=(val_X, val_y), epochs=100, batch_size=64)

    # Keep the per-epoch loss and accuracy curves of this fold
    train_losses.append(history.history['loss'])
    val_losses.append(history.history['val_loss'])
    train_accuracies.append(history.history['accuracy'])
    val_accuracies.append(history.history['val_accuracy'])

    # Evaluate the model on the validation set
    _, accuracy = model.evaluate(val_X, val_y)
    accuracy_scores.append(accuracy)

    # Keep the trained model so it can be evaluated on the test set later
    fold_models.append(model)

    # Make predictions on the validation set and decode one-hot vectors to class ids
    Y_pred = model.predict(val_X)
    Y_pred_labels = np.argmax(Y_pred, axis=1)
    Y_true_labels = np.argmax(val_y, axis=1)
    oof_true_labels.append(Y_true_labels)
    oof_pred_labels.append(Y_pred_labels)

    # Confusion matrix of this fold
    conf_matrix = confusion_matrix(Y_true_labels, Y_pred_labels, labels=class_ids)
    confusion_matrices.append(conf_matrix)

# Average accuracy and element-wise average confusion matrix over all folds
mean_accuracy = np.mean(accuracy_scores)
mean_conf_matrix = np.mean(confusion_matrices, axis=0)

print("Mean Accuracy:", mean_accuracy)
print("Mean Confusion Matrix:")
print(mean_conf_matrix)

# Classification report over the pooled validation predictions of all folds
print("Classification Report:")
print(classification_report(
    np.concatenate(oof_true_labels),
    np.concatenate(oof_pred_labels),
    labels=class_ids,
    target_names=named_labels,
))

In [None]:
# Plot the training and validation loss curves of every fold.
# The fold count is taken from the recorded histories rather than hard-coded,
# so it always matches the number of folds that were actually trained.
k = len(train_losses)
fold_numbers = range(1, k+1)
for fold, fold_train_loss, fold_val_loss in zip(fold_numbers, train_losses, val_losses):
    plt.plot(range(1, len(fold_train_loss)+1), fold_train_loss, marker='o', label='Training Loss')
    plt.plot(range(1, len(fold_val_loss)+1), fold_val_loss, marker='o', label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title(f'Training and Validation Loss - Fold {fold}')
    plt.legend()
    plt.show()

In [None]:
# Plot the training and validation accuracy curves of every fold.
# The fold count is taken from the recorded histories rather than hard-coded,
# so it always matches the number of folds that were actually trained.
k = len(train_accuracies)
fold_numbers = range(1, k+1)
for fold, fold_train_acc, fold_val_acc in zip(fold_numbers, train_accuracies, val_accuracies):
    plt.plot(range(1, len(fold_train_acc)+1), fold_train_acc, marker='o', label='Training Accuracy')
    plt.plot(range(1, len(fold_val_acc)+1), fold_val_acc, marker='o', label='Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.title(f'Training and Validation Accuracy - Fold {fold}')
    plt.legend()
    plt.show()

In [None]:
# Sensitivity and specificity, per class, for every fold's confusion matrix
sensitivities = []
specificities = []

for conf_matrix in confusion_matrices:
    # One-vs-rest counts for each class, derived from the diagonal and the
    # row / column totals of the matrix.
    true_positive = np.diag(conf_matrix)
    false_negative = conf_matrix.sum(axis=1) - true_positive
    false_positive = conf_matrix.sum(axis=0) - true_positive
    true_negative = conf_matrix.sum() - (true_positive + false_negative + false_positive)

    sensitivities.append(true_positive / (true_positive + false_negative))
    specificities.append(true_negative / (true_negative + false_positive))

# Single averages over all folds and all classes
mean_sensitivity = np.mean(sensitivities)
mean_specificity = np.mean(specificities)

print("Mean Sensitivity:", mean_sensitivity)
print("Mean Specificity:", mean_specificity)

In [None]:
# Evaluate every fold's model on the held-out test set.
# The true test labels are the same for every model, so decode them once.
Y_tr_labels = np.argmax(y_test_onehot, axis=1)
class_ids = np.arange(len(named_labels))
# Derive the fold count from the stored models instead of hard-coding it.
k = len(fold_models)
fold_numbers = range(1, k+1)
for fold, fold_model in zip(fold_numbers, fold_models):
    # Make predictions on the test set
    Y_predict = fold_model.predict(X_test_reshaped)
    Y_predict_labels = np.argmax(Y_predict, axis=1)

    # Confusion matrix
    cf_matrix = confusion_matrix(Y_tr_labels, Y_predict_labels, labels=class_ids)
    print("Fold :", fold)
    print("Confusion Matrix : ")
    print(cf_matrix)

    print("Classification Report:")
    print(classification_report(Y_tr_labels, Y_predict_labels, labels=class_ids, target_names=named_labels))

In [None]:
# Evaluate the chosen fold's model on the test set.
# NOTE(review): best_fold is hard-coded — confirm how fold 5 was selected.
best_fold = 5
best_model = fold_models[best_fold - 1]
label_ids = np.arange(len(named_labels))

Y_predict = best_model.predict(X_test_reshaped)
Y_predict_labels = np.argmax(Y_predict, axis=1)
Y_tr_labels = np.argmax(y_test_onehot, axis=1)

# Confusion matrix
cf_matrix = confusion_matrix(Y_tr_labels, Y_predict_labels, labels=label_ids)
print("Best Fold :", best_fold)
print("Confusion Matrix : ")
print(cf_matrix)

print("Classification Report:")
print(classification_report(Y_tr_labels, Y_predict_labels, labels=label_ids, target_names=named_labels))

In [None]:
# Per-class one-vs-rest counts taken from cf_matrix: the diagonal, and the
# row / column totals minus the diagonal.
true_positive = np.diag(cf_matrix)
row_totals = np.sum(cf_matrix, axis=1)
column_totals = np.sum(cf_matrix, axis=0)
false_negative = row_totals - true_positive
false_positive = column_totals - true_positive
true_negative = np.sum(cf_matrix) - (true_positive + false_negative + false_positive)

# Per-class sensitivity and specificity
sensitivity = true_positive / (true_positive + false_negative)
specificity = true_negative / (true_negative + false_positive)
print("Sensitivity:", sensitivity)
print("Specificity:", specificity)

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
# Express every row of cf_matrix as percentages of that row's total
row_totals = cf_matrix.sum(axis=1, keepdims=True)
conf_matrix_percentage = cf_matrix / row_totals * 100

# Draw the percentages as an annotated heatmap
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(conf_matrix_percentage, annot=True, fmt='.2f', cmap='Blues',
            xticklabels=named_labels, yticklabels=named_labels, ax=ax)
ax.set_xlabel('Predicted Labels')
ax.set_ylabel('True Labels')
ax.set_title('Confusion Matrix - Prediction Percentages')
plt.show()

 27%|█████████████████████▏                                                       | 5998/21837 [03:00<06:18, 41.87it/s]

In [None]:
import pickle
# Persist the selected model (best_model) rather than `model`, which is simply
# whatever the last cross-validation fold left behind.
# NOTE(review): pickling a Keras model may not work across library versions;
# consider the model's own save() method if the loading code can be changed.
with open('modelfinale_CNN', 'wb') as f :
    pickle.dump(best_model, f)

In [81]:
# Store the reshaped test inputs; the name already ends in .npy, so np.save
# writes to exactly this file.
np.save('test_CNN.npy', X_test_reshaped)

In [82]:
# Store the one-hot test labels; the name already ends in .npy, so np.save
# writes to exactly this file.
np.save('y_test_onehot_CNN.npy', y_test_onehot)

In [83]:
test_data2 = np.load('test_CNN.npy')