In [None]:
import kagglehub
import pandas as pd
import os

# Name of the CSV file expected inside the downloaded dataset directory.
file_name = 'MIT-BIH Arrhythmia Database.csv'

# Download latest version; the returned location is joined with the file name
# below, i.e. it is treated as the directory that contains the CSV.
path = kagglehub.dataset_download("sadmansakib7/ecg-arrhythmia-classification-dataset")
full_path = os.path.join(path, file_name)
print(f"Full path to file: {full_path}")

# Load the dataset into the working DataFrame used by the following cells.
df = pd.read_csv(full_path)



In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE

# This code block is for the data exploration and visualization

# N (Normal):
# Description: Represents normal heartbeats. These are the most common and indicate a regular, healthy heartbeat pattern.
# Count: 90,083 instances in your dataset, indicating that normal heartbeats are the majority class.
# VEB (Ventricular Ectopic Beat):
# Description: These are premature heartbeats originating from the ventricles.
# Count: 7,009 instances, making it a minority class compared to normal beats.
# SVEB (Supraventricular Ectopic Beat):
# Description: These are premature heartbeats originating above the ventricles, often in the atria.
# Count: 2,779 instances, another minority class.
# F (Fusion Beat):
# Description: Fusion beats occur when a normal heartbeat and an ectopic beat occur at the same time
# Count: 803 instances, indicating it's a relatively rare occurrence in your dataset.
# Q (Unknown/Unclassified):
# Description: This category might represent beats that couldn't be classified into the other categories
# Count: 15 instances, making it the rarest class in the dataset; it is dropped below because there are too few samples to learn from.

# ------------------
# Basic Data Visualization
# ------------------

# Bar chart of class distribution (number of rows per label in 'type').
# The counts are computed once and reused for the printout further down.
class_counts = df['type'].value_counts()
class_counts.plot(kind='bar')
plt.title('Class Distribution')
plt.xlabel('Heartbeat Type')
plt.ylabel('Count')
plt.show()

# Data exploration
print(f'Dataset Shape: {df.shape}\n')
print(df.head(5))
df.info()
# A bare expression is only rendered when it is the last statement of a cell,
# so the summary statistics must be printed explicitly here; otherwise the
# result of describe() is computed and silently thrown away.
print(df.describe())

# Columns in dataset:
print(df.columns)
print(class_counts)

# ------------------
# Univariate Analysis: Distribution of Each Feature
# ------------------
# One histogram (with KDE overlay) per float64/int64 column; the target column
# is skipped in case it is already numeric when this cell runs.
numeric_columns = df.select_dtypes(include=['float64', 'int64']).columns
for col in numeric_columns:
    if col == 'type':
        continue
    plt.figure(figsize=(6, 4))
    sns.histplot(df[col], kde=True, bins=30)
    plt.title(f'Distribution of {col}')
    plt.xlabel(col)
    plt.ylabel('Frequency')
    plt.show()



# From the visualization we can see that one class has very few samples; the
# 'record' column is also removed before modelling.
# NOTE: this section reassigns `df`. Re-running it after 'type' has been
# encoded would refit the encoder on integers, so re-run from the load cell.
if 'record' in df.columns:
    df = df.drop(columns=['record'])

# Drop type == Q (not enough samples to form worthwhile predictions) and rows
# with missing values. Missing rows are removed *before* encoding so the
# encoder never sees an incomplete label, and the trailing .copy() makes the
# result an independent frame so the column assignment below does not raise
# pandas' SettingWithCopyWarning.
df = df[df['type'] != 'Q'].dropna().copy()

# Encode the 'type' column as integer codes; the classes are nominal (not
# ordinal), the codes are only identifiers.
label_encoder = LabelEncoder()
df['type'] = label_encoder.fit_transform(df['type'])

# Print the mapping of classes to encoded values
print(dict(zip(label_encoder.classes_, label_encoder.transform(label_encoder.classes_))))

# ------------------
# Correlation Heatmap After Dropping Features
# ------------------
# Pairwise correlations of all remaining columns (including the encoded target).
corr_matrix = df.corr()
plt.figure(figsize=(12, 8))
sns.heatmap(corr_matrix, annot=False, cmap='coolwarm')
plt.title('Feature Correlation (After Dropping Features)')
plt.show()

# ==========================
# Multivariate Analysis
# ==========================

# Interval features inspected in both the pairplot and the boxplots below.
interval_features = ['0_qt_interval', '0_pq_interval', '0_qrs_interval']

