In [16]:
import os
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Conv1D, MaxPooling1D, LSTM, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping

In [17]:
# Folder paths
folders = {
    "Abnormal Heartbeat": r"D:\FYP\Cadivas CNN\preprocessed_1d\AHB",
    "Myocardial Infarction": r"D:\FYP\Cadivas CNN\preprocessed_1d\MI",
    "Normal": r"D:\FYP\Cadivas CNN\preprocessed_1d\NORMAL",
    "History of MI": r"D:\FYP\Cadivas CNN\preprocessed_1d\PM"
}

all_data = []
for label, folder in folders.items():
    for file in os.listdir(folder):
        if file.endswith(".csv"):
            df = pd.read_csv(os.path.join(folder, file))
            df['Class'] = label
            all_data.append(df)

# Combine all data
data = pd.concat(all_data, ignore_index=True)

In [18]:
# Features (first 255 columns)
X = data.iloc[:, :255].values  
y = data['Class'].values

# Encode labels
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)
y_onehot = to_categorical(y_encoded)

# Normalize features
scaler = StandardScaler()
X = scaler.fit_transform(X)

In [19]:
X_train, X_test, y_train, y_test = train_test_split(
    X, y_onehot, test_size=0.2, random_state=42, stratify=y_onehot
)

# Reshape for CNN+LSTM (samples, timesteps, features=1)
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)

In [20]:
class_weights = compute_class_weight(
    class_weight='balanced',
    classes=np.unique(y_encoded),
    y=y_encoded
)
class_weights = dict(enumerate(class_weights))
print("Class Weights:", class_weights)

Class Weights: {0: 0.9957081545064378, 1: 1.3488372093023255, 2: 0.9707112970711297, 3: 0.8169014084507042}


In [21]:
model = Sequential([
    Input(shape=(X_train.shape[1], 1)),

    Conv1D(64, kernel_size=5, activation='relu', padding='same'),
    BatchNormalization(),
    MaxPooling1D(pool_size=2),

    Conv1D(128, kernel_size=5, activation='relu', padding='same'),
    BatchNormalization(),
    MaxPooling1D(pool_size=2),

    LSTM(64, return_sequences=False),

    Dense(256, activation='relu'),
    Dropout(0.5),
    Dense(y_onehot.shape[1], activation='softmax')
])

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
model.summary()

Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d_5 (Conv1D)           (None, 255, 64)           384       
                                                                 
 batch_normalization_5 (Batc  (None, 255, 64)          256       
 hNormalization)                                                 
                                                                 
 max_pooling1d_5 (MaxPooling  (None, 127, 64)          0         
 1D)                                                             
                                                                 
 conv1d_6 (Conv1D)           (None, 127, 128)          41088     
                                                                 
 batch_normalization_6 (Batc  (None, 127, 128)         512       
 hNormalization)                                                 
                                                      

In [22]:
lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, verbose=1)
early_stop = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

In [23]:
history = model.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=120,
    batch_size=64,
    class_weight=class_weights,
    callbacks=[lr_scheduler, early_stop],
    verbose=1
)

Epoch 1/120
Epoch 2/120
Epoch 3/120
Epoch 4/120
Epoch 5/120
Epoch 6/120
Epoch 7/120
Epoch 8/120
Epoch 9/120
Epoch 10/120
Epoch 11/120
Epoch 12/120
Epoch 13/120
Epoch 14/120
Epoch 15/120
Epoch 16/120
Epoch 17/120
Epoch 18/120
Epoch 19/120
Epoch 20/120
Epoch 21/120
Epoch 22/120
Epoch 23/120
Epoch 24/120
Epoch 25/120
Epoch 26/120
Epoch 27/120
Epoch 28/120
Epoch 29/120
Epoch 30/120
Epoch 31/120
Epoch 31: ReduceLROnPlateau reducing learning rate to 0.0005000000237487257.
Epoch 32/120
Epoch 33/120
Epoch 34/120
Epoch 35/120
Epoch 36/120
Epoch 37/120
Epoch 37: ReduceLROnPlateau reducing learning rate to 0.0002500000118743628.
Epoch 38/120
Epoch 39/120
Epoch 40/120
Epoch 41/120
Epoch 42/120
Epoch 43/120
Epoch 44/120
Epoch 45/120
Epoch 45: ReduceLROnPlateau reducing learning rate to 0.0001250000059371814.
Epoch 46/120
Epoch 47/120
Epoch 48/120
Epoch 49/120
Epoch 50/120
Epoch 50: ReduceLROnPlateau reducing learning rate to 6.25000029685907e-05.


