In [6]:
# # Import the class
# from mitbhi_dataset import MITBIHDataset, SUBJECTS

# # Initialize the dataset loader
# dataset = MITBIHDataset(
#     lead_index=0,          # Use the first ECG lead
#     apply_filter=True,     # Apply bandpass filtering
#     subjects=SUBJECTS,     # Use all subjects
#     subsets="aami"         # Use AAMI annotations
# )

# # Load the data
# X, y, s = dataset.load_data()

# # reshape the data to (n,1,m)
# X = X.reshape(X.shape[0], 1, X.shape[1])

# # X: ECG segments (features)
# # y: Labels (AAMI classes)
# # s: Subject IDs

### Load Processed MIT-BIH Arrhythmia Dataset 

In [None]:
from source.mitbhi_dataset import MITBIHDataset, SUBJECTS
import numpy as np
from sklearn.model_selection import GroupShuffleSplit

# Build the project loader with its default arguments and read the data set.
# load_data() yields three values; per the notes in the earlier commented-out
# cell these are the ECG segments (X), the labels (y) and the subject IDs (s).
dataset = MITBIHDataset()
X, y, s = dataset.load_data()

# Insert a singleton middle axis so the array goes from (n, m) to (n, 1, m).
target_shape = (X.shape[0], 1, X.shape[1])
X = X.reshape(target_shape)

[32m2025-03-17 12:29:20.795[0m | [1mINFO    [0m | [36msource.mitbhi_dataset[0m:[36mload_data[0m:[36m120[0m - [1mDataset loaded from pickle file.[0m


In [14]:
# One grouped shuffle split holding out 20% for testing. `s` is passed as the
# group labels, so all samples sharing a value of `s` land on the same side of
# the split. The seed is fixed so the partition is repeatable.
splitter = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
train_idx, test_idx = next(splitter.split(X, y, groups=s))

# Index features and labels with the same index arrays to keep them aligned.
X_train, y_train = X[train_idx], y[train_idx]
X_test, y_test = X[test_idx], y[test_idx]

In [15]:
# Show the shapes of the train and test feature arrays, one per line.
print(X_train.shape, X_test.shape, sep='\n')

(86919, 1, 180)
(22480, 1, 180)


In [None]:
import pickle
import time
import warnings
from collections import Counter
from pathlib import Path

import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score

from source.models import Models
from source.morph import Morph
# Suppress all warnings from here on: only the action is given, so the filter
# uses the default category/message/module and therefore matches every warning.
# NOTE(review): this also hides deprecation and numerical warnings raised while
# the models train — confirm that is intended.
warnings.filterwarnings('ignore')

In [None]:
df_name = 'MITBIH'

# Output locations. The folders are created before any training starts so a
# missing directory cannot abort the run only after the expensive work is done.
file_name = f'results/pickles/{df_name}.pkl'
csv_path = Path('results/use_case.csv')
Path(file_name).parent.mkdir(parents=True, exist_ok=True)
csv_path.parent.mkdir(parents=True, exist_ok=True)

# Column order of the CSV summary (one row per class and model).
columns = ['dataset', 'df_size', 'ts_length', 'n_classes', 'class', 'class_perc', 'model', 'mean', 'std', 'model_acc']

# `results` keeps the raw per-class output that gets pickled; `records` collects
# the flat summary rows. Rows are plain dicts so numbers keep their numeric type
# (stacking mixed values into one NumPy array would coerce them all to strings).
results = {}
records = []

# Dataset features
ts_length = X_train.shape[2]      # last axis of the (n, 1, m) training array
df_size = X_train.shape[0]        # number of training samples
classes = np.unique(y)            # distinct labels over the full label array
n_classes = len(classes)
class_counts = Counter(y_train)   # per-class sample counts in the training split

# Train models
start = time.time()
lstm = Models('lstm', X_train, y_train)
lstm.train_lstm()
catch = Models('catch22', X_train, y_train)
catch.train_catch22()
rocket = Models('rocket', X_train, y_train)
rocket.train_rocket()

training_time = time.time()
print(f'Training time: {training_time - start}')

# Evaluate models on the held-out split.
models = (lstm, catch, rocket)
acc = {}
for m in models:
    # predict() returns two values; only the first (the predictions) is used.
    # The second is bound to a throwaway name rather than `_`, which IPython
    # reserves for the last cell output.
    pred, _unused = m.predict(X_test)
    # Documented argument order is (y_true, y_pred).
    acc[m.model_name] = accuracy_score(y_test, pred)

# Loop through each class
for c in classes:
    start_class = time.time()
    print(f'Processing Class: {c}')

    # Share of the training samples that belong to this class.
    class_perc = round(class_counts[c] / df_size, 3)

    # Perform morphing calculations on the test split for this class.
    mor = Morph(X_test, y_test, c)
    mor.get_DTWGlobalBorderline(perc_samples=0.1)
    res = mor.Binary_MorphingCalculater(models)

    end_class = time.time()
    print(f'Total Class {c} run time: {end_class - start_class}')

    # Store the raw results for the class
    results[c] = res

    # One summary row per model. `res` is keyed by the same names used in `acc`
    # (the original code relied on this as well).
    for model, model_res in res.items():
        metrics = model_res['metrics']
        records.append({
            'dataset': df_name,
            'df_size': df_size,
            'ts_length': ts_length,
            'n_classes': n_classes,
            'class': c,
            'class_perc': class_perc,
            'model': model,
            'mean': metrics['mean'],
            'std': metrics['std'],
            'model_acc': acc[model],
        })

# Save the raw results for the current dataset
with open(file_name, 'wb') as f:
    pickle.dump(results, f)

# Clean ups
del models, lstm, catch, rocket, results

# Build the summary table once from the collected rows and write it to CSV.
dataframe = pd.DataFrame(records, columns=columns)
dataframe.to_csv(csv_path, index=False)