# 1. Pairplot for selected important features, coloured by encoded class
selected_features = interval_features + ['type']
sns.pairplot(df[selected_features], hue='type')
plt.suptitle('Pairplot of Selected Features', y=1.02)
plt.show()

# 2. Scatter plot between two key features (single colour, so no legend)
plt.figure(figsize=(10, 8))
sns.scatterplot(data=df, x='0_pre-RR', y='0_post-RR',
                color='cyan', alpha=0.7, edgecolor=None)
plt.title('0_pre-RR vs 0_post-RR (No Legend)', fontsize=16)
plt.xlabel('0_pre-RR Interval (ms)', fontsize=14)
plt.ylabel('0_post-RR Interval (ms)', fontsize=14)
plt.grid(True, linestyle='--', alpha=0.3)
plt.legend([], [], frameon=False)  # explicitly empty legend
plt.show()

# 3. Boxplots to examine feature distributions across types
for col in interval_features:
    plt.figure(figsize=(6, 4))
    sns.boxplot(x='type', y=col, data=df)
    plt.title(f'{col} by Type')
    plt.show()

# Separate the encoded target (y) from the predictors (X = every other column).
y = df['type']
X = df.drop(columns=['type'])

# NOTE(review): test_size=0.9 holds out 90% of the rows for testing, so only
# 10% are used for training -- confirm this is intentional.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.9, random_state=42)

# Standardise features: statistics are learned from the training rows only and
# then applied to both partitions.
scaler = StandardScaler().fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Balance the classes with SMOTE; only the training partition is resampled.
smote = SMOTE(random_state=42)
X_train_resampled, y_train_resampled = smote.fit_resample(X_train_scaled, y_train)

In [None]:
from sklearn.metrics import classification_report
from xgboost import XGBClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.neighbors import KNeighborsClassifier

# This code block is for all the individual baseline models.
# Maps model name -> {'report': text report, 'weighted_f1': float}.
model_results = dict()

# Helper for plotting confusion matrices
def plot_confusion_matrix(y_true, y_pred, title, class_names):
    """Draw an annotated confusion-matrix heatmap and show it.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.
        title: Text appended to 'Confusion Matrix - ' in the figure title.
        class_names: Tick labels used on both axes.
    """
    matrix = confusion_matrix(y_true, y_pred)
    heatmap_options = dict(
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=class_names,
        yticklabels=class_names,
    )
    plt.figure(figsize=(8, 6))
    sns.heatmap(matrix, **heatmap_options)
    plt.title(f'Confusion Matrix - {title}')
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.tight_layout()
    plt.show()

# Function to get weighted avg f1-score from classification report
def get_weighted_f1(y_true, y_pred):
    """Return the support-weighted average F1 score of the predictions.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.

    Returns:
        The 'f1-score' entry of the report's 'weighted avg' row.
    """
    # Display names are deliberately not passed: they do not influence the
    # weighted average, and supplying them made this helper depend on the
    # notebook-level label_encoder and fail with a ValueError whenever a class
    # was missing from the labels being scored.
    report = classification_report(y_true, y_pred, output_dict=True)
    return report['weighted avg']['f1-score']

def evaluate_baseline(name, model, results):
    """Fit one baseline model and record its test-set performance.

    The model is trained on the SMOTE-resampled training data, evaluated on
    the scaled test data, its text classification report and weighted F1 are
    stored in ``results[name]``, and its confusion matrix is plotted.

    Args:
        name: Display name, used as the results key and the plot title.
        model: Unfitted estimator exposing ``fit`` and ``predict``.
        results: Dict that receives {'report': str, 'weighted_f1': float}.

    Returns:
        The predictions for the test set.
    """
    model.fit(X_train_resampled, y_train_resampled)
    y_pred = model.predict(X_test_scaled)
    # Use the original class names from the label encoder for readability.
    results[name] = {
        'report': classification_report(y_test, y_pred, target_names=label_encoder.classes_),
        'weighted_f1': get_weighted_f1(y_test, y_pred),
    }
    plot_confusion_matrix(y_test, y_pred, name, label_encoder.classes_)
    return y_pred


# XGBoost classifier
# use_label_encoder=False: Avoid using the deprecated label encoder
# eval_metric='mlogloss': Use multiclass log loss as evaluation metric
# verbosity=0: Suppress verbose output
# random_state=42: Set seed for reproducibility
xgb_model = XGBClassifier(use_label_encoder=False, eval_metric='mlogloss', verbosity=0, random_state=42)
y_pred_xgb = evaluate_baseline('XGBoost', xgb_model, model_results)