In [24]:
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score

# Predictions
y_pred_probs = model.predict(X_test)
y_test_labels = np.argmax(y_test, axis=1)
y_pred_labels = np.argmax(y_pred_probs, axis=1)

# Classification report
print("\nClassification Report:\n")
print(classification_report(y_test_labels, y_pred_labels, target_names=label_encoder.classes_))

# Confusion matrix
cm = confusion_matrix(y_test_labels, y_pred_labels)
print("\nConfusion Matrix:\n", cm)

# Accuracy
acc = accuracy_score(y_test_labels, y_pred_labels)
print("\nFinal Test Accuracy: {:.2f}%".format(acc * 100))


Classification Report:

                       precision    recall  f1-score   support

   Abnormal Heartbeat       0.94      0.83      0.88       606
        History of MI       0.80      0.90      0.85       447
Myocardial Infarction       0.97      1.00      0.99       621
               Normal       0.93      0.93      0.93       739

             accuracy                           0.92      2413
            macro avg       0.91      0.91      0.91      2413
         weighted avg       0.92      0.92      0.92      2413


Confusion Matrix:
 [[504  64  10  28]
 [ 17 402   2  26]
 [  0   0 621   0]
 [ 14  35   4 686]]

Final Test Accuracy: 91.71%


In [27]:
model.save("CNN+LSTM(91).h5")
import joblib
joblib.dump(label_encoder, "label_encoder(cnn+lstm).pkl")

['label_encoder(cnn+lstm).pkl']

In [None]:
BiLSTM

In [28]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, BatchNormalization
from tensorflow.keras.layers import Dense, Dropout, Input, Bidirectional, LSTM
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

In [29]:
# Assume X, y already prepared (X: signals, y: labels)

# Encode labels
le = LabelEncoder()
y_encoded = le.fit_transform(y)
y_onehot = tf.keras.utils.to_categorical(y_encoded)

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y_onehot, test_size=0.2, random_state=42, stratify=y_onehot
)

# Expand dims for Conv1D (samples, timesteps, channels)
X_train = np.expand_dims(X_train, -1)
X_test = np.expand_dims(X_test, -1)

# Augmentation: add Gaussian noise
def add_noise(X, noise_factor=0.01):
    return X + noise_factor * np.random.normal(size=X.shape)

X_train_noisy = add_noise(X_train)
y_train_noisy = y_train.copy()

# Concatenate original + augmented
X_train_aug = np.concatenate([X_train, X_train_noisy])
y_train_aug = np.concatenate([y_train, y_train_noisy])

In [30]:
model = Sequential([
    Input(shape=(X_train.shape[1], 1)),

    # CNN Feature Extractor
    Conv1D(128, kernel_size=5, activation='relu', padding='same'),
    BatchNormalization(),
    MaxPooling1D(pool_size=2),

    Conv1D(256, kernel_size=3, activation='relu', padding='same'),
    BatchNormalization(),
    MaxPooling1D(pool_size=2),

    # BiLSTM for temporal dependencies
    Bidirectional(LSTM(128, return_sequences=False)),

    # Fully connected
    Dense(256, activation='relu'),
    Dropout(0.5),
    Dense(y_onehot.shape[1], activation='softmax')
])

model.compile(
    optimizer=Adam(learning_rate=1e-3),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

In [31]:
callbacks = [
    EarlyStopping(monitor='val_loss', patience=8, restore_best_weights=True),
    ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=4, verbose=1)
]

history = model.fit(
    X_train_aug, y_train_aug,
    validation_data=(X_test, y_test),
    epochs=50,
    batch_size=64,
    callbacks=callbacks,
    verbose=1
)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 13: ReduceLROnPlateau reducing learning rate to 0.0005000000237487257.
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 17: ReduceLROnPlateau reducing learning rate to 0.0002500000118743628.


In [33]:
# Predictions
y_pred = model.predict(X_test)
y_pred_classes = np.argmax(y_pred, axis=1)
y_true = np.argmax(y_test, axis=1)

# Classification Report
print("\nClassification Report:\n")
print(classification_report(y_true, y_pred_classes, target_names=le.classes_))

# Confusion Matrix
cm = confusion_matrix(y_true, y_pred_classes)
print("\nConfusion Matrix:\n", cm)

