In [1]:
pip install pandas numpy scipy scikit-learn xgboost wfdb neurokit2 biosppy peakutils


Collecting wfdb
  Downloading wfdb-4.3.0-py3-none-any.whl.metadata (3.8 kB)
Collecting neurokit2
  Downloading neurokit2-0.2.10-py2.py3-none-any.whl.metadata (37 kB)
Collecting biosppy
  Downloading biosppy-2.2.3-py2.py3-none-any.whl.metadata (6.0 kB)
Collecting peakutils
  Downloading PeakUtils-1.3.5-py3-none-any.whl.metadata (1.6 kB)
Collecting bidict (from biosppy)
  Downloading bidict-0.23.1-py3-none-any.whl.metadata (8.7 kB)
Collecting shortuuid (from biosppy)
  Downloading shortuuid-1.0.13-py3-none-any.whl.metadata (5.8 kB)
Collecting mock (from biosppy)
  Downloading mock-5.2.0-py3-none-any.whl.metadata (3.1 kB)
Downloading wfdb-4.3.0-py3-none-any.whl (163 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m163.8/163.8 kB[0m [31m7.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading neurokit2-0.2.10-py2.py3-none-any.whl (693 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m693.1/693.1 kB[0m [31m30.5 MB/s[0m eta [36m0:00:00[0m


In [2]:
import numpy as np
import pandas as pd
import wfdb
from wfdb import processing
from sklearn.ensemble import RandomForestClassifier, StackingClassifier, VotingClassifier
from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split, GridSearchCV
from imblearn.over_sampling import SMOTE
from sklearn.linear_model import LogisticRegression
import pywt
import matplotlib.pyplot as plt

# Load PTB-XL dataset
ptbxl_path = "/kaggle/input/ptb-xl-dataset/ptb-xl-a-large-publicly-available-electrocardiography-dataset-1.0.1/"
reference_df = pd.read_csv(ptbxl_path + "ptbxl_database.csv")
reference_df = reference_df.head(1000)
diagnostic_classes = pd.read_csv(ptbxl_path + "scp_statements.csv", index_col=0)["diagnostic_class"]

# Define abnormal classes (Main and Subclasses)
def is_abnormal(scp_codes):
    abnormal_classes = set([
        'MI', 'HYP', 'CD', 'STTC', 
        'LAFB/LPFB', 'IRBBB', 'ILBBB', 'CLBBB', 'CRBBB', '_AVB', 'IVCB', 'WPW',
        'LVH', 'RHV', 'LAO/LAE', 'RAO/RAE', 'SEHYP',
        'AMI', 'IMI', 'LMI', 'PMI',
        'ISCA', 'ISCI', 'ISC_', 'ST-T', 'NST_'
    ])

    if isinstance(scp_codes, str):
        scp_codes = eval(scp_codes)

    for code in scp_codes.keys():
        if code in abnormal_classes:
            return 1  # Abnormal
    return 0  # Normal

reference_df["is_abnormal"] = reference_df["scp_codes"].apply(is_abnormal)

# Feature Extraction with Wavelet Transform

def extract_features_from_signal(signal):
    features = []
    features.append(np.mean(signal))
    features.append(np.std(signal))
    features.append(np.median(signal))
    features.append(np.min(signal))
    features.append(np.max(signal))
    features.append(np.percentile(signal, 25))
    features.append(np.percentile(signal, 75))
    features.append(np.mean(np.diff(signal)))

    coeffs = pywt.wavedec(signal, 'db4', level=5)
    for coeff in coeffs:
        features.append(np.mean(coeff))
        features.append(np.std(coeff))
        features.append(np.min(coeff))
        features.append(np.max(coeff))

    return features

X, y = [], []

for _, row in reference_df.iterrows():
    ecg_path = ptbxl_path + row["filename_hr"].replace(".mat", "")
    try:
        record = wfdb.rdrecord(ecg_path)
    except FileNotFoundError:
        continue

    available_leads = record.sig_name
    lead_index = next((available_leads.index(lead) for lead in ["II", "MLII", "I"] if lead in available_leads), None)
    if lead_index is None:
        continue

    signal = record.p_signal[:, lead_index]
    signal = (signal - np.mean(signal)) / np.std(signal)

    try:
        xqrs = processing.XQRS(sig=signal, fs=record.fs)
        xqrs.detect()
        r_peaks = xqrs.qrs_inds
    except:
        r_peaks = processing.gqrs_detect(sig=signal, fs=record.fs)

    if len(r_peaks) < 5:
        continue

    rr_intervals = np.diff(r_peaks) / record.fs
    qrs_durations = np.array([r_peaks[i] - r_peaks[i - 1] for i in range(1, len(r_peaks))])

    features = extract_features_from_signal(signal)
    features.extend([
        len(r_peaks),
        np.mean(rr_intervals) if len(rr_intervals) > 0 else 0,
        np.std(rr_intervals) if len(rr_intervals) > 0 else 0,
        np.median(rr_intervals) if len(rr_intervals) > 0 else 0,
        np.mean(qrs_durations) if len(qrs_durations) > 0 else 0,
        np.std(qrs_durations) if len(qrs_durations) > 0 else 0
    ])

    X.append(features)
    y.append(row["is_abnormal"])

X = np.array(X)
y = np.array(y)

# Apply SMOTE
smote = SMOTE(random_state=42)
X_resampled, y_resampled = smote.fit_resample(X, y)

# Train-Test Split
X_train, X_test, y_train, y_test = train_test_split(X_resampled, y_resampled, test_size=0.2, random_state=42)

# Base models
rf = RandomForestClassifier(random_state=42)
xgb = XGBClassifier(random_state=42, use_label_encoder=False, eval_metric='logloss')

# Hyperparameter grids
param_grid_rf = {
    'n_estimators': [100, 200],
    'max_depth': [10, 20]
}
param_grid_xgb = {
    'n_estimators': [100, 200],
    'learning_rate': [0.01, 0.1],
    'max_depth': [3, 6]
}

# Grid Search for Random Forest
grid_rf = GridSearchCV(rf, param_grid_rf, cv=3, n_jobs=-1, scoring='accuracy')
grid_rf.fit(X_train, y_train)
rf_best = grid_rf.best_estimator_
print("Best RF Params:", grid_rf.best_params_)

# Grid Search for XGBoost
grid_xgb = GridSearchCV(xgb, param_grid_xgb, cv=3, n_jobs=-1, scoring='accuracy')
grid_xgb.fit(X_train, y_train)
xgb_best = grid_xgb.best_estimator_
print("Best XGB Params:", grid_xgb.best_params_)

# Voting Classifier
voting_clf = VotingClassifier(estimators=[
    ('rf', rf_best),
    ('xgb', xgb_best)
], voting='soft')

# Stacking Classifier
stacking_clf = StackingClassifier(
    estimators=[('rf', rf_best), ('xgb', xgb_best)],
    final_estimator=LogisticRegression(max_iter=1000, random_state=42)
)

# Evaluate all models
def evaluate_model(name, model):
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    print(f"\n{name} Results:")
    print(f"Accuracy: {accuracy_score(y_test, y_pred):.4f}")
    print(f"Precision: {precision_score(y_test, y_pred):.4f}")
    print(f"Recall: {recall_score(y_test, y_pred):.4f}")
    print(f"F1 Score: {f1_score(y_test, y_pred):.4f}")

# Evaluate all
evaluate_model("Random Forest", rf_best)
evaluate_model("XGBoost", xgb_best)
evaluate_model("Voting Classifier", voting_clf)
evaluate_model("Stacking Classifier", stacking_clf)



Learning initial signal parameters...
Failed to find 8 beats during learning.
Initializing using default parameters
Running QRS detection...
QRS detection complete.
Learning initial signal parameters...
Failed to find 8 beats during learning.
Initializing using default parameters
Running QRS detection...
QRS detection complete.
Learning initial signal parameters...
Failed to find 8 beats during learning.
Initializing using default parameters
Running QRS detection...
QRS detection complete.
Learning initial signal parameters...
Failed to find 8 beats during learning.
Initializing using default parameters
Running QRS detection...
QRS detection complete.
Learning initial signal parameters...
Failed to find 8 beats during learning.
Initializing using default parameters
Running QRS detection...
QRS detection complete.
Learning initial signal parameters...
Failed to find 8 beats during learning.
Initializing using default parameters
Running QRS detection...
QRS detection complete.
Learning i

In [3]:
import joblib

# Save the trained voting classifier
joblib.dump(voting_clf, 'voting_classifier.pkl')


['voting_classifier.pkl']

In [4]:
# Load the saved model
voting_loaded = joblib.load('voting_classifier.pkl')


In [5]:
import numpy as np
import wfdb
from wfdb import processing

def classify_new_ecg(file_path, model):
    try:
        record = wfdb.rdrecord(file_path)

        available_leads = record.sig_name
        lead_index = next((available_leads.index(lead) for lead in ["II", "MLII", "I"] if lead in available_leads), None)
        if lead_index is None:
            return "Unsupported lead"

        signal = record.p_signal[:, lead_index]
        signal = (signal - np.mean(signal)) / np.std(signal)

        try:
            xqrs = processing.XQRS(sig=signal, fs=record.fs)
            xqrs.detect()
            r_peaks = xqrs.qrs_inds
        except:
            r_peaks = processing.gqrs_detect(sig=signal, fs=record.fs)

        if len(r_peaks) < 5:
            return "Insufficient beats"

        rr_intervals = np.diff(r_peaks) / record.fs
        qrs_durations = np.array([r_peaks[i] - r_peaks[i - 1] for i in range(1, len(r_peaks))])

        features = extract_features_from_signal(signal)
        features.extend([
            len(r_peaks),
            np.mean(rr_intervals) if len(rr_intervals) > 0 else 0,
            np.std(rr_intervals) if len(rr_intervals) > 0 else 0,
            np.median(rr_intervals) if len(rr_intervals) > 0 else 0,
            np.mean(qrs_durations) if len(qrs_durations) > 0 else 0,
            np.std(qrs_durations) if len(qrs_durations) > 0 else 0
        ])

        prediction = model.predict([features])[0]
        return "Abnormal" if prediction == 1 else "Normal"

    except Exception as e:
        return f"Error: {str(e)}"


In [6]:
file_path = "/kaggle/input/ptb-xl-dataset/ptb-xl-a-large-publicly-available-electrocardiography-dataset-1.0.1/records500/00000/00008_hr"
result = classify_new_ecg(file_path, voting_loaded)
print(result)


Learning initial signal parameters...
Failed to find 8 beats during learning.
Initializing using default parameters
Running QRS detection...
QRS detection complete.
Abnormal


In [7]:

import sklearn
import numpy as np
import joblib

print(f"scikit-learn version: {sklearn.__version__}")
print(f"numpy version: {np.__version__}")
print(f"joblib version: {joblib.__version__}")


scikit-learn version: 1.2.2
numpy version: 1.26.4
joblib version: 1.4.2
