## **LIBRARY IMPORTS**

In [1]:
# Import Libraries
import cudf
import os
import keras
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import wfdb  # For reading MIT-BIH data
import keras_tuner as kt
import seaborn as sns
import tensorflow as tf
from collections import Counter
from scipy.signal import find_peaks

# Scikit-learn and Imbalanced-learn imports
from sklearn.model_selection import train_test_split, RepeatedStratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    roc_auc_score,
    precision_recall_curve,
    auc,
    f1_score,
    precision_score,
    recall_score,
    accuracy_score
)
from imblearn.over_sampling import SMOTE
from imblearn.combine import SMOTEENN, SMOTETomek
from imblearn.pipeline import Pipeline as ImbPipeline

# Model imports
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Conv1D, MaxPooling1D, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from scikeras.wrappers import KerasClassifier
from sklearn.utils.class_weight import compute_class_weight

# Additional setups
# Checking cUML
print(cudf.Series([1, 2, 3]))

# Setting TensorFlow flags
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

# Checking GPU
gpu_devices = tf.config.list_physical_devices('GPU')
if gpu_devices:
    print(f"TensorFlow has detected {len(gpu_devices)} GPU(s):")
    for device in gpu_devices:
        print(f"- {device}")
else:
    print("TensorFlow did not detect any GPUs. Training will run on the CPU.")


RuntimeError: Failed to dlopen libcudart.so.12

## **DATA PREPARATION**

### DATA PREPARATION FUNCTIONS
There are 3 types of functions:
1. Labels and windowed features of the RR intervals
2. Data preparation of MIT-BIH dataset as training and validation set
3. Data preparation of additional ECG data with a format of .bin as the final testing set

In [2]:
# 1. LABELS AND WINDOWED FEATURES
label_map = { 'N': 0, 'L': 0, 'R': 0, 'e': 0, 'j': 0,  # Normal Beats (N)
              'V': 1, 'E': 1,                          # Ventricular Ectopic (VEB)
              'S': 2, 'A': 2, 'a': 2, 'J': 2,          # Supraventricular Ectopic (SVEB)
              'F': 3,                                  # Fusion Beat (F)
              'P': 4, 'f': 4, 'Q': 4, '?': 4, "U": 4}          # Unknown Beat (Q)

# Dividing data into X and Y axis
def create_windowed_features(rr_intervals, labels, window_size):
    """Making a windowed features of RR intervals"""
    X, y = [], []
    for i in range(len(rr_intervals) - window_size):
        segment = rr_intervals[i:i+window_size]
        # The label is according to the beat on the last window
        label = labels[i + window_size - 1]
        X.append(segment)
        y.append(label)
    return np.array(X), np.array(y)

In [3]:
# 2. DATA PREPARATION OF MIT-BIH DATASET
# Reading MIT-BIH dataset
def load_mitbih_data(record_names, db_path):
    """
    Load ECG signals and the annotations from the MIT-BIH dataset
    Args:
        record_name (str): Record name (e.g.'100').
        db_path (str): MIT-BIH datapath (misal, 'mit-bih').

    Return:
        Returning raw ECG data (signal) and labels (annotation) from each record in an array of numpy
        tuple: (signal, annotation, fs)
    """
    signals, annotations = [], []
    for record in record_names:
        record_path = f'{db_path}/{record}'
        # Reading signal from the first channel
        signal = wfdb.rdrecord(record_path, channels=[0]).p_signal.flatten() # Other than .dat
        annotation = wfdb.rdann(record_path, 'atr') # .dat
        signals.append(signal)
        annotations.append(annotation)
    return signals, annotations

# Reading annotations and labels from each data that has been loaded 
def extract_rr_intervals_and_labels(annotations):
    """Extracting RR intervals and labels from the corresponding heartbeat."""
    all_rr, labels = [], []
    for ann in annotations:
        r_peaks = ann.sample
        beat_symbols = ann.symbol
        # Butuh setidaknya dua R-peak untuk menghitung interval
        for i in range(1, len(r_peaks)):
            symbol = beat_symbols[i]
            if symbol in label_map:
                rr_interval = (r_peaks[i] - r_peaks[i-1]) / ann.fs  # Gunakan frekuensi sampling spesifik rekaman
                all_rr.append(rr_interval)
                labels.append(label_map[symbol])
    return np.array(all_rr), np.array(labels)

