In [1]:
import os
import numpy as np
import pandas as pd
import scipy.io as sio
import matplotlib.pyplot as plt

from scipy.signal import stft
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score
from collections import defaultdict

import tensorflow as tf
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras import layers, models


In [2]:
DATA_DIR = r'D:\ECG_model\Data'        # folder containing A0001.mat ...
CSV_PATH = r'D:\ECG_model\REFERENCE.csv'           # Recording, First_label
IMG_DIR = "ecg_images_multilead"

FS = 500                          # sampling frequency
FIXED_LEN = 7500                  # 15 sec
WINDOW = 1000                     # 2 sec
STRIDE = 500                      # 50% overlap

IMG_SIZE = (224, 224)
NUM_CLASSES = 9

LEADS = [0, 1, 2]   # Lead-1, Lead-2, Lead-3 (0-indexed)


In [3]:
labels_df = pd.read_csv(CSV_PATH)

label_map = {
    row["Recording"]: row["First_label"] - 1   # map 1–9 → 0–8
    for _, row in labels_df.iterrows()
}


In [4]:
def fix_length(sig, length=7500):
    if sig.shape[0] >= length:
        return sig[:length]
    else:
        return np.pad(sig, (0, length - sig.shape[0]))


In [5]:
def tf_image(signal):
    f, t, Zxx = stft(signal, fs=FS, nperseg=128)
    img = np.abs(Zxx)
    img = (img - img.min()) / (img.max() - img.min() + 1e-8)
    return img


In [6]:
os.makedirs(IMG_DIR, exist_ok=True)

for cls in range(NUM_CLASSES):
    os.makedirs(os.path.join(IMG_DIR, f"class_{cls+1}"), exist_ok=True)


In [7]:
for fname in os.listdir(DATA_DIR):

    rec_id = fname.replace(".mat", "")
    if rec_id not in label_map:
        continue

    label = label_map[rec_id]

    mat = sio.loadmat(os.path.join(DATA_DIR, fname))
    ecg = mat["ECG"][0][0]["data"]     # shape (12, N)

    # select leads
    leads = [fix_length(ecg[l]) for l in LEADS]

    # windowing
    win_id = 0
    for start in range(0, FIXED_LEN - WINDOW + 1, STRIDE):

        channels = []
        for sig in leads:
            w = sig[start:start + WINDOW]
            img = tf_image(w)
            channels.append(img)

        # stack → RGB
        rgb = np.stack(channels, axis=-1)

        # resize
        rgb = tf.image.resize(rgb, IMG_SIZE).numpy()

        # save
        save_path = os.path.join(
            IMG_DIR,
            f"class_{label+1}",
            f"{rec_id}_win{win_id}.png"
        )

        plt.imsave(save_path, rgb)
        win_id += 1


In [8]:
train_dir = "ecg_train_multilead"
val_dir   = "ecg_val_multilead"
test_dir  = "ecg_test_multilead"

for d in [train_dir, val_dir, test_dir]:
    for c in range(NUM_CLASSES):
        os.makedirs(os.path.join(d, f"class_{c+1}"), exist_ok=True)


In [9]:
ecg_groups = defaultdict(list)

for cls in range(NUM_CLASSES):
    class_dir = os.path.join(IMG_DIR, f"class_{cls+1}")
    for f in os.listdir(class_dir):
        ecg_id = f.split("_win")[0]
        ecg_groups[(ecg_id, cls)].append(f)


In [10]:
ecgs = list(ecg_groups.keys())
train_ecg, temp = train_test_split(ecgs, test_size=0.3, random_state=42)
val_ecg, test_ecg = train_test_split(temp, test_size=0.5, random_state=42)


In [11]:
def copy_ecgs(ecg_list, target_dir):
    for ecg_id, cls in ecg_list:
        for img in ecg_groups[(ecg_id, cls)]:
            src = os.path.join(IMG_DIR, f"class_{cls+1}", img)
            dst = os.path.join(target_dir, f"class_{cls+1}", img)
            tf.io.gfile.copy(src, dst, overwrite=True)

copy_ecgs(train_ecg, train_dir)
copy_ecgs(val_ecg, val_dir)
copy_ecgs(test_ecg, test_dir)


### Load Dataset

In [12]:
train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    train_dir,
    image_size=IMG_SIZE,
    batch_size=32
)

val_ds = tf.keras.preprocessing.image_dataset_from_directory(
    val_dir,
    image_size=IMG_SIZE,
    batch_size=32
)

test_ds = tf.keras.preprocessing.image_dataset_from_directory(
    test_dir,
    image_size=IMG_SIZE,
    batch_size=32,
    shuffle=False
)


Found 34734 files belonging to 9 classes.
Found 7448 files belonging to 9 classes.
Found 7448 files belonging to 9 classes.


In [13]:
base_model = EfficientNetB0(
    include_top=False,
    weights="imagenet",
    input_shape=(224, 224, 3)
)

base_model.trainable = True
for layer in base_model.layers[:200]:
    layer.trainable = False