# Random Forest with 25 trees, seeded for reproducibility
rf_model = RandomForestClassifier(n_estimators=25, random_state=42, n_jobs=-1)
y_pred_rf = evaluate_baseline('Random Forest', rf_model, model_results)

# SVM with RBF kernel for non-linear data
svm_model = SVC(kernel='rbf', C=1.0, gamma='scale', random_state=42)
y_pred_svm = evaluate_baseline('SVM', svm_model, model_results)

# SVM with linear kernel
svm_linear_model = SVC(kernel='linear', C=1.0, random_state=42)
y_pred_svm_linear = evaluate_baseline('SVM Linear', svm_linear_model, model_results)

# Logistic Regression
logistic_model = LogisticRegression(random_state=42, max_iter=1000)
y_pred_logistic = evaluate_baseline('Logistic Regression', logistic_model, model_results)

# KNN
knn_model = KNeighborsClassifier(n_neighbors=5)
y_pred_knn = evaluate_baseline('KNN', knn_model, model_results)

# Sort the model results by weighted f1-score (best first)
sorted_model_results = sorted(model_results.items(), key=lambda x: x[1]['weighted_f1'], reverse=True)

# Print the results
print("Model Results sorted by weighted f1-score:")
for model, result in sorted_model_results:
    print(f"\n{model} Results:")
    print(result['report'])
    print(f"Weighted F1-Score: {result['weighted_f1']:.4f}")


In [None]:
from sklearn.ensemble import VotingClassifier, StackingClassifier, BaggingClassifier, AdaBoostClassifier, GradientBoostingClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import classification_report
# Maps ensemble name -> {'report': text report, 'weighted_f1': float}.
ensemble_results = {}


def evaluate_ensemble(name, model, results):
    """Fit one ensemble model and record its test-set performance.

    The model is trained on the SMOTE-resampled training data, evaluated on
    the scaled test data, its text classification report and weighted F1 are
    stored in ``results[name]``, and its confusion matrix is plotted.

    Args:
        name: Display name, used as the results key and the plot title.
        model: Unfitted estimator exposing ``fit`` and ``predict``.
        results: Dict that receives {'report': str, 'weighted_f1': float}.

    Returns:
        The predictions for the test set.
    """
    model.fit(X_train_resampled, y_train_resampled)
    y_pred = model.predict(X_test_scaled)
    results[name] = {
        'report': classification_report(y_test, y_pred, target_names=label_encoder.classes_),
        'weighted_f1': get_weighted_f1(y_test, y_pred),
    }
    plot_confusion_matrix(y_test, y_pred, name, label_encoder.classes_)
    return y_pred


# This code block is for all the ensemble models.
# We are using the best 3 models to create the ensembles: XGBoost, Random
# Forest, and KNN. Each ensemble receives its own copy of the list.
base_estimators = [
    ('xgb', xgb_model),
    ('rf', rf_model),
    ('knn', knn_model),
]

# Hard voting: majority vote over predicted labels
voting_model = VotingClassifier(estimators=list(base_estimators), voting='hard')
y_pred_voting = evaluate_ensemble('Voting Classifier', voting_model, ensemble_results)

# Soft voting: average of predicted class probabilities
soft_voting_model = VotingClassifier(estimators=list(base_estimators), voting='soft')
y_pred_soft_voting = evaluate_ensemble('Soft Voting', soft_voting_model, ensemble_results)

# Stacking Classifier with a logistic-regression meta-learner
stacking_model = StackingClassifier(
    estimators=list(base_estimators),
    final_estimator=LogisticRegression(random_state=42, max_iter=1000)
)
y_pred_stacking = evaluate_ensemble('Stacking Classifier', stacking_model, ensemble_results)

# Bagging Classifier over decision trees
bagging_model = BaggingClassifier(
    estimator=DecisionTreeClassifier(random_state=42),
    n_estimators=10,
    random_state=42
)
y_pred_bagging = evaluate_ensemble('Bagging Classifier', bagging_model, ensemble_results)

# AdaBoost Classifier over decision trees
# NOTE(review): the base tree has no depth limit; a base learner that fits the
# training data perfectly may end boosting early -- consider setting max_depth.
ada_model = AdaBoostClassifier(
    estimator=DecisionTreeClassifier(random_state=42),
    n_estimators=10,
    random_state=42
)
y_pred_ada = evaluate_ensemble('AdaBoost Classifier', ada_model, ensemble_results)

# Gradient boosting with hand-picked parameters
gradient_model = GradientBoostingClassifier(
    n_estimators=200,
    learning_rate=0.1,
    max_depth=5,
    subsample=0.8,  # Add randomness
    random_state=42
)
y_pred_gradient = evaluate_ensemble('GradientBoosting Classifier', gradient_model, ensemble_results)