In [4]:
# 3. DATA PREPARATION OF .bin ECG DATA
# Reading ECG data
def load_ecg_from_bin(file_path, dtype=np.int16):
    """
    Loading raw ECG signals from binary files.

    Args:
        file_path (str): Path to the .bin file.
        dtype (numpy.dtype): Data type of the signal in the .bin file.

    Return:
        numpy.ndarray: ECG signals as a numpy array.
    """
    try:
        # Reading .bin file and converting it into a numpy array
        signal = np.fromfile(file_path, dtype=dtype)
        print(f"Completed reading {len(signal)} samples from {file_path}")
        return signal
    except IOError as e:
        print(f"An error has occurred while reading: {e}")
        return None

def detect_r_peaks(signal, fs):
    """
    Detecting R-peaks from the ECG signal.

    Args:
        signal (numpy.ndarray): Raw ECG signal in a numpy array.
        fs (int): Sampling frequency of the ECG signal.

    Return:
        numpy.ndarray: An array of indices of the detected R-peaks.
    """
    print("\n--- Step 1: R-Peak Detection ---")
    # 'height' and 'distance' can be configured according to the signal.
    height_threshold = np.max(signal) * 0.5
    distance_threshold = fs * 0.4  # Minimum distance between heartbeats.

    r_peaks, _ = find_peaks(signal, height=height_threshold, distance=distance_threshold)

    print(f"Detected {len(r_peaks)} R-peaks.")
    return r_peaks

def extract_rr_and_apply_label_ecg_bin(r_peaks, fs, record_label):
    """
    Calculates RR intervals from a single record and assigns the same single label
    to all of those intervals.

    Args:
        r_peaks (numpy.ndarray): Array containing R-peak locations (in sample indices) from a single record.
        fs (int): Sampling frequency of the signal.
        record_label (any): A single label (e.g., string or integer) that will
                            be applied to this entire record.

    Return:
        tuple: A tuple containing (rr_intervals, labels).
               - rr_intervals (numpy.ndarray): Array containing RR intervals in seconds.
               - labels (numpy.ndarray): Array containing the same label for each RR interval.
    """
    print(f"\n--- Step 2: RR Extraction and Labeling for the Record ---")

    # Ensure there are enough R-peaks to calculate at least one interval
    if len(r_peaks) < 2:
        print("Warning: Not enough R-peaks to calculate RR intervals.")
        return np.array([]), np.array([])

    # Calculate all RR intervals in seconds
    rr_intervals = np.diff(r_peaks) / fs

    # Create a label array where each element is 'record_label'
    # The size of this label array is the same as the number of calculated RR intervals
    num_rr_intervals = len(rr_intervals)
    labels = np.full(shape=num_rr_intervals, fill_value=record_label)

    return rr_intervals, labels

### DATA PREPARATION EXECUTION
1. Reading all ECG datasets
2. Divide all datasets into training dataset and testing dataset
3. Standard scaling and combining datasets
4. Splitting training dataset into training split and validation split then applying SMOTE algorithm into the training split
5. Preparing all the datasets for each machine learning model