# Final Accuracy
loss, acc = model.evaluate(X_test, y_test, verbose=0)
print(f"\nFinal Test Accuracy: {acc * 100:.2f}%")


Classification Report:

                       precision    recall  f1-score   support

   Abnormal Heartbeat       0.89      0.82      0.85       606
        History of MI       0.80      0.79      0.79       447
Myocardial Infarction       0.95      1.00      0.98       621
               Normal       0.89      0.92      0.90       739

             accuracy                           0.89      2413
            macro avg       0.88      0.88      0.88      2413
         weighted avg       0.89      0.89      0.89      2413


Confusion Matrix:
 [[495  53  20  38]
 [ 42 351   5  49]
 [  0   0 621   0]
 [ 22  33   5 679]]

Final Test Accuracy: 88.93%


In [None]:
Improved

In [42]:
import os
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Conv1D, MaxPooling1D, Flatten, Dense, Dropout, LSTM
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
import joblib

In [43]:
# Folder paths
folders = {
    "Abnormal Heartbeat": r"D:\FYP\Cadivas CNN\preprocessed_1d\AHB",
    "Myocardial Infarction": r"D:\FYP\Cadivas CNN\preprocessed_1d\MI",
    "Normal": r"D:\FYP\Cadivas CNN\preprocessed_1d\NORMAL",
    "History of MI": r"D:\FYP\Cadivas CNN\preprocessed_1d\PM"
}

all_data = []
for label, folder in folders.items():
    for file in os.listdir(folder):
        if file.endswith(".csv"):
            df = pd.read_csv(os.path.join(folder, file))
            df['Class'] = label
            all_data.append(df)

# Combine all data
data = pd.concat(all_data, ignore_index=True)

# Features & Labels
X = data.iloc[:, :255].values
y = data['Class'].values

In [44]:
# Encode labels
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)
y_onehot = to_categorical(y_encoded)

# Save label encoder
joblib.dump(label_encoder, "label_encoder.pkl")

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y_onehot, test_size=0.2, random_state=42, stratify=y_onehot
)

# Reshape for 1D CNN
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)

print("Train shape:", X_train.shape, " Test shape:", X_test.shape)

Train shape: (9651, 255, 1)  Test shape: (2413, 255, 1)


In [45]:
model = Sequential([
    Input(shape=(X_train.shape[1], 1)),

    Conv1D(128, kernel_size=5, activation='relu', padding='same'),
    MaxPooling1D(pool_size=2),
    Dropout(0.2),

    Conv1D(256, kernel_size=5, activation='relu', padding='same'),
    MaxPooling1D(pool_size=2),
    Dropout(0.2),

    LSTM(128, return_sequences=False, dropout=0.2, recurrent_dropout=0.2),

    Dense(128, activation='relu'),
    Dropout(0.3),

    Dense(y_onehot.shape[1], activation='softmax')
])

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
model.summary()

Model: "sequential_5"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d_11 (Conv1D)          (None, 255, 128)          768       
                                                                 
 max_pooling1d_11 (MaxPoolin  (None, 127, 128)         0         
 g1D)                                                            
                                                                 
 dropout_7 (Dropout)         (None, 127, 128)          0         
                                                                 
 conv1d_12 (Conv1D)          (None, 127, 256)          164096    
                                                                 
 max_pooling1d_12 (MaxPoolin  (None, 63, 256)          0         
 g1D)                                                            
                                                                 
 dropout_8 (Dropout)         (None, 63, 256)          

In [46]:
early_stop = EarlyStopping(monitor="val_loss", patience=7, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=3, min_lr=1e-6)

In [47]:
history = model.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=50,
    batch_size=32,
    callbacks=[early_stop, reduce_lr],
    verbose=1
)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


In [49]:
# Predictions
y_pred_probs = model.predict(X_test)
y_pred = np.argmax(y_pred_probs, axis=1)
y_true = np.argmax(y_test, axis=1)

# Classification Report
print("\nClassification Report:\n")
print(classification_report(y_true, y_pred, target_names=label_encoder.classes_))

# Confusion Matrix
print("\nConfusion Matrix:\n")
print(confusion_matrix(y_true, y_pred))

# Final Accuracy
acc = accuracy_score(y_true, y_pred) * 100
print(f"\nFinal Test Accuracy: {acc:.2f}%")