# Print the results, best weighted f1-score first
sorted_ensemble_results = sorted(ensemble_results.items(), key=lambda x: x[1]['weighted_f1'], reverse=True)
print("Ensemble Model Results sorted by weighted f1-score:")
for model, result in sorted_ensemble_results:
    print(f"\n{model} Results:")
    print(result['report'])
    print(f"Weighted F1-Score: {result['weighted_f1']:.4f}")

In [17]:
import pandas as pd

def initialize_df(path=None):
    """Load the arrhythmia CSV and return a freshly preprocessed DataFrame.

    Preprocessing: drop the 'record' column when present, remove rows of class
    'Q', drop rows with missing values, and label-encode the 'type' column.

    Args:
        path: Location of the CSV file. When omitted, the notebook-level
            ``full_path`` set by the data-loading cell is used.

    Returns:
        The preprocessed DataFrame, or None (after printing the reason) when
        the file cannot be found or any other error occurs.
    """
    try:
        csv_path = full_path if path is None else path
        df = pd.read_csv(csv_path)
        if 'record' in df.columns:
            df = df.drop(columns=['record'])
        # Rows with missing values are dropped before encoding so the encoder
        # never sees an incomplete label; .copy() detaches the filtered frame
        # so the assignment below does not raise SettingWithCopyWarning.
        df = df[df['type'] != 'Q'].dropna().copy()
        label_encoder = LabelEncoder()
        df['type'] = label_encoder.fit_transform(df['type'])
        return df
    except FileNotFoundError:
        # Report the path that was actually tried.
        print(f"Error: The file '{csv_path}' was not found.")
        print("Please ensure the dataset has been downloaded and the path is correct.")
        return None
    except Exception as e:
        # Best-effort loader: callers check for None instead of handling errors.
        print(f"An error occurred: {str(e)}")
        return None

def initialize_data(df):
    """Split a prepared frame into features and target.

    Args:
        df: DataFrame containing a 'type' column, or None.

    Returns:
        ``(X, y)`` where X is ``df`` without 'type' and y is the 'type'
        column; ``(None, None)`` when ``df`` is None.
    """
    if df is not None:
        features = df.drop(columns=['type'])
        target = df['type']
        return features, target
    return None, None

def clean_data(X, y, test_size=0.9, random_state=42):
    """Split, standardise and class-balance the data for training.

    Pipeline: train/test split -> StandardScaler fitted on the training part
    and applied to both parts -> SMOTE oversampling of the training part only.

    Args:
        X: Feature table, or None.
        y: Target labels for ``X``, or None.
        test_size: Fraction of rows held out for testing. The default of 0.9
            preserves the notebook's existing behaviour, in which only 10% of
            the rows are used for training. NOTE(review): confirm that such a
            small training share is intentional.
        random_state: Seed used for both the split and SMOTE.

    Returns:
        ``(X_train_resampled, y_train_resampled, X_test_scaled, y_test)``.
        Note that this order differs from ``train_test_split``'s. Four None
        values are returned when ``X`` or ``y`` is None.
    """
    if X is None or y is None:
        return None, None, None, None

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )

    # Fit the scaler on the training data only, then transform both parts.
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # Apply SMOTE to the training data only, to balance the classes.
    smote = SMOTE(random_state=random_state)
    X_train_resampled, y_train_resampled = smote.fit_resample(X_train_scaled, y_train)
    return X_train_resampled, y_train_resampled, X_test_scaled, y_test

def get_accuracy(y_pred, y_test):
    """Return the fraction of predictions that exactly match the true labels.

    Args:
        y_pred: Predicted class labels (array-like), or None.
        y_test: True class labels (array-like of the same length), or None.

    Returns:
        Accuracy in [0, 1] as a float; 0 when either input is None or empty.

    Raises:
        ValueError: If the two inputs do not have the same shape.
    """
    if y_pred is None or y_test is None:
        return 0

    import numpy as np

    # Convert to plain arrays so the comparison is positional, regardless of
    # any pandas index carried by the inputs.
    predicted = np.asarray(y_pred)
    actual = np.asarray(y_test)
    if predicted.shape != actual.shape:
        raise ValueError(
            f"Shape mismatch: y_pred {predicted.shape} vs y_test {actual.shape}"
        )
    if predicted.size == 0:
        return 0

    # Labels are class identifiers, not magnitudes: a prediction is either
    # right or wrong. Summing |pred - true| would count a miss between classes
    # 0 and 3 three times and could push the "accuracy" below zero.
    return float(np.mean(predicted == actual))