In [6]:
if __name__ == '__main__':
    # --- 0. INITIAL PARAMETERS FOR .BIN AND MIT-BIH DATA PREPARATION ---
    mitbih_dir = '../data/raw/MIT-BIH/mit-bih-arrhythmia-database-1.0.0/mit-bih-arrhythmia-database-1.0.0/'
    window_size = 10
    # Excluding 102, 104, 107, and 217
    # all_records = ['100', '101', '102', '103', '104', '105', '106', '107', '108', '109', '111',
    #                '112', '113', '114', '115', '116', '117', '118', '119', '121', '122', '123',
    #                '124', '200', '201', '202', '203', '205', '207', '208', '209', '210', '212',
    #                '213', '214', '215', '217', '219', '220', '221', '222', '223', '228', '230',
    #                '231', '232', '233', '234']
    ds1 = ['101', '106', '108', '109', '112', '114', '115', '116', '118', '119',
           '122', '124', '201', '203', '205', '207', '208', '209', '215', '220',
           '223', '230'] # Used for training
    ds2 = ['100', '103', '105', '111', '113', '117', '121', '123', '200', '202',
           '210', '212', '213', '214', '219', '221', '222', '228', '231', '232',
           '233', '234'] # Used for evaluation
    FS_CUSTOM = 500  # IMPORTANT: Adjust according to the sampling frequency of your .bin data
    custom_file_paths = {
        'Arrhythmia': '../data/raw/Arrhythmia/ECG_WAVE.bin',
        'Normal': '../data/raw/Normal/ecg_normal.bin'
    }
    custom_file_labels = {'Arrhythmia': 2, 'Normal': 0}

    print("="*60)
    print("🚀 STARTING DATASET PREPARATION PROCESS 🚀")
    print("="*60)

    # --- 1. PROCESS TRAINING DATA (ds1) ---
    print("\n--- [Step 1/5] Processing Training Data (ds1) ---")
    signals_train, annotations_train = load_mitbih_data(ds1, mitbih_dir)
    rr_train, labels_train = extract_rr_intervals_and_labels(annotations_train)
    X_train, y_train = create_windowed_features(rr_train, labels_train, window_size)
    print(f"Raw training data ready: X_train={X_train.shape}, y_train={y_train.shape}")

    # --- 2. PROCESS TESTING DATA (Combined ds2 and .bin) ---
    print("\n--- [Step 2/5] Processing Testing Data ---")

    # Part A: Process testing data from MIT-BIH (ds2)
    print("\nProcessing testing part 1 (ds2)...")
    signals_test_mitbih, annotations_test_mitbih = load_mitbih_data(ds2, mitbih_dir)
    rr_test_mitbih, labels_test_mitbih = extract_rr_intervals_and_labels(annotations_test_mitbih)
    X_test_mitbih, y_test_mitbih = create_windowed_features(rr_test_mitbih, labels_test_mitbih, window_size)
    print(f"MIT-BIH testing data ready: X_test_mitbih={X_test_mitbih.shape}, y_test_mitbih={y_test_mitbih.shape}")

    # Part B: Process testing data from .bin files
    print("\nProcessing testing part 2 (.bin)...")
    all_rr_custom = []
    all_labels_custom = []
    for category, path in custom_file_paths.items():
        signal_custom = load_ecg_from_bin(path)
        if signal_custom is not None:
            r_peaks_custom = detect_r_peaks(signal_custom, fs=FS_CUSTOM)
            rr_intervals_c, labels_c = extract_rr_and_apply_label_ecg_bin(
                r_peaks_custom, fs=FS_CUSTOM, record_label=custom_file_labels[category]
            )
            all_rr_custom.append(rr_intervals_c)
            all_labels_custom.append(labels_c)

    rr_test_custom = np.concatenate(all_rr_custom)
    labels_test_custom = np.concatenate(all_labels_custom)
    X_test_custom, y_test_custom = create_windowed_features(rr_test_custom, labels_test_custom, window_size)

    # --- 3. SCALING & COMBINING TESTING DATA ---
    print("\n--- [Step 3/5] Scaling and Finalizing Data ---")
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    print("Scaler trained on training data.")

    # Apply the scaler to all parts of the testing data
    X_test_mitbih_scaled = scaler.transform(X_test_mitbih)
    X_test_custom_scaled = scaler.transform(X_test_custom)
    print("Scaler applied to all testing data.")

    # Combine all scaled testing data
    X_test_final = np.concatenate((X_test_mitbih_scaled, X_test_custom_scaled), axis=0)
    y_test_final = np.concatenate((y_test_mitbih, y_test_custom), axis=0)
    print(f"Final testing data combined: X_test_final={X_test_final.shape}, y_test_final={y_test_final.shape}")

    # --- 4. TRAINING SET SPLITTING & OVERSAMPLING (SMOTE) ---
    print("\n--- [Step 4/5] Finalizing Training Data (Split & SMOTE) ---")
    output_dim = len(np.unique(y_train))
    
    print("Creating validation set from training data (80/20)...")
    X_train_fold, X_val, y_train_fold, y_val = train_test_split(
        X_train_scaled, y_train, test_size=0.2, random_state=42, stratify=y_train
    )

    print("Applying SMOTE only to the training fold...")
    print("Training class distribution before SMOTE:", Counter(y_train_fold))
    smote = SMOTE(random_state=42)
    X_train_resampled, y_train_resampled = smote.fit_resample(X_train_fold, y_train_fold)
    print("Training class distribution after SMOTE:", Counter(y_train_resampled))

    # --- 5. FINAL DATA PREPARATION FOR THE MODEL ---
    print("\n--- [Step 5/5] Preparing Final Dataset for the Model ---")

    # One-hot encoding labels for Keras
    y_train_encoded = to_categorical(y_train_resampled, num_classes=output_dim)
    y_val_encoded = to_categorical(y_val, num_classes=output_dim)
    y_test_final_encoded = to_categorical(y_test_final, num_classes=output_dim)

    # 🧠 Data for MLP
    X_train_mlp, y_train_mlp = X_train_resampled, y_train_encoded
    X_val_mlp, y_val_mlp = X_val, y_val_encoded
    X_test_mlp, y_test_mlp = X_test_final, y_test_final_encoded

    # ⚡ Data for 1D-CNN
    X_train_cnn = X_train_mlp.reshape((X_train_mlp.shape[0], X_train_mlp.shape[1], 1))
    X_val_cnn = X_val_mlp.reshape((X_val_mlp.shape[0], X_val_mlp.shape[1], 1))
    X_test_cnn = X_test_mlp.reshape((X_test_mlp.shape[0], X_test_mlp.shape[1], 1))
    y_train_cnn, y_val_cnn, y_test_cnn = y_train_mlp, y_val_mlp, y_test_mlp

    # 📊 Data for RandomForest
    X_train_rf, y_train_rf = X_train_resampled, y_train_resampled
    X_val_rf, y_val_rf = X_val, y_val
    X_test_rf, y_test_rf = X_test_final, y_test_final

    # --- FINAL RESULTS ---
    print("\n" + "="*60)
    print("✅ DATA PREPARATION COMPLETE ✅")
    print("The following variables are ready to be used for training and evaluation:")
    print("="*60)

    print("\n--- For MLP ---")
    print(f"  Training:   X_train_mlp: {X_train_mlp.shape}, y_train_mlp: {y_train_mlp.shape}")
    print(f"  Validation: X_val_mlp: {X_val_mlp.shape}, y_val_mlp: {y_val_mlp.shape}")
    print(f"  Testing:    X_test_mlp: {X_test_mlp.shape}, y_test_mlp: {y_test_mlp.shape}")

    print("\n--- For 1D-CNN ---")
    print(f"  Training:   X_train_cnn: {X_train_cnn.shape}, y_train_cnn: {y_train_cnn.shape}")
    print(f"  Validation: X_val_cnn: {X_val_cnn.shape}, y_val_cnn: {y_val_cnn.shape}")
    print(f"  Testing:    X_test_cnn: {X_test_cnn.shape}, y_test_cnn: {y_test_cnn.shape}")

    print("\n--- For RandomForest ---")
    print(f"  Training:   X_train_rf: {X_train_rf.shape}, y_train_rf: {y_train_rf.shape}")
    print(f"  Validation: X_val_rf: {X_val_rf.shape}, y_val_rf: {y_val_rf.shape}")
    print(f"  Testing:    X_test_rf: {X_test_rf.shape}, y_test_rf: {y_test_rf.shape}")