Classification Report:

                       precision    recall  f1-score   support

   Abnormal Heartbeat       0.92      0.82      0.87       606
        History of MI       0.79      0.71      0.75       447
Myocardial Infarction       0.97      1.00      0.98       621
               Normal       0.85      0.95      0.90       739

             accuracy                           0.89      2413
            macro avg       0.88      0.87      0.87      2413
         weighted avg       0.89      0.89      0.89      2413


Confusion Matrix:

[[499  54  17  36]
 [ 41 318   2  86]
 [  0   0 621   0]
 [  4  29   3 703]]

Final Test Accuracy: 88.73%


In [None]:
Extra Conv1D + MaxPooling before LSTM

In [50]:
import os
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense, Dropout, BatchNormalization
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.utils import to_categorical
import joblib

In [52]:
# Folder paths
folders = {
    "Abnormal Heartbeat": r"D:\FYP\Cadivas CNN\preprocessed_1d\AHB",
    "Myocardial Infarction": r"D:\FYP\Cadivas CNN\preprocessed_1d\MI",
    "Normal": r"D:\FYP\Cadivas CNN\preprocessed_1d\NORMAL",
    "History of MI": r"D:\FYP\Cadivas CNN\preprocessed_1d\PM"
}

all_data = []
for label, folder in folders.items():
    for file in os.listdir(folder):
        if file.endswith(".csv"):
            df = pd.read_csv(os.path.join(folder, file))
            df['Class'] = label
            all_data.append(df)

data = pd.concat(all_data, ignore_index=True)

# Features & Labels
X = data.iloc[:, :255].values
y = data['Class'].values

In [53]:
# Label encoding
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)
y_onehot = to_categorical(y_encoded)

# Save label encoder
joblib.dump(label_encoder, "label_encoder.pkl")

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y_onehot, test_size=0.2, random_state=42, stratify=y_onehot
)

# Reshape for 1D CNN
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)

print("Train shape:", X_train.shape, "Test shape:", X_test.shape)

Train shape: (9651, 255, 1) Test shape: (2413, 255, 1)


In [54]:
class_weights = compute_class_weight(
    class_weight="balanced",
    classes=np.unique(np.argmax(y_train, axis=1)),
    y=np.argmax(y_train, axis=1)
)
class_weights = dict(enumerate(class_weights))
print("Class Weights:", class_weights)

Class Weights: {0: 0.9957697069748246, 1: 1.3486584684181107, 2: 0.9705349959774738, 3: 0.8170504571622079}


In [55]:
model = Sequential()

# Conv1D layers
model.add(Conv1D(64, kernel_size=5, activation='relu', input_shape=(X_train.shape[1], 1)))
model.add(BatchNormalization())
model.add(MaxPooling1D(pool_size=2))
model.add(Dropout(0.2))

model.add(Conv1D(128, kernel_size=5, activation='relu'))
model.add(BatchNormalization())
model.add(MaxPooling1D(pool_size=2))
model.add(Dropout(0.2))

# LSTM layer
model.add(LSTM(128, return_sequences=False, dropout=0.2))

# Fully connected
model.add(Dense(64, activation='relu'))
model.add(Dropout(0.3))
model.add(Dense(y_train.shape[1], activation='softmax'))

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
model.summary()

Model: "sequential_6"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d_13 (Conv1D)          (None, 251, 64)           384       
                                                                 
 batch_normalization_9 (Batc  (None, 251, 64)          256       
 hNormalization)                                                 
                                                                 
 max_pooling1d_13 (MaxPoolin  (None, 125, 64)          0         
 g1D)                                                            
                                                                 
 dropout_10 (Dropout)        (None, 125, 64)           0         
                                                                 
 conv1d_14 (Conv1D)          (None, 121, 128)          41088     
                                                                 
 batch_normalization_10 (Bat  (None, 121, 128)        

In [57]:
early_stop = EarlyStopping(monitor='val_loss', patience=8, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=4, min_lr=1e-6)

In [58]:
history = model.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=50,
    batch_size=32,
    class_weight=class_weights,
    callbacks=[early_stop, reduce_lr]
)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50


In [59]:
y_pred = model.predict(X_test)
y_pred_classes = np.argmax(y_pred, axis=1)
y_true = np.argmax(y_test, axis=1)

print("\nClassification Report:\n")
print(classification_report(y_true, y_pred_classes, target_names=label_encoder.classes_))

