# CNN Model - 2018 Paper (Kachuee, Fazeli, Sarrafzadeh): CNN6

original

In [None]:
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
%matplotlib inline

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization, Input, Conv1D, MaxPooling1D, Flatten, Add, ReLU
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.models import load_model, Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Precision, Recall
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
from tensorflow.keras.optimizers.schedules import ExponentialDecay
from imblearn.over_sampling import SMOTE

from sklearn.metrics import (
    accuracy_score, precision_recall_fscore_support, confusion_matrix, classification_report
)
from sklearn.model_selection import train_test_split

from imblearn.over_sampling import SMOTE, RandomOverSampler

from pathlib import Path
import re 

import pickle
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
print(tf.config.list_physical_devices('GPU'))  # should show []
from contextlib import redirect_stdout
import json
from collections import Counter

from src.visualization.visualization import plot_training_history

In [None]:
# Bookkeeping values that are written into every row of the results CSV.
SAMPLING_METHOD = "SMOTE"
REMOVE_OUTLIERS = False

# Name of the model variant being trained; it prefixes every checkpoint file.
# Variants tried so far:
#   "cnn6"          as the paper, no SMOTE --> very bad...
#   "cnn6_sm"       with SMOTE dataset
#   "cnn6_sm_1fcl"  with SMOTE but only one fully connected layer
model_name = "cnn6_sm_1fcl"

OUTPUT_PATH = "src/models/CNN/"  # directory the checkpoints are written to
results_csv = "reports/03_model_testing_results/05_CNN_model_comparison.csv"
EPOCHS = 50  # maximum number of training epochs

# Load the MIT-BIH splits.
# Train/validation come from the processed CSVs, which are read with their
# header row; the label is taken from the column named '187'.
X_train = pd.read_csv('data/processed/mitbih/X_train.csv')
y_train = pd.read_csv('data/processed/mitbih/y_train.csv')['187']

X_val = pd.read_csv('data/processed/mitbih/X_val.csv')
y_val = pd.read_csv('data/processed/mitbih/y_val.csv')['187']

# The original test file is read without a header, so its columns are
# integer-indexed: column 187 is the label, everything else is the signal.
df_mitbih_test = pd.read_csv('data/original/mitbih_test.csv', header=None)
y_test = df_mitbih_test[187]
X_test = df_mitbih_test.drop(columns=187)


# Rebalance the training split with SMOTE; class counts are printed before
# and after resampling so the effect can be checked by eye.
print("Before SMOTE:", Counter(y_train))

# Hand plain NumPy arrays (not pandas objects) to the resampler.
X_train_np = np.asarray(X_train)
y_train_np = np.asarray(y_train)

# Fixed random_state keeps the synthetic samples reproducible between runs.
sm = SMOTE(sampling_strategy='auto', k_neighbors=5, random_state=42)
X_train_sm, y_train_sm = sm.fit_resample(X_train_np, y_train_np)

print("After SMOTE:", Counter(y_train_sm))
print("X_train_sm shape:", X_train_sm.shape)

# The model input is declared as (187, 1), so every 2-D feature matrix gets a
# trailing channel axis of size 1: (samples, features) -> (samples, features, 1).
X_train_sm_cnn = np.asarray(X_train_sm)[:, :, np.newaxis]
X_train_cnn = np.asarray(X_train)[:, :, np.newaxis]
X_val_cnn = np.asarray(X_val)[:, :, np.newaxis]
X_test_cnn = np.asarray(X_test)[:, :, np.newaxis]

# Show the resulting shapes as a quick sanity check.
for reshaped in (X_train_cnn, X_val_cnn, X_test_cnn):
    display(reshaped.shape)