🚀 STARTING DATASET PREPARATION PROCESS 🚀

--- [Step 1/5] Processing Training Data (ds1) ---
Raw training data ready: X_train=(51011, 10), y_train=(51011,)

--- [Step 2/5] Processing Testing Data ---

Processing testing part 1 (ds2)...
MIT-BIH testing data ready: X_test_mitbih=(49702, 10), y_test_mitbih=(49702,)

Processing testing part 2 (.bin)...
Completed reading 2380000 samples from ../data/raw/Arrhythmia/ECG_WAVE.bin

--- Step 1: R-Peak Detection ---
Detected 171 R-peaks.

--- Step 2: RR Extraction and Labeling for the Record ---
Completed reading 2135000 samples from ../data/raw/Normal/ecg_normal.bin

--- Step 1: R-Peak Detection ---
Detected 154 R-peaks.

--- Step 2: RR Extraction and Labeling for the Record ---

--- [Step 3/5] Scaling and Finalizing Data ---
Scaler trained on training data.
Scaler applied to all testing data.
Final testing data combined: X_test_final=(50015, 10), y_test_final=(50015,)

--- [Step 4/5] Finalizing Training Data (Split & SMOTE) ---
Creating validati

## **MACHINE LEARNING MODEL TRAINING**

### MACHINE LEARNING MODEL FUNCTIONS
There are 3 models that will be trained:
1. MLP Model (TA242501010)
2. 1D-CNN
3. RandomForest