print("\nConfusion Matrix:\n")
print(confusion_matrix(y_true, y_pred_classes))

acc = accuracy_score(y_true, y_pred_classes) * 100
print(f"\nFinal Test Accuracy: {acc:.2f}%")


Classification Report:

                       precision    recall  f1-score   support

   Abnormal Heartbeat       0.93      0.80      0.86       606
        History of MI       0.74      0.84      0.78       447
Myocardial Infarction       0.95      1.00      0.97       621
               Normal       0.89      0.88      0.89       739

             accuracy                           0.88      2413
            macro avg       0.88      0.88      0.88      2413
         weighted avg       0.89      0.88      0.88      2413


Confusion Matrix:

[[485  68  20  33]
 [ 25 375   2  45]
 [  0   0 621   0]
 [ 12  67  11 649]]

Final Test Accuracy: 88.27%


In [None]:
CNN + Bidirectional LSTM + Attention

In [60]:
# Cell 1
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import joblib
import random

from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score

import tensorflow as tf
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import (Input, Conv1D, MaxPooling1D, BatchNormalization,
                                     Dropout, Bidirectional, LSTM, Dense, Flatten, Layer)
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.utils import to_categorical

# reproducibility (best-effort)
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)
random.seed(SEED)

In [61]:
# Cell 2
folders = {
    "Abnormal Heartbeat": r"D:\FYP\Cadivas CNN\preprocessed_1d\AHB",
    "Myocardial Infarction": r"D:\FYP\Cadivas CNN\preprocessed_1d\MI",
    "Normal": r"D:\FYP\Cadivas CNN\preprocessed_1d\NORMAL",
    "History of MI": r"D:\FYP\Cadivas CNN\preprocessed_1d\PM"
}

all_data = []
for label, folder in folders.items():
    for file in os.listdir(folder):
        if file.endswith(".csv"):
            df = pd.read_csv(os.path.join(folder, file))
            df['Class'] = label
            all_data.append(df)

data = pd.concat(all_data, ignore_index=True)
print("Loaded samples:", data.shape[0])

Loaded samples: 12064


In [62]:
# Cell 3
# features: first 255 columns (as before)
X = data.iloc[:, :255].values.astype(np.float32)
y = data['Class'].values

# Standardize features (fit on whole data or only train; we will fit on whole here for simplicity)
scaler = StandardScaler()
X = scaler.fit_transform(X)

# Save scaler if you want later (optional)
joblib.dump(scaler, "scaler.pkl")

['scaler.pkl']

In [63]:
# Cell 4
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)
y_onehot = to_categorical(y_encoded)

# Save encoder
joblib.dump(label_encoder, "label_encoder.pkl")
print("Classes:", label_encoder.classes_)

# Train/test split (stratify by original labels)
X_train, X_test, y_train, y_test, y_train_labels, y_test_labels = train_test_split(
    X, y_onehot, y_encoded, test_size=0.2, random_state=SEED, stratify=y_onehot
)

# reshape to (samples, timesteps, channels)
X_train = X_train.reshape((-1, X_train.shape[1], 1))
X_test  = X_test.reshape((-1, X_test.shape[1], 1))

print("Shapes:", X_train.shape, X_test.shape, y_train.shape)

Classes: ['Abnormal Heartbeat' 'History of MI' 'Myocardial Infarction' 'Normal']
Shapes: (9651, 255, 1) (2413, 255, 1) (9651, 4)


In [64]:
# Cell 5
def add_gaussian_noise(X, noise_factor=0.01):
    noise = np.random.normal(loc=0.0, scale=noise_factor, size=X.shape)
    return X + noise

def time_shift(X, max_shift=5):
    # shift along timestep axis by up to ±max_shift
    Xs = []
    for sample in X:
        shift = np.random.randint(-max_shift, max_shift+1)
        if shift == 0:
            Xs.append(sample)
        elif shift > 0:
            shifted = np.vstack([np.zeros((shift,1)), sample[:-shift]])
            Xs.append(shifted)
        else:
            shifted = np.vstack([sample[-shift:], np.zeros((-shift,1))])
            Xs.append(shifted)
    return np.array(Xs)

# create augmented training set by mixing original + noisy + shifted
X_train_noisy = add_gaussian_noise(X_train, noise_factor=0.02)
X_train_shift = time_shift(X_train, max_shift=6)