In [None]:
def _residual_block(block_input):
    """Apply one residual block followed by max pooling.

    Two Conv1D layers (32 filters, kernel size 5, 'same' padding; only the
    first has a ReLU activation) form the residual branch. The branch is added
    to the block input, passed through ReLU and then downsampled with
    MaxPooling1D(pool_size=5, strides=2, padding='same').
    """
    branch = Conv1D(32, 5, padding='same', activation='relu')(block_input)
    branch = Conv1D(32, 5, padding='same')(branch)
    merged = Add()([block_input, branch])
    merged = ReLU()(merged)
    return MaxPooling1D(pool_size=5, strides=2, padding='same')(merged)


# Input: one channel, 187 time steps per sample.
input_layer = Input(shape=(187, 1))

# Initial convolution (no activation); its output is the shortcut of block 1.
conv_0 = Conv1D(filters=32, kernel_size=5, padding='same')(input_layer)

# Five identical residual blocks stacked on top of each other.
x = conv_0
for _ in range(5):
    x = _residual_block(x)

# Classifier head: a single hidden fully connected layer, then a 5-way softmax.
x = Flatten()(x)
x = Dense(32, activation='relu')(x)
# x = Dense(32, activation='relu')(x)  # second FC layer, disabled in this variant
output_layer = Dense(5, activation='softmax')(x)

# Assemble the functional model.
cnn6 = Model(inputs=input_layer, outputs=output_layer)

In [None]:
# Print the layer-by-layer overview of the assembled model.
cnn6.summary()

In [None]:
# Learning-rate schedule: start at 1e-3 and multiply by 0.75 every 10,000
# optimizer steps; staircase=True applies the decay in discrete jumps.
initial_learning_rate = 0.001
lr_schedule = ExponentialDecay(
    initial_learning_rate=initial_learning_rate,
    decay_steps=10000,
    decay_rate=0.75,
    staircase=True,
)

# Adam driven by the schedule above.
optimizer = Adam(learning_rate=lr_schedule, beta_1=0.9, beta_2=0.999)

# Stop once val_loss has not improved by at least 1e-4 for 8 epochs and roll
# the model back to the best weights seen.
early_stopping = EarlyStopping(
    monitor='val_loss',
    min_delta=1e-4,
    patience=8,
    restore_best_weights=True,
    verbose=1,
)

# Save a checkpoint whenever val_loss reaches a new minimum. Epoch number and
# val_loss are embedded in the file name so they can be parsed back later.
checkpoint = ModelCheckpoint(
    filepath=OUTPUT_PATH+model_name+'_bs_epoch_{epoch:02d}_valloss_{val_loss:.4f}.keras',
    monitor='val_loss',      # metric to monitor
    mode='min',              # lower validation loss is better
    save_best_only=True,     # only save when val_loss improves
    verbose=1,               # print a message when a model is saved
)

