In [10]:
import pandas as pd
import numpy as np
import wfdb
import ast
import matplotlib.pyplot as plt

def load_raw_data(df, sampling_rate, path):
    if sampling_rate == 100:
        data = [wfdb.rdsamp(path+f) for f in df.filename_lr]
    else:
        data = [wfdb.rdsamp(path+f) for f in df.filename_hr]
    data = np.array([signal for signal, meta in data])
    return data

path = './'
sampling_rate=100
nrows=10000 # number of patients to load, set to None to load all patients

# load and convert annotation data
Y = pd.read_csv(path+'ptbxl_database.csv', index_col='ecg_id', nrows=nrows)
Y.scp_codes = Y.scp_codes.apply(lambda x: ast.literal_eval(x))

# Load raw signal data
X = load_raw_data(Y, sampling_rate, path)

# Load scp_statements.csv for diagnostic aggregation
agg_df = pd.read_csv(path+'scp_statements.csv', index_col=0)
agg_df = agg_df[agg_df.diagnostic == 1]

def aggregate_diagnostic(y_dic):
    tmp = []
    for key in y_dic.keys():
        if key in agg_df.index:
            tmp.append(agg_df.loc[key].diagnostic_class)
    return list(set(tmp))

# Apply diagnostic superclass
Y['diagnostic_superclass'] = Y.scp_codes.apply(aggregate_diagnostic)

# Split data into train and test
test_fold = 10
# Train
X_train = X[np.where(Y.strat_fold != test_fold)]
Y_train = Y[(Y.strat_fold != test_fold)].diagnostic_superclass
# Test
X_test = X[np.where(Y.strat_fold == test_fold)]
Y_test = Y[Y.strat_fold == test_fold].diagnostic_superclass

In [11]:
# Flatten lists to single string labels
Y_train = Y_train.apply(lambda x: x[0] if x else 'UNKNOWN')
Y_test = Y_test.apply(lambda x: x[0] if x else 'UNKNOWN')

In [12]:
from sklearn.preprocessing import LabelEncoder, OneHotEncoder

# Convert Y_train and Y_test into numerical labels
label_encoder = LabelEncoder()
Y_train_encoded = label_encoder.fit_transform(Y_train)
Y_test_encoded = label_encoder.transform(Y_test)

In [13]:
# Calculate summary statistics across the time dimension (axis=1)
X_train_summary = np.concatenate([
    X_train.mean(axis=1),
    X_train.std(axis=1),
    X_train.min(axis=1),
    X_train.max(axis=1)
], axis=1)

X_test_summary = np.concatenate([
    X_test.mean(axis=1),
    X_test.std(axis=1),
    X_test.min(axis=1),
    X_test.max(axis=1)
], axis=1)

In [14]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

# Initialize the Random Forest model
rf_model = RandomForestClassifier(n_estimators=100, random_state=42)

# Fit the model
rf_model.fit(X_train_summary, Y_train_encoded)

# Make predictions and evaluate
Y_pred = rf_model.predict(X_test_summary)

# Evaluation metrics
test_accuracy = accuracy_score(Y_test_encoded, Y_pred)
print("Test Accuracy:", test_accuracy)

print("\nClassification Report:")
print(classification_report(Y_test_encoded, Y_pred, target_names=label_encoder.classes_))

print("\nConfusion Matrix:")
print(confusion_matrix(Y_test_encoded, Y_pred))

Test Accuracy: 0.5825932504440497

Classification Report:
              precision    recall  f1-score   support

          CD       0.74      0.20      0.32        83
         HYP       0.56      0.35      0.43        99
          MI       0.50      0.47      0.48       260
        NORM       0.61      0.91      0.73       518
        STTC       0.80      0.03      0.05       146
     UNKNOWN       0.71      0.25      0.37        20

    accuracy                           0.58      1126
   macro avg       0.65      0.37      0.40      1126
weighted avg       0.61      0.58      0.52      1126


Confusion Matrix:
[[ 17   4  21  40   1   0]
 [  1  35  27  36   0   0]
 [  4  11 123 121   0   1]
 [  1   9  35 472   0   1]
 [  0   4  31 107   4   0]
 [  0   0  11   4   0   5]]


In [15]:
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier

# Параметры для поиска
param_grid = {
    'n_estimators': [50, 100, 200],
    'max_depth': [None, 10, 20, 30],
    'min_samples_split': [2, 5, 10],
    'class_weight': ['balanced', None]
}

# Модель с GridSearchCV
rf = RandomForestClassifier(random_state=42)
grid_search = GridSearchCV(estimator=rf, param_grid=param_grid, cv=5, scoring='accuracy', n_jobs=-1)
grid_search.fit(X_train_summary, Y_train_encoded)

# Лучшие параметры
print("Best parameters found:", grid_search.best_params_)

# Оценка на тесте с подобранными параметрами
best_rf_model = grid_search.best_estimator_
Y_pred = best_rf_model.predict(X_test_summary)

# Оценка результатов
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
print("Test Accuracy:", accuracy_score(Y_test_encoded, Y_pred))
print("\nClassification Report:\n", classification_report(Y_test_encoded, Y_pred))
print("\nConfusion Matrix:\n", confusion_matrix(Y_test_encoded, Y_pred))


Best parameters found: {'class_weight': None, 'max_depth': 30, 'min_samples_split': 2, 'n_estimators': 200}
Test Accuracy: 0.5737122557726465

Classification Report:
               precision    recall  f1-score   support

           0       0.74      0.20      0.32        83
           1       0.55      0.33      0.42        99
           2       0.49      0.46      0.47       260
           3       0.60      0.91      0.72       518
           4       0.50      0.01      0.01       146
           5       0.57      0.20      0.30        20

    accuracy                           0.57      1126
   macro avg       0.57      0.35      0.37      1126
weighted avg       0.57      0.57      0.51      1126


Confusion Matrix:
 [[ 17   2  23  40   1   0]
 [  1  33  26  39   0   0]
 [  5  10 119 124   0   2]
 [  0   9  36 472   0   1]
 [  0   5  31 109   1   0]
 [  0   1  10   5   0   4]]