X_train_aug = np.concatenate([X_train, X_train_noisy, X_train_shift], axis=0)
y_train_aug = np.concatenate([y_train, y_train, y_train], axis=0)

print("Augmented train shape:", X_train_aug.shape, y_train_aug.shape)

Augmented train shape: (28953, 255, 1) (28953, 4)


In [65]:
orig_train_labels = np.argmax(y_train, axis=1)  # or use y_train_labels from earlier split
class_weights_array = compute_class_weight(
    class_weight="balanced",
    classes=np.unique(orig_train_labels),
    y=orig_train_labels
)
class_weights = dict(enumerate(class_weights_array))
print("Class weights:", class_weights)

Class weights: {0: 0.9957697069748246, 1: 1.3486584684181107, 2: 0.9705349959774738, 3: 0.8170504571622079}


In [66]:
# Cell 7
# Simple attention layer: computes attention over time axis
class AttentionLayer(Layer):
    def __init__(self, **kwargs):
        super(AttentionLayer, self).__init__(**kwargs)

    def build(self, input_shape):
        # input_shape: (batch, time_steps, features)
        self.W = self.add_weight(name='att_weight', shape=(input_shape[-1],), initializer='random_normal', trainable=True)
        super(AttentionLayer, self).build(input_shape)

    def call(self, inputs):
        # inputs: (batch, time, features)
        # score: (batch, time)
        score = tf.tensordot(inputs, self.W, axes=[2, 0])  # dot over features -> (batch, time)
        weights = tf.nn.softmax(score, axis=1)             # (batch, time)
        weights_expanded = tf.expand_dims(weights, axis=-1) # (batch, time, 1)
        context = tf.reduce_sum(inputs * weights_expanded, axis=1) # (batch, features)
        return context

    def get_config(self):
        base_config = super(AttentionLayer, self).get_config()
        return {**base_config}

In [67]:
# Cell 8
timesteps = X_train.shape[1]
channels = X_train.shape[2]
num_classes = y_train.shape[1]

inputs = Input(shape=(timesteps, channels))

# Conv blocks for feature extraction
x = Conv1D(128, kernel_size=5, padding='same', activation='relu')(inputs)
x = BatchNormalization()(x)
x = MaxPooling1D(pool_size=2)(x)
x = Dropout(0.2)(x)

x = Conv1D(256, kernel_size=3, padding='same', activation='relu')(x)
x = BatchNormalization()(x)
x = MaxPooling1D(pool_size=2)(x)
x = Dropout(0.2)(x)

# Bidirectional LSTM to capture temporal patterns
x = Bidirectional(LSTM(128, return_sequences=True, dropout=0.2, recurrent_dropout=0.1))(x)

# Attention mechanism to aggregate time steps
context = AttentionLayer()(x)

# Dense head
x = Dense(128, activation='relu')(context)
x = Dropout(0.3)(x)
outputs = Dense(num_classes, activation='softmax')(x)

model = Model(inputs=inputs, outputs=outputs)
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
              loss='categorical_crossentropy', metrics=['accuracy'])
model.summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_7 (InputLayer)        [(None, 255, 1)]          0         
                                                                 
 conv1d_15 (Conv1D)          (None, 255, 128)          768       
                                                                 
 batch_normalization_11 (Bat  (None, 255, 128)         512       
 chNormalization)                                                
                                                                 
 max_pooling1d_15 (MaxPoolin  (None, 127, 128)         0         
 g1D)                                                            
                                                                 
 dropout_13 (Dropout)        (None, 127, 128)          0         
                                                                 
 conv1d_16 (Conv1D)          (None, 127, 256)          98560 

In [None]:
# Cell 9
early_stop = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=4, min_lr=1e-6, verbose=1)

history = model.fit(
    X_train_aug, y_train_aug,
    validation_data=(X_test, y_test),
    epochs=100,
    batch_size=64,
    class_weight=class_weights,
    callbacks=[early_stop, reduce_lr],
    verbose=1
)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100

In [None]:
# Cell 10
# plot accuracy & loss
plt.figure(figsize=(12,4))
plt.subplot(1,2,1)
plt.plot(history.history['accuracy'], label='train_acc')
plt.plot(history.history['val_accuracy'], label='val_acc')
plt.title('Accuracy')
plt.legend()

plt.subplot(1,2,2)
plt.plot(history.history['loss'], label='train_loss')
plt.plot(history.history['val_loss'], label='val_loss')
plt.title('Loss')
plt.legend()
plt.tight_layout()
plt.show()