In [14]:
inputs = layers.Input(shape=(224,224,3))
x = tf.keras.applications.efficientnet.preprocess_input(inputs)
x = base_model(x)
x = layers.GlobalAveragePooling2D()(x)
x = layers.BatchNormalization()(x)
outputs = layers.Dense(NUM_CLASSES, activation="softmax")(x)

model = models.Model(inputs, outputs)


In [15]:
model.compile(
    optimizer=tf.keras.optimizers.Adam(1e-5),
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"]
)


In [16]:
model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=30,
    callbacks=[
        tf.keras.callbacks.EarlyStopping(
            monitor="val_loss",
            patience=5,
            restore_best_weights=True
        )
    ]
)


Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30


<keras.callbacks.History at 0x27b67750550>

In [17]:
y_true = []
y_pred = []
file_paths = test_ds.file_paths

for imgs, labels in test_ds:
    preds = model.predict(imgs, verbose=0)
    y_pred.extend(np.argmax(preds, axis=1))
    y_true.extend(labels.numpy())


In [18]:
ecg_preds = defaultdict(list)
ecg_true = {}

for path, p, t in zip(file_paths, y_pred, y_true):
    ecg_id = os.path.basename(path).split("_win")[0]
    ecg_preds[ecg_id].append(p)
    ecg_true[ecg_id] = t


In [19]:
final_pred = []
final_true = []

for ecg_id in ecg_preds:
    pred = max(set(ecg_preds[ecg_id]), key=ecg_preds[ecg_id].count)
    final_pred.append(pred)
    final_true.append(ecg_true[ecg_id])


In [20]:
print("ECG-Level Accuracy:", accuracy_score(final_true, final_pred))

print("\nECG-Level Classification Report:\n")
print(classification_report(
    final_true,
    final_pred,
    labels=list(range(9)),
    digits=2
))


ECG-Level Accuracy: 0.5112781954887218

ECG-Level Classification Report:

              precision    recall  f1-score   support

           0       0.62      0.43      0.51        91
           1       0.63      0.75      0.69        80
           2       0.40      0.44      0.42        48
           3       0.60      0.64      0.62        14
           4       0.48      0.83      0.61       122
           5       0.20      0.04      0.06        52
           6       0.57      0.16      0.25        51
           7       0.46      0.52      0.49        60
           8       0.14      0.07      0.10        14

    accuracy                           0.51       532
   macro avg       0.46      0.43      0.42       532
weighted avg       0.49      0.51      0.47       532



In [21]:
import os
import numpy as np
from collections import defaultdict
from sklearn.metrics import classification_report, accuracy_score

# Collect per-window probabilities
probs = []
y_true = []
file_paths = test_ds.file_paths

for images, labels in test_ds:
    p = model.predict(images, verbose=0)  # shape (B, 9)
    probs.extend(p)
    y_true.extend(labels.numpy())

# Group by ECG id
ecg_probs = defaultdict(list)
ecg_true = {}

for path, p, t in zip(file_paths, probs, y_true):
    ecg_id = os.path.basename(path).split("_win")[0]
    ecg_probs[ecg_id].append(p)
    ecg_true[ecg_id] = t

# Average probabilities per ECG
final_pred = []
final_true = []

for ecg_id, plist in ecg_probs.items():
    avg_p = np.mean(plist, axis=0)
    final_pred.append(np.argmax(avg_p))
    final_true.append(ecg_true[ecg_id])

print("ECG-Level Accuracy:", accuracy_score(final_true, final_pred))
print("\nECG-Level Classification Report:\n")
print(classification_report(final_true, final_pred, labels=list(range(9)), digits=2))


ECG-Level Accuracy: 0.5338345864661654

ECG-Level Classification Report:

              precision    recall  f1-score   support

           0       0.67      0.48      0.56        91
           1       0.62      0.74      0.67        80
           2       0.43      0.48      0.46        48
           3       0.59      0.71      0.65        14
           4       0.53      0.80      0.64       122
           5       0.33      0.08      0.12        52
           6       0.65      0.22      0.32        51
           7       0.41      0.57      0.48        60
           8       0.17      0.07      0.10        14

    accuracy                           0.53       532
   macro avg       0.49      0.46      0.44       532
weighted avg       0.53      0.53      0.50       532



### Merging week class -> abnormal

In [42]:
BASE_DIR = r'D:\ECG_model\multiple lead\ecg_images_multilead'   # FULL multi-lead image folder

IMG_SIZE = (224, 224)
BATCH_SIZE = 32
EPOCHS = 35

# original labels: 0–8 (from class_1 → class_9)
# merge: class 5 + class 8 → new class 5
NUM_CLASSES = 8


In [43]:
image_paths = []
labels = []

for cls in range(1, 10):  # class_1 → class_9
    class_dir = os.path.join(BASE_DIR, f"class_{cls}")
    for fname in os.listdir(class_dir):
        image_paths.append(os.path.join(class_dir, fname))
        labels.append(cls - 1)   # map 1–9 → 0–8

image_paths = np.array(image_paths)
labels = np.array(labels)

print("Total window images:", len(image_paths))


