# Import Libraries

In [1]:
import numpy as np
from sklearn.utils.class_weight import compute_class_weight

import tensorflow as tf
from tensorflow.keras import layers, models, regularizers
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from tensorflow.keras.optimizers.schedules import ExponentialDecay
from tensorflow.keras.layers import SeparableConv1D, SpatialDropout1D

# Load Data

In [None]:
# Directory that holds the preprocessed .npy arrays
destination_folder = '/content/drive/MyDrive/RedoneDataOct/Processed/'

# Read the four saved arrays in a fixed order and unpack them into the
# train/test feature and label variables used by the rest of the notebook.
X_train_scaled, X_test_scaled, y_train, y_test = (
    np.load(destination_folder + file_name)
    for file_name in (
        'X_train_scaled.npy',
        'X_test_scaled.npy',
        'y_train.npy',
        'y_test.npy',
    )
)

# Define Input Shape, Learning Rate, and Columns

In [None]:
# Shape of one input sample; presumably (time steps, ECG leads) -- TODO confirm
input_shape = (4096, 12)
# Same number of time steps with a single channel
input_shape_single_lead = (input_shape[0], 1)

# Staircase exponential decay: the learning rate is multiplied by 0.96 once
# every 100000 optimizer steps. This schedule has no warm-up phase.
initial_learning_rate = 1e-3
lr_schedule = ExponentialDecay(
    initial_learning_rate,
    decay_steps=100000,
    decay_rate=0.96,
    staircase=True
)

# Define the arrhythmia columns (ensure this is consistent with the label
# order used when y_train / y_test were saved)
arrhythmia_columns = ['1dAVb', 'RBBB', 'LBBB', 'SB', 'ST', 'AF', 'normal_ecg']
# Derived from arrhythmia_columns so the two lists cannot drift apart;
# list(...) keeps it an independent list object, as before.
label_columns = list(arrhythmia_columns)

# Define Model

In [None]:
# Number of output units: one per entry in arrhythmia_columns
num_classes = len(arrhythmia_columns)

def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0.2):
    """Apply one self-attention + feed-forward encoder block to `inputs`.

    Each sub-block is dropout-regularised, layer-normalised and then added
    back onto its own input (the normalisation sits inside the residual
    branch, before the addition).

    Args:
        inputs: Keras tensor whose last dimension is the feature size.
        head_size: `key_dim` passed to MultiHeadAttention.
        num_heads: Number of attention heads.
        ff_dim: Width of the hidden feed-forward Dense layer.
        dropout: Rate used both inside the attention layer and by the
            Dropout layers.

    Returns:
        A tensor with the same last dimension as `inputs`.
    """
    # --- Self-attention sub-block (query and value are both `inputs`) ---
    attention_layer = layers.MultiHeadAttention(
        key_dim=head_size,
        num_heads=num_heads,
        dropout=dropout,
    )
    attn_out = attention_layer(inputs, inputs)
    attn_out = layers.Dropout(dropout)(attn_out)
    attn_out = layers.LayerNormalization(epsilon=1e-6)(attn_out)
    res = attn_out + inputs

    # --- Position-wise feed-forward sub-block ---
    feature_dim = inputs.shape[-1]
    ffn_out = layers.Dense(ff_dim, activation='relu')(res)
    ffn_out = layers.Dropout(dropout)(ffn_out)
    # Project back to the input feature size so the residual add is valid
    ffn_out = layers.Dense(feature_dim)(ffn_out)
    ffn_out = layers.LayerNormalization(epsilon=1e-6)(ffn_out)
    return ffn_out + res

def _residual_block(x, filters, kernel_size, shortcut_kernel_size, dilation_rate):
    """Residual stage: projected shortcut + two dilated separable convs, then 2x max-pooling.

    The shortcut projection is created first and the main path second, so
    layers are instantiated in the same order as in a hand-unrolled stage.
    """
    # Shortcut branch: separable conv that changes the channel count to `filters`
    shortcut = SeparableConv1D(filters=filters, kernel_size=shortcut_kernel_size, padding='same')(x)

    # Main branch: two identical (dilated conv -> batch norm -> ReLU) units
    for _ in range(2):
        x = SeparableConv1D(filters=filters, kernel_size=kernel_size, padding='same',
                            dilation_rate=dilation_rate)(x)
        x = layers.BatchNormalization()(x)
        x = layers.Activation('relu')(x)

    x = layers.Add()([x, shortcut])
    return layers.MaxPooling1D(pool_size=2)(x)