# Save history if desired
import json
with open("training_history.json", "w") as f:
    json.dump(history.history, f)

In [None]:
# Cell 11
y_pred_probs = model.predict(X_test)
y_pred = np.argmax(y_pred_probs, axis=1)
y_true = np.argmax(y_test, axis=1)

print("\nClassification Report:\n")
print(classification_report(y_true, y_pred, target_names=label_encoder.classes_))

cm = confusion_matrix(y_true, y_pred)
print("\nConfusion Matrix:\n", cm)

# Confusion matrix heatmap
plt.figure(figsize=(6,5))
plt.imshow(cm, interpolation='nearest', aspect='auto')
plt.title('Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.colorbar()
tick_marks = np.arange(len(label_encoder.classes_))
plt.xticks(tick_marks, label_encoder.classes_, rotation=45, ha='right')
plt.yticks(tick_marks, label_encoder.classes_)
# annotate cells
thresh = cm.max() / 2.
for i in range(cm.shape[0]):
    for j in range(cm.shape[1]):
        plt.text(j, i, format(cm[i, j], 'd'),
                 ha="center", va="center",
                 color="white" if cm[i, j] > thresh else "black")
plt.tight_layout()
plt.show()

acc = accuracy_score(y_true, y_pred) * 100
print(f"\nFinal Test Accuracy: {acc:.2f}%")

In [None]:
import os
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import (
    Input, Conv1D, MaxPooling1D, Flatten, Dense, Dropout, BatchNormalization
)
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
import joblib

# -----------------------------
# Load & Combine Data
# -----------------------------
folders = {
    "Abnormal Heartbeat": r"E:\FYP\Cardi 2\Cardiovascular-Detection-using-ECG-images\preprocessed_1d\AHB",
    "Myocardial Infarction": r"E:\FYP\Cardi 2\Cardiovascular-Detection-using-ECG-images\preprocessed_1d\MI",
    "Normal": r"E:\FYP\Cardi 2\Cardiovascular-Detection-using-ECG-images\preprocessed_1d\NORMAL",
    "History of MI": r"E:\FYP\Cardi 2\Cardiovascular-Detection-using-ECG-images\preprocessed_1d\PM"
}

all_data = []
for label, folder in folders.items():
    for file in os.listdir(folder):
        if file.endswith(".csv"):
            df = pd.read_csv(os.path.join(folder, file))
            df['Class'] = label
            all_data.append(df)

data = pd.concat(all_data, ignore_index=True)

# Save combined CSV
save_path = r"E:\FYP\Cardi 2\Cardiovascular-Detection-using-ECG-images\combined_data.csv"
data.to_csv(save_path, index=False)
print(f"Combined CSV saved at: {save_path}")

# -----------------------------
# Prepare Data
# -----------------------------
X = data.iloc[:, :255].values
y = data['Class'].values

label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)
y_onehot = to_categorical(y_encoded)

X_train, X_test, y_train, y_test = train_test_split(
    X, y_onehot, test_size=0.2, random_state=42, stratify=y_onehot
)

X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)

# -----------------------------
# Improved CNN Model
# -----------------------------
model = Sequential([
    Input(shape=(X_train.shape[1], 1)),

    Conv1D(64, kernel_size=7, activation='relu', padding='same'),
    BatchNormalization(),
    MaxPooling1D(pool_size=2),

    Conv1D(128, kernel_size=5, activation='relu', padding='same'),
    BatchNormalization(),
    MaxPooling1D(pool_size=2),

    Conv1D(256, kernel_size=3, activation='relu', padding='same'),
    BatchNormalization(),
    MaxPooling1D(pool_size=2),

    Flatten(),
    Dense(256, activation='relu'),
    Dropout(0.4),
    Dense(128, activation='relu'),
    Dropout(0.3),
    Dense(y_onehot.shape[1], activation='softmax')
])

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# -----------------------------
# Callbacks (better training)
# -----------------------------
early_stop = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5)

# -----------------------------
# Train Model
# -----------------------------
history = model.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=50,
    batch_size=64,
    callbacks=[early_stop, reduce_lr],
    verbose=1
)

# -----------------------------
# Evaluate
# -----------------------------
loss, acc = model.evaluate(X_test, y_test, verbose=0)
print(f"✅ Test Accuracy: {acc*100:.2f}%")