Total window images: 49630


In [44]:
def remap_label(l):
    if l == 8:
        return 5
    return l

labels = np.array([remap_label(l) for l in labels])


In [45]:
ecg_to_indices = defaultdict(list)

for i, path in enumerate(image_paths):
    ecg_id = os.path.basename(path).split("_win")[0]
    ecg_to_indices[ecg_id].append(i)

ecg_ids = list(ecg_to_indices.keys())

train_ecg, temp_ecg = train_test_split(ecg_ids, test_size=0.3, random_state=42)
val_ecg, test_ecg = train_test_split(temp_ecg, test_size=0.5, random_state=42)


In [46]:
def collect_indices(ecg_list):
    idx = []
    for e in ecg_list:
        idx.extend(ecg_to_indices[e])
    return idx

train_idx = collect_indices(train_ecg)
val_idx   = collect_indices(val_ecg)
test_idx  = collect_indices(test_ecg)

X_train, y_train = image_paths[train_idx], labels[train_idx]
X_val,   y_val   = image_paths[val_idx],   labels[val_idx]
X_test,  y_test  = image_paths[test_idx],  labels[test_idx]


In [47]:
def load_image(path, label):
    img = tf.io.read_file(path)
    img = tf.image.decode_png(img, channels=3)
    img = tf.image.resize(img, IMG_SIZE)
    img = tf.keras.applications.efficientnet.preprocess_input(img)
    return img, label

train_ds = tf.data.Dataset.from_tensor_slices((X_train, y_train))
train_ds = train_ds.map(load_image, num_parallel_calls=tf.data.AUTOTUNE)
train_ds = train_ds.shuffle(1000).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

val_ds = tf.data.Dataset.from_tensor_slices((X_val, y_val))
val_ds = val_ds.map(load_image).batch(BATCH_SIZE)

test_ds = tf.data.Dataset.from_tensor_slices((X_test, y_test))
test_ds = test_ds.map(load_image).batch(BATCH_SIZE)


In [48]:
base_model = EfficientNetB0(
    include_top=False,
    weights="imagenet",
    input_shape=(224, 224, 3)
)

# fine-tuning
base_model.trainable = True
for layer in base_model.layers[:200]:
    layer.trainable = False


In [49]:
inputs = layers.Input(shape=(224, 224, 3))
x = base_model(inputs, training=True)
x = layers.GlobalAveragePooling2D()(x)
x = layers.BatchNormalization()(x)
outputs = layers.Dense(NUM_CLASSES, activation="softmax")(x)

model = models.Model(inputs, outputs)


In [50]:
model.compile(
    optimizer=tf.keras.optimizers.Adam(1e-5),
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"]
)

model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=EPOCHS,
    callbacks=[
        tf.keras.callbacks.EarlyStopping(
            monitor="val_loss",
            patience=6,
            restore_best_weights=True
        )
    ]
)


Epoch 1/35
Epoch 2/35
Epoch 3/35
Epoch 4/35
Epoch 5/35
Epoch 6/35
Epoch 7/35
Epoch 8/35
Epoch 9/35
Epoch 10/35
Epoch 11/35
Epoch 12/35
Epoch 13/35
Epoch 14/35
Epoch 15/35
Epoch 16/35
Epoch 17/35
Epoch 18/35
Epoch 19/35
Epoch 20/35
Epoch 21/35


<keras.callbacks.History at 0x27d0c02a760>

In [51]:
y_pred = []
y_true = []

for imgs, labs in test_ds:
    preds = model.predict(imgs, verbose=0)
    y_pred.extend(np.argmax(preds, axis=1))
    y_true.extend(labs.numpy())


In [52]:
ecg_preds = defaultdict(list)
ecg_true = {}

for path, p, t in zip(X_test, y_pred, y_true):
    ecg_id = os.path.basename(path).split("_win")[0]
    ecg_preds[ecg_id].append(p)
    ecg_true[ecg_id] = t

final_pred = []
final_true = []

for ecg_id in ecg_preds:
    pred = max(set(ecg_preds[ecg_id]), key=ecg_preds[ecg_id].count)
    final_pred.append(pred)
    final_true.append(ecg_true[ecg_id])


In [53]:
print("ECG-Level Accuracy:",
      accuracy_score(final_true, final_pred))

print("\nECG-Level Classification Report:\n")

print(classification_report(
    final_true,
    final_pred,
    labels=list(range(NUM_CLASSES)),
    digits=2
))


ECG-Level Accuracy: 0.3966165413533835

ECG-Level Classification Report:

              precision    recall  f1-score   support

           0       0.49      0.36      0.42        91
           1       0.54      0.55      0.54        80
           2       0.35      0.25      0.29        48
           3       1.00      0.14      0.25        14
           4       0.35      0.80      0.48       122
           5       0.22      0.03      0.05        66
           6       0.47      0.14      0.21        51
           7       0.33      0.23      0.27        60

    accuracy                           0.40       532
   macro avg       0.47      0.31      0.32       532
weighted avg       0.41      0.40      0.35       532