def build_model(input_shape, num_classes):
    """Build the (uncompiled) conv / BiLSTM / attention classifier.

    Pipeline: separable-conv stem -> four dilated residual stages ->
    bidirectional LSTM -> one transformer encoder block -> global average
    pooling -> regularised Dense head -> per-class sigmoid outputs.
    The stem and each residual stage end in a pool_size=2 max-pool, so the
    sequence axis is halved five times before the LSTM.

    Args:
        input_shape: Shape of one sample, passed to `layers.Input`.
        num_classes: Number of sigmoid output units.

    Returns:
        A `models.Model` mapping the input to `num_classes` sigmoid outputs.
    """
    inputs = layers.Input(shape=input_shape)

    # Initial Convolutional Block with SpatialDropout
    x = SeparableConv1D(filters=64, kernel_size=3, padding='same')(inputs)
    x = SpatialDropout1D(0.2)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)
    x = layers.MaxPooling1D(pool_size=2)(x)

    # Residual Blocks 1-4 with Dilated Convolutions:
    # (filters, main kernel size, shortcut kernel size, dilation rate)
    residual_stages = (
        (128, 3, 1, 2),
        (256, 5, 3, 4),
        (512, 7, 5, 4),
        (1024, 9, 7, 4),
    )
    for filters, kernel_size, shortcut_kernel_size, dilation_rate in residual_stages:
        x = _residual_block(x, filters, kernel_size, shortcut_kernel_size, dilation_rate)

    # Bidirectional LSTM (returns the full sequence for the attention block)
    x = layers.Bidirectional(layers.LSTM(64, return_sequences=True, dropout=0.3))(x)

    # Additional Transformer Encoder Block
    x = transformer_encoder(x, head_size=64, num_heads=8, ff_dim=128, dropout=0.2)

    # Global Average Pooling
    x = layers.GlobalAveragePooling1D()(x)

    # Dense Layers with Dropout and Regularization
    x = layers.Dense(256, activation='relu', kernel_regularizer=regularizers.l2(0.002))(x)
    x = layers.Dropout(0.5)(x)

    # Output Layer: one independent sigmoid per class
    outputs = layers.Dense(num_classes, activation='sigmoid')(x)

    return models.Model(inputs=inputs, outputs=outputs)

# Instantiate the network for the configured input shape and label count
model = build_model(input_shape=input_shape, num_classes=num_classes)

# Train Model

In [None]:
# Compile with a fixed learning rate (a plain float, not a schedule object)
# so that the ReduceLROnPlateau callback below can lower it during training.
# BinaryAccuracy is requested explicitly: the 'accuracy' shortcut is resolved
# differently across Keras versions and may become categorical accuracy for
# a multi-column sigmoid output. The metric keeps the name 'accuracy', so
# history keys ('accuracy', 'val_accuracy') are unchanged.
model.compile(
    optimizer=Adam(learning_rate=initial_learning_rate, clipvalue=1.0),
    loss='binary_crossentropy',
    metrics=[tf.keras.metrics.BinaryAccuracy(name='accuracy')]
)

# Compute class weights based on y_train to handle class imbalance.
# Each sample is assigned to the label column holding its row maximum.
# NOTE(review): an all-zero label row would be counted as column 0 -- confirm
# that y_train contains no such rows.
dominant_label = y_train.argmax(axis=1)
present_classes = np.unique(dominant_label)
balanced_weights = compute_class_weight('balanced', classes=present_classes, y=dominant_label)

# Key the weights by the actual class id rather than by position: if some
# label column is never the row maximum it is missing from present_classes,
# and positional numbering would shift every later weight onto the wrong
# class. Classes that never occur keep a neutral weight of 1.0 so the dict
# always covers every label column.
class_weight_dict = {class_id: 1.0 for class_id in range(y_train.shape[1])}
class_weight_dict.update(
    {int(class_id): float(weight) for class_id, weight in zip(present_classes, balanced_weights)}
)
# Extra emphasis (x1.4) on the '1dAVb' class
class_weight_dict[label_columns.index('1dAVb')] *= 1.4

# Callbacks for early stopping and learning rate reduction
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.00001)
checkpoint_filepath = '/content/drive/MyDrive/RedoneDataOct/Multi/model_testing2.keras'

# Checkpoint for saving the best model seen so far, checked after each epoch
model_checkpoint = ModelCheckpoint(
    filepath=checkpoint_filepath,  # Path to save the model
    monitor='val_loss',            # Monitor validation loss
    save_best_only=True,           # Only save when validation loss improves
    mode='min',                    # Save when validation loss decreases
    verbose=1                      # Print a message when saving
)

# Train the model with class weights.
# NOTE(review): the test set is used as validation data here, so early
# stopping, LR reduction and checkpoint selection are all tuned on it; any
# metric later reported on X_test_scaled / y_test is optimistically biased.
# Consider holding out a separate validation split from the training data.
history = model.fit(X_train_scaled, y_train, epochs=100, batch_size=32,
                    validation_data=(X_test_scaled, y_test),
                    callbacks=[early_stopping, reduce_lr, model_checkpoint],
                    class_weight=class_weight_dict)