There is also an additional function in order to do automatic hyperparameter tuning, but the function is only made for the MLP model.

In [7]:
# Function to build the MLP model for automatic hyperparameter tuning
def build_model(hp):
    """Function that builds a Keras model and defines the hyperparameters to be tuned."""
    model = Sequential()

    # Tune the number of units in the first hidden layer
    hp_units_1 = hp.Int('units_1', min_value=32, max_value=256, step=32)
    model.add(Dense(units=hp_units_1, activation='relu', input_shape=(X_train.shape[1],)))

    # Tune the dropout rate
    hp_dropout_1 = hp.Float('dropout_1', min_value=0.1, max_value=0.5, step=0.1)
    model.add(Dropout(rate=hp_dropout_1))

    # Tune the number of units in the second hidden layer
    hp_units_2 = hp.Int('units_2', min_value=32, max_value=256, step=32)
    model.add(Dense(units=hp_units_2, activation='relu'))

    # Tune the dropout rate
    hp_dropout_2 = hp.Float('dropout_2', min_value=0.1, max_value=0.5, step=0.1)
    model.add(Dropout(hp_dropout_2))
    model.add(Dense(output_dim, activation='softmax'))

    # Tune the learning rate for the Adam optimizer
    hp_learning_rate = hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4])

    model.compile(
        optimizer=Adam(learning_rate=hp_learning_rate),
        loss='categorical_crossentropy',
        metrics=[
            'accuracy',
            tf.keras.metrics.Precision(name='precision'),
            tf.keras.metrics.Recall(name='recall'),
            tf.keras.metrics.AUC(name='auc_roc'),
            tf.keras.metrics.AUC(name='auc_pr', curve='PR'),
            tf.keras.metrics.F1Score(average='weighted', name='f1_score')
        ]
    )
    return model

# Function to create the MLP model for cross-validation
def create_mlp_model(input_dim, output_dim):
    """Creates and compiles a Keras MLP model."""
    model = Sequential([
        # Hyperparameters tuning
        Dense(1024, input_dim=input_dim, activation='relu'),
        Dropout(0.1),
        Dense(1024, activation='relu'),
        Dropout(0.4),
        Dense(output_dim, activation='softmax') # Softmax for multi-class classification
    ])
    model.compile(
        optimizer=Adam(learning_rate=0.0001),
        loss='categorical_crossentropy', # Suitable for one-hot labels
        metrics=[
            'accuracy',
            tf.keras.metrics.Precision(name='precision'),
            tf.keras.metrics.Recall(name='recall'),
            tf.keras.metrics.F1Score(average='weighted', name='f1_score'),
            tf.keras.metrics.SpecificityAtSensitivity(0.9, name='specificity')
        ]
    )
    return model
# Function to create the 1D-CNN model
def create_cnn_model(input_shape, output_dim):
    """Creates and compiles a Keras 1D-CNN model."""
    # Input shape for CNN must be 3D: (samples, steps, features)
    # Example: (10000, 187, 1)

    model = Sequential([
        Conv1D(filters=448, kernel_size=6, activation='relu',
               input_shape=input_shape),
        Dropout(0.1),

        Conv1D(filters=448, kernel_size=3, activation='relu'),
        Dropout(0.2),

        Flatten(), # Flatten the data to connect to the Dense layer

        Dense(1024, activation='relu'),
        Dropout(0.4),

        Dense(output_dim, activation='softmax')
    ])

    model.compile(
        optimizer=Adam(learning_rate=0.0001),
        loss='categorical_crossentropy',
        metrics=[
            'accuracy',
            tf.keras.metrics.Precision(name='precision'),
            tf.keras.metrics.Recall(name='recall'),
            tf.keras.metrics.F1Score(average='weighted', name='f1_score'),
            tf.keras.metrics.SpecificityAtSensitivity(0.9, name='specificity')
        ]
    )
    return model