def test_prediction(df):
    if df is None:
        print("Error: DataFrame is None. Cannot proceed with prediction.")
        return 0
    
    #Create Weak Learner
    rf_model = RandomForestClassifier(n_estimators=10, random_state=42, n_jobs=-1)
    X, y = initialize_data(df)
    if X is None or y is None:
        return 0
    
    X_train, y_train, X_test, y_test = clean_data(X, y)
    if X_train is None:
        return 0
    
    rf_model.fit(X_train, y_train)
    y_pred = rf_model.predict(X_test)
    return get_accuracy(y_pred, y_test)

def print_statistics(y_pred, y_test):
    """Print the confusion matrix, accuracy, precision, recall and F1 score.

    Precision, recall and F1 are support-weighted averages over the classes.

    Args:
        y_pred: Predicted labels, or None.
        y_test: True labels, or None.

    Returns:
        None. An error message is printed when either input is None.
    """
    if y_pred is None or y_test is None:
        print("Error: Cannot print statistics with None values.")
        return

    # Confusion matrix
    print("The Confusion Matrix of the Model is:\n", confusion_matrix(y_test, y_pred))
    # Accuracy
    accuracy = get_accuracy(y_pred, y_test)
    print("The Accuracy of the Model is:", accuracy, "/ 1.0")
    # Precision, recall and F1 score
    from sklearn.metrics import precision_recall_fscore_support
    # scikit-learn's signature is (y_true, y_pred). Passing the arguments the
    # other way round swaps precision with recall and weights the averages by
    # the predicted instead of the true class frequencies.
    precision, recall, f1, _ = precision_recall_fscore_support(
        y_test, y_pred, average='weighted'
    )
    print("The Precision of the Model is:", precision, "/ 1.0")
    print("The Recall of the Model is:", recall, "/ 1.0")
    print("The F1 Score is:", f1)
    print()

In [None]:
#Filter Method: Calculate Pearson Coefficient
import numpy as np
from scipy import stats

df = initialize_df()
if df is None:
    # initialize_df already printed the cause; stop with a clear message
    # instead of an AttributeError on None further down.
    raise RuntimeError("initialize_df() returned None; cannot run the filter method.")

# Pearson correlation of every feature with the encoded target. The target
# itself is skipped (its self-correlation is trivially 1). The coefficient is
# read by position so this also works with SciPy releases whose pearsonr
# returns a plain tuple.
res = [
    (c, stats.pearsonr(df[c], df["type"])[0])
    for c in df.columns
    if c != "type"
]

#Drop Uncorrelated Features
print("Uncorrelated Features:")
uncorrelated = []
for name, r in res:
    # All features have a coefficient < 0.5, so the removal threshold is
    # lowered to |r| < 0.1. A NaN coefficient fails this test and is kept.
    if np.abs(r) < 0.1:
        print(name, r)
        uncorrelated.append(name)
df = df.drop(columns=uncorrelated)

#Find Accuracy
# test_prediction returns a score, not predictions; the variable name is kept
# for compatibility with later cells.
y_pred_filter = test_prediction(df)
print("Filter Method Performance:", y_pred_filter)

In [None]:
#Wrapper Method: Start with the Full Set of Features & Repeatedly Remove the Feature Whose Removal Increases Accuracy the Most

df = initialize_df()
if df is None:
    # initialize_df already printed the cause; stop with a clear message
    # instead of an AttributeError on None further down.
    raise RuntimeError("initialize_df() returned None; cannot run the wrapper method.")

# Score of the current feature set; measured once here, then carried over
# from the best candidate of each round.
wrap_accuracy = test_prediction(df)
while True:
    print("Initial Performance:", wrap_accuracy)
    curr = None          # feature whose removal helps most in this round
    max_ = wrap_accuracy
    for c in df.drop(columns=['type']).columns:
        pred = test_prediction(df.drop(columns=[c]))
        print("Remove", c, "Performance", pred)
        if pred > max_:
            curr = c
            max_ = pred
    if curr is None:
        # No single removal beats the current set: stop.
        print("Current Set Best Performance")
        break
    print("Dropped Feature:", curr)
    df = df.drop(columns=[curr])
    # The reduced set was just scored as max_ (test_prediction uses fixed
    # seeds), so the model does not need to be retrained for the next baseline.
    wrap_accuracy = max_

#Find Accuracy
# Score of the final feature set (a score, not predictions; name kept for
# compatibility with later cells).
y_pred_wrap = wrap_accuracy