# Labels are integer class ids, hence the sparse categorical cross-entropy.
cnn6.compile(
    optimizer=optimizer,
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

In [None]:
# Train on the SMOTE-resampled training set and validate on the untouched
# validation split.
print("Running fit WITH SMOTE")
history = cnn6.fit(
    # Use the array that already carries the trailing channel axis so the
    # training input has the same 3-D layout as the validation/test inputs
    # and matches the model's declared Input shape.
    X_train_sm_cnn,
    y_train_sm,
    epochs=EPOCHS,
    batch_size=128,
    validation_data=(X_val_cnn, y_val),
    callbacks=[checkpoint, early_stopping]
)


In [None]:
def parse_epoch_from_name(name, default_epochs=EPOCHS):
    """Extract the epoch number embedded in a checkpoint file name.

    Looks for the first ``epoch_<digits>`` occurrence in *name* and returns
    the digits as an int. If the pattern is absent, *default_epochs* is
    returned unchanged.
    """
    match = re.search(r"epoch_(\d+)", name)
    if match is None:
        return default_epochs
    return int(match.group(1))

def parse_val_loss_from_name(name):
    """Extract the validation loss embedded in a checkpoint file name.

    Looks for the first ``valloss_<int>.<frac>`` occurrence in *name* (for
    example ``..._valloss_0.1234.keras``) and returns the number as a float.
    Returns ``np.nan`` when the pattern is absent.
    """
    match = re.search(r"valloss_([0-9]+\.[0-9]+)", name)
    if match is None:
        return np.nan
    return float(match.group(1))

# Evaluate every checkpoint saved for this model variant on the test set and
# append one row of metrics per checkpoint to the results CSV.

# Checkpoints are written as "<model_name>_bs_epoch_XX_valloss_Y.keras".
# Matching on that exact prefix (instead of a substring test) keeps out the
# checkpoints of other variants whose names merely contain model_name,
# e.g. "cnn6_sm" is a substring of "cnn6_sm_1fcl".
model_dir = Path(OUTPUT_PATH)
checkpoint_prefix = f"{model_name}_bs_epoch_"
model_paths = sorted(
    p for p in model_dir.glob("*.keras") if p.name.startswith(checkpoint_prefix)
)

all_labels = np.unique(y_test)  # ground-truth labels present in test set
rows = []

for p in model_paths:
    print(p)
    model_ = load_model(str(p))

    # Class probabilities -> predicted class index.
    y_pred = model_.predict(X_test_cnn)
    y_pred_class = np.argmax(y_pred, axis=1)

    # Force a consistent label space for both the printed and the dict report,
    # so a class that is never predicted still shows up (with zeros) instead
    # of triggering warnings or shifting columns.
    print(classification_report(
        y_test, y_pred_class, labels=all_labels, digits=4, zero_division=0
    ))
    report = classification_report(
        y_test, y_pred_class, labels=all_labels, output_dict=True, zero_division=0
    )

    print(pd.crosstab(y_test, y_pred_class, colnames=['Predictions']))

    accuracy = accuracy_score(y_test, y_pred_class)
    # Epoch and validation loss are recovered from the checkpoint file name.
    epoch_num = parse_epoch_from_name(p.name)
    val_loss = parse_val_loss_from_name(p.name)

    row = {
        "sampling_method": SAMPLING_METHOD,
        "outliers_removed": REMOVE_OUTLIERS,
        "epochs": epoch_num,
        "model": p.name,
        "val_loss": round(float(val_loss), 4) if not np.isnan(val_loss) else np.nan,
        "test_accuracy": round(float(accuracy), 4),
        "test_f1_macro": round(float(report["macro avg"]["f1-score"]), 4),
        "test_precision_macro": round(float(report["macro avg"]["precision"]), 4),
        "test_recall_macro": round(float(report["macro avg"]["recall"]), 4),
        "test_f1_weighted": round(float(report["weighted avg"]["f1-score"]), 4),
        "test_precision_weighted": round(float(report["weighted avg"]["precision"]), 4),
        "test_recall_weighted": round(float(report["weighted avg"]["recall"]), 4),
    }
    # Per-class metrics; the report dict is keyed by the label's string form.
    for lbl in all_labels:
        cls_report = report[str(lbl)]
        cls_id = int(lbl)
        row[f"test_f1_cls_{cls_id}"] = round(float(cls_report["f1-score"]), 4)
        row[f"test_precision_cls_{cls_id}"] = round(float(cls_report["precision"]), 4)
        row[f"test_recall_cls_{cls_id}"] = round(float(cls_report["recall"]), 4)
        row[f"test_support_cls_{cls_id}"] = int(cls_report["support"])

    rows.append(row)

df = pd.DataFrame(rows)
if rows:
    os.makedirs(os.path.dirname(results_csv), exist_ok=True)
    # Append to an existing results file; write the header only when the
    # file is being created.
    write_header = not os.path.exists(results_csv)
    df.to_csv(results_csv, mode='a', index=False, header=write_header)
else:
    # Nothing was evaluated: do not create a header-less stub file that
    # later appends would build on.
    print(f"No checkpoints found for '{model_name}' in {model_dir}; nothing written.")

In [None]:
# Plot the training history returned by fit() using the project helper; the
# figure directory and model name are passed along.
plot_training_history(history, "reports/figures/training_history/", model_name)