# Function to create the RandomForest model
def create_rf_model():
    """Creates an instance of the RandomForestClassifier model."""
    # Hyperparameters can be adjusted here
    model = RandomForestClassifier(
        n_estimators=300,  # Number of trees in the forest
        random_state=42,
        n_jobs=-1,         # Use all CPU cores
        max_depth=50
    )
    return model

### MACHINE LEARNING MODEL TRAINING EXECUTION
1. Targeted metrics: Precision, Recall, F1-Score, and Specificity
2. There are 3 executions: multiple models training, MLP model specific training, and MLP model automatic hyperparameter tuning

In [None]:
# Training Multiple Models
input_shape_cnn = (X_train_cnn.shape[1], X_train_cnn.shape[2])
input_dim = X_train_mlp.shape[1]
# output_dim already defined from the DATA PREPARATION section

models = {
    "1D-CNN": create_cnn_model(input_shape_cnn, output_dim),
    "RandomForest": create_rf_model(),
    "MLP": create_mlp_model(input_dim, output_dim)
}

# Dictionary to store the final results
results = {}

# --- TRAINING AND EVALUATING EACH MODEL ---

for name, model in models.items():
    print(f"\n{'='*20} TRAINING MODEL: {name} {'='*20}")

    # 🧠 Training
    if name == "1D-CNN":
        model.fit(
            X_train_cnn, y_train_cnn,
            epochs=20,
            batch_size=16,
            verbose=1,
            validation_data=(X_val_cnn, y_val_cnn) # Using the existing validation set
        )
    elif name == "MLP":
        model.fit(
            X_train_mlp, y_train_mlp,
            epochs=20,
            batch_size=16,
            verbose=1,
            validation_data=(X_val_mlp, y_val_mlp) # Using the existing validation set
        )
    else: # 📊 RandomForest
        model.fit(X_train_rf, y_train_rf)

    # ⚡ Prediction on the Test Set
    print(f"Evaluating model {name}...")
    if name == "MLP":
        y_pred_raw = model.predict(X_test_mlp)
        y_pred = np.argmax(y_pred_raw, axis=1)
    elif name == "1D-CNN":
        y_pred_raw = model.predict(X_test_cnn)
        y_pred = np.argmax(y_pred_raw, axis=1)
    else: # RandomForest
        y_pred = model.predict(X_test_rf)

    # Save prediction results for final evaluation
    results[name] = {'y_pred': y_pred}

# --- PRINT ALL RESULTS SIMULTANEOUSLY ---

class_names = ['Normal (N)', 'Ventricular (V)', 'Supraventricular (S)', 'Fusion (F)', 'Unknown (Q)']

print(f"\n{'='*25} FINAL EVALUATION RESULTS {'='*25}")

for name, result_data in results.items():
    y_pred_test = result_data['y_pred']

    print(f"\n\n--- REPORT FOR MODEL: {name} ---")

    # Standard Classification Report (using y_test_rf, which are the original integer labels)
    print("\nClassification Report on the Test Set:")
    print(classification_report(y_test_rf, y_pred_test, target_names=class_names))

    # Additional Metrics Report
    print("Additional Metrics Report:")
    cm = confusion_matrix(y_test_rf, y_pred_test)
    for i in range(len(class_names)):
        tn = cm.sum() - (cm[i,:].sum() + cm[:,i].sum() - cm[i,i])
        tp = cm[i,i]
        fp = cm[:,i].sum() - cm[i,i]
        fn = cm[i,:].sum() - cm[i,i]

        sensitivity = tp / (tp + fn) if (tp + fn) > 0 else 0
        specificity = tn / (tn + fp) if (tn + fp) > 0 else 0
        fpr = fp / (fp + tn) if (fp + tn) > 0 else 0

        print(f"  Class: {class_names[i]}")
        print(f"    - Sensitivity (Recall): {sensitivity:.4f}")
        print(f"    - Specificity         : {specificity:.4f}")
        print(f"    - False Positive Rate : {fpr:.4f}")

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)
I0000 00:00:1754468119.395005    5765 gpu_device.cc:2019] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 3620 MB memory:  -> device: 0, name: NVIDIA GeForce RTX 3050 6GB Laptop GPU, pci bus id: 0000:01:00.0, compute capability: 8.6


In [None]:
# Training MLP Model - TA242501010
# Initialize AI model
model = create_mlp_model(input_dim, output_dim)

# Prepare EarlyStopping callback for F1 score validation
early_stopping_1 = EarlyStopping(monitor='val_f1_score', patience=5, restore_best_weights=True)

# Prepare class weights
class_weights = compute_class_weight(
    'balanced',
    classes=np.unique(y_train),
    y=y_train
)
class_weight_dict = dict(enumerate(class_weights))

# Start AI model training
history = model.fit(
    X_train_resampled,
    y_train_resampled_encoded,
    epochs=20,
    batch_size=16,
    validation_data=(X_val_fold, y_val_fold_encoded),
    verbose=1,
    # class_weight=class_weight_dict,
    validation_split=0.2
    # callbacks=[early_stopping_1]
)

# Evaluate on the untouched set
print("Evaluating on the untouched test set...")
X_test_scaled = scaler.transform(X_test)
y_pred_test_raw = model.predict(X_test_scaled)

# Convert predictions back to labels if they are one-hot encoded
if hasattr(y_pred_test_raw, 'shape') and len(y_pred_test_raw.shape) > 1:
      y_pred_test = np.argmax(y_pred_test_raw, axis=1)
else:
      y_pred_test = y_pred_test_raw

class_names = ['Normal (N)', 'Ventricular (V)', 'Supraventricular (S)', 'Fusion (F)', 'Unknown (Q)']

print("\nClassification Report on the Test Set:")
print(classification_report(y_test, y_pred_test, target_names=class_names))

# Calculate confusion matrix
cm = confusion_matrix(y_test, y_pred_test)

# Calculate metrics for each class (one-vs-rest)
print("\nAdditional Metrics Report:")
print("="*55)
for i in range(len(class_names)):
    tn = cm.sum() - (cm[i,:].sum() + cm[:,i].sum() - cm[i,i])
    tp = cm[i,i]
    fp = cm[:,i].sum() - cm[i,i]
    fn = cm[i,:].sum() - cm[i,i]

    # Sensitivity (Recall)
    sensitivity = tp / (tp + fn) if (tp + fn) > 0 else 0
    # Specificity
    specificity = tn / (tn + fp) if (tn + fp) > 0 else 0
    # False Positive Rate
    fpr = fp / (fp + tn) if (fp + tn) > 0 else 0

    print(f"Class: {class_names[i]}")
    print(f"  Sensitivity (Recall): {sensitivity:.4f}")
    print(f"  Specificity         : {specificity:.4f}")
    print(f"  False Positive Rate : {fpr:.4f}")
    print("-"*25)

In [None]:
# Automatic Hyperparameter Tuning
print("\n--- Starting Automatic Hyperparameter Tuning with KerasTuner ---")

# Calculate class_weight only once
# class_weights = compute_class_weight(
#     'balanced',
#     classes=np.unique(y_train),
#     y=y_train
# )
# class_weight_dict = dict(enumerate(class_weights))

# Defining tuner objectives
multi_objectives = [
    kt.Objective("val_f1_score", direction="max")
    #kt.Objective("val_specificity", direction="max")
    #kt.Objective("val_precision", direction="max")
    #kt.Objective("val_recall", direction="max")
    #kt.Objective("val_auc_roc", direction="max")
    #kt.Objective("val_auc_pr", direction="max")
]
# Initialize Tuner with RandomSearch
tuner = kt.RandomSearch(
    build_model,
    objective=multi_objectives, # Target: maximize validation F1 score
    max_trials=20,              # Total number of hyperparameter combinations to be tried
    executions_per_trial=1,     # Number of models trained per combination (for stability)
    directory='keras_tuner_dir',
    project_name='ecg_classification_0834' # Can change the name to find the latest parameters with the latest code
)

# Prepare EarlyStopping callback for F1 score validation
early_stopping = EarlyStopping(monitor='val_f1_score', patience=5, restore_best_weights=True)

# Run the search
print("\nStarting the search for the best hyperparameters...")
tuner.search(
    X_train_resampled,              # Training data that has been processed with SMOTE
    y_train_resampled_encoded,
    epochs=100,
    validation_data=(X_val_fold, y_val_fold_encoded), # 1. Use validation_data
    # class_weight=class_weight_dict,
    callbacks=[early_stopping],
    verbose=1
)

# Get the best hyperparameters and the best model
best_model = tuner.get_best_models(num_models=1)[0]
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
print(f"""
--- Search Complete ---
Best hyperparameters found:
- Units 1: {best_hps.get('units_1')}
- Dropout 1: {best_hps.get('dropout_1'):.2f}
- Units 2: {best_hps.get('units_2')}
- Dropout 2: {best_hps.get('dropout_2'):.2f}
- Learning Rate: {best_hps.get('learning_rate')}
""")

# --- Final Evaluation on the Test Set ---
print("\n--- Evaluating the Best Model on the Test Set ---")
X_test_scaled = scaler.transform(X_test) # Use the same scaler from training
y_pred_test_raw = best_model.predict(X_test_scaled)
y_pred_test = np.argmax(y_pred_test_raw, axis=1)
class_names = ['Normal (N)', 'Ventricular (V)', 'Supraventricular (S)', 'Fusion (F)', 'Unknown (Q)']

# --- 1. Classification Report ----
print("\nClassification Report on the Test Set:")
print(classification_report(y_test, y_pred_test, target_names=class_names))

# --- 2. Confusion Matrix ---
print("\n--- Confusion Matrix ---")
cm = confusion_matrix(y_test, y_pred_test)
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.title('Confusion Matrix')
plt.ylabel('Actual Class')
plt.xlabel('Predicted Class')
plt.show()

# --- 3. Specific Metric Calculation per Class ---
print("\n--- Detailed Performance Metrics per Class ---")
metrics_data = []

for i, class_name in enumerate(class_names):
    # Specificity calculation
    tn = cm.sum() - (cm[i,:].sum() + cm[:,i].sum() - cm[i,i])
    fp = cm[:,i].sum() - cm[i,i]
    specificity = tn / (tn + fp) if (tn + fp) > 0 else 0

    metrics_data.append({
        "Class": class_name,
        "Precision": precision_score(y_test, y_pred_test, average=None)[i],
        "Sensitivity (Recall)": recall_score(y_test, y_pred_test, average=None)[i],
        "F1-Score": f1_score(y_test, y_pred_test, average=None)[i],
        "Specificity": specificity
    })

metrics_df = pd.DataFrame(metrics_data)
print(metrics_df.to_string())


# --- 4. Calculation of AUC-ROC and AUC-PR (One-vs-Rest) ---
y_test_encoded = to_categorical(y_test, num_classes=output_dim)

# Add this line to define y_pred_proba
y_pred_proba = best_model.predict(X_test_scaled)

# AUC-ROC
auc_roc_ovr = roc_auc_score(y_test_encoded, y_pred_proba, multi_class='ovr', average='weighted')
print(f"\nAUC-ROC (One-vs-Rest, Weighted): {auc_roc_ovr:.4f}")

# AUC-PR
# Calculate for each class and average
precision_curves = dict()
recall_curves = dict()
auc_pr_scores = []
for i in range(output_dim):
    precision_curves[i], recall_curves[i], _ = precision_recall_curve(y_test_encoded[:, i], y_pred_proba[:, i])
    auc_pr_scores.append(auc(recall_curves[i], precision_curves[i]))

# Weighted average for AUC-PR
support = np.bincount(y_test)
avg_auc_pr = np.average(auc_pr_scores, weights=support)
print(f"AUC-PR (One-vs-Rest, Weighted): {avg_auc_pr:.4f}")

# Plotting PR Curves for each class
plt.figure(figsize=(10, 8))
for i, class_name in enumerate(class_names):
    plt.plot(recall_curves[i], precision_curves[i], lw=2, label=f'{class_name} (AUC-PR = {auc_pr_scores[i]:.2f})')

plt.xlabel("Recall")
plt.ylabel("Precision")
plt.title("Precision-Recall Curve per Class")
plt.legend(loc="best")
plt.grid(True)
plt.show()