<a href="https://colab.research.google.com/github/Messycodess/Arrhythmia-Detection-and-Explainable-AI/blob/main/Models11.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# 4 models (simple CNN, ResNet, InceptionTime, Xception)
# MIT-BIH dataset .dat/.hea/.atr files are in:
# /content/drive/MyDrive/ECG_Datasets/MIT-BIH
# Results saved to:
# /content/drive/MyDrive/ECG_Datasets/Results/<model_name>/

# ===============================
# STEP 0: GPU & Misc
# ===============================
import os
import random
import numpy as np
import torch

# Seed every random number generator the notebook relies on (best effort:
# GPU kernels may still be non-deterministic).
seed = 42
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed):
    _seed_fn(seed)

cuda_available = torch.cuda.is_available()
if cuda_available:
    torch.cuda.manual_seed_all(seed)

# Train on the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda") if cuda_available else torch.device("cpu")
print("Device:", device)

# ===============================
# STEP 1: Mount Google Drive
# ===============================
from google.colab import drive

# Always remount so a stale mount left over from an earlier session is refreshed.
drive.mount('/content/drive', force_remount=True)

# All inputs and outputs live below one Drive folder.
DATA_ROOT = "/content/drive/MyDrive/ECG_Datasets"
MITBIH_PATH, RESULTS_ROOT = (
    os.path.join(DATA_ROOT, subdir) for subdir in ("MIT-BIH", "Results")
)
os.makedirs(RESULTS_ROOT, exist_ok=True)
for caption, location in (("Data path:", MITBIH_PATH), ("Results path:", RESULTS_ROOT)):
    print(caption, location)

# ===============================
# STEP 2: Install & Import Libraries
# ===============================
!pip install -q wfdb
import wfdb
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
from tqdm import tqdm

import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler

# ===============================
# STEP 3: AAMI mapping (5 classes)
# ===============================
# AAMI super-class -> MIT-BIH annotation symbols that belong to it.
# The insertion order of this dict defines the integer class indices.
AAMI_CLASSES = {
    'N': ['N', 'L', 'R', 'e', 'j'],   # Normal
    'S': ['A', 'a', 'J', 'S'],        # Supraventricular ectopic
    'V': ['V', 'E'],                  # Ventricular ectopic
    'F': ['F'],                       # Fusion
    'Q': ['/', 'f', 'Q'],             # Unknown / paced
}
classes = [*AAMI_CLASSES]

# Flat lookup: annotation symbol -> class index (position of its AAMI group).
symbol_to_class = {
    sym: class_idx
    for class_idx, members in enumerate(AAMI_CLASSES.values())
    for sym in members
}

# ===============================
# STEP 4: Load & Preprocess MIT-BIH
# ===============================
# Record names are the basenames of the .dat files; sorting makes the order
# (and therefore the dataset layout) deterministic across runs.
records = sorted(
    os.path.splitext(f)[0] for f in os.listdir(MITBIH_PATH) if f.endswith('.dat')
)
print("Total records found:", len(records))

# WFDB annotation symbols that do NOT mark a heartbeat (rhythm changes, noise
# markers, comments, waveform delimiters, ...). Windows centred on these are
# not beats, so they must be skipped instead of being labeled as class Q.
NON_BEAT_SYMBOLS = frozenset([
    '[', '!', ']', 'x', '(', ')', 'p', 't', 'u', '`', "'", '^',
    '|', '~', '+', 's', 'T', '*', 'D', '=', '"', '@',
])

signals = []
labels = []
win_size = 360  # 1 second at 360Hz
unknown_class = len(AAMI_CLASSES) - 1  # index of 'Q', used for unmapped beat symbols

for rec in tqdm(records, desc="Reading records"):
    rec_path = os.path.join(MITBIH_PATH, rec)
    try:
        record = wfdb.rdrecord(rec_path)
        annotation = wfdb.rdann(rec_path, 'atr')
    except Exception as e:
        # Best effort: one unreadable record should not abort the whole load.
        print("Failed to read", rec, e)
        continue

    # use first channel (Lead II) if available
    signal = record.p_signal[:, 0]
    # z-score normalize per record
    signal = (signal - np.mean(signal)) / (np.std(signal) + 1e-9)

    for beat_sample, sym in zip(annotation.sample, annotation.symbol):
        if sym in NON_BEAT_SYMBOLS:
            continue  # not a heartbeat annotation

        # Window of win_size samples starting half a window before the
        # annotation; clamped at 0, and dropped if it would run past the end.
        start = int(max(0, beat_sample - win_size // 2))
        end = start + win_size
        if end > len(signal):
            continue

        signals.append(signal[start:end])
        # Beat symbols without an explicit AAMI mapping fall back to Q.
        labels.append(symbol_to_class.get(sym, unknown_class))

if not signals:
    raise RuntimeError(f"No beat windows could be extracted from {MITBIH_PATH}")

signals = np.array(signals, dtype=np.float32)
labels = np.array(labels, dtype=np.int64)
print("Signals shape:", signals.shape, "Labels shape:", labels.shape)

# ===============================
# STEP 5: Train/Val/Test split (80/20 train/test, then 10% of train held out as validation)
# ===============================
# Hold out 20% of all windows as the final test set, keeping class proportions.
X_train, X_test, y_train, y_test = train_test_split(
    signals,
    labels,
    stratify=labels,
    test_size=0.2,
    random_state=seed,
)

# Carve a further 10% out of the remaining training windows; this validation
# split is only used to monitor training.
X_train, X_val, y_train, y_val = train_test_split(
    X_train,
    y_train,
    stratify=y_train,
    test_size=0.1,
    random_state=seed,
)

print("Train/Val/Test sizes:", len(X_train), len(X_val), len(X_test))

# ===============================
# STEP 6: PyTorch Dataset & DataLoader with Weighted Sampler
# ===============================
class ECGDataset(Dataset):
    """In-memory dataset of ECG windows paired with integer class labels.

    ``X`` is converted to a float32 tensor and given a channel axis, so a
    2-D ``(N, L)`` input is stored as ``(N, 1, L)``. ``y`` is stored as a
    long tensor. Indexing returns ``(window, label)``.
    """

    def __init__(self, X, y):
        windows = torch.tensor(X, dtype=torch.float32)
        self.X = windows[:, None]  # insert the channel axis at position 1
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        return self.X.shape[0]

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

train_dataset, val_dataset, test_dataset = (
    ECGDataset(features, targets)
    for features, targets in ((X_train, y_train), (X_val, y_val), (X_test, y_test))
)

# Inverse-frequency weighting: each training sample is drawn with probability
# proportional to 1 / (size of its class), which counteracts class imbalance.
class_sample_counts = np.bincount(y_train)
class_weights = 1. / (class_sample_counts + 1e-9)
samples_weights = class_weights[y_train]
sampler = WeightedRandomSampler(samples_weights, len(samples_weights), replacement=True)

BATCH_SIZE = 64
_loader_kwargs = dict(batch_size=BATCH_SIZE, num_workers=2)
# Only the training loader is resampled; validation and test keep their natural order.
train_loader = DataLoader(train_dataset, sampler=sampler, drop_last=False, **_loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_kwargs)
test_loader = DataLoader(test_dataset, shuffle=False, **_loader_kwargs)

# ===============================
# STEP 7: Define Models
# - CNN1D (baseline kept)
# - ResNet18-1D
# - InceptionTime (standard)
# - Mini Xception-1D (depthwise separable)
# ===============================

# ---- CNN1D (baseline) ----
class CNN1D(nn.Module):
    """Baseline 1-D CNN: two conv/BN/ReLU/max-pool stages followed by two linear layers.

    Args:
        input_length: number of samples in each single-channel input window.
        num_classes: size of the output logit vector.

    Raises:
        ValueError: if ``input_length`` is too short to survive the two pools.

    Input is ``(batch, 1, input_length)``; output is ``(batch, num_classes)``.
    """

    def __init__(self, input_length, num_classes=5):
        super().__init__()
        self.conv1 = nn.Conv1d(1, 32, kernel_size=5, padding=2)
        self.bn1 = nn.BatchNorm1d(32)
        self.pool1 = nn.MaxPool1d(2)

        self.conv2 = nn.Conv1d(32, 64, kernel_size=5, padding=2)
        self.bn2 = nn.BatchNorm1d(64)
        self.pool2 = nn.MaxPool1d(2)

        # Size of the flattened feature map, derived analytically: both convs
        # preserve length (kernel 5, padding 2) and each pool floor-halves it.
        # A dummy forward pass is deliberately avoided because, in training
        # mode, it would write random-noise statistics into the BatchNorm
        # running buffers before any real data is seen.
        pooled_length = (input_length // 2) // 2
        if pooled_length < 1:
            raise ValueError(
                f"input_length={input_length} is too short for two 2x max-pool stages"
            )
        flat = 64 * pooled_length

        self.fc1 = nn.Linear(flat, 128)
        self.drop = nn.Dropout(0.5)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        x = self.pool1(torch.relu(self.bn1(self.conv1(x))))
        x = self.pool2(torch.relu(self.bn2(self.conv2(x))))
        x = x.view(x.size(0), -1)  # flatten (channels, length) per sample
        x = torch.relu(self.fc1(x))
        x = self.drop(x)
        x = self.fc2(x)
        return x

# ---- ResNet1D (ResNet18-like) ----
def conv1d_bn_relu(in_ch, out_ch, kernel_size, stride=1, padding=0):
    """Return a bias-free Conv1d -> BatchNorm1d -> in-place ReLU stack as one Sequential."""
    conv = nn.Conv1d(in_ch, out_ch, kernel_size, stride=stride, padding=padding, bias=False)
    norm = nn.BatchNorm1d(out_ch)
    activation = nn.ReLU(inplace=True)
    return nn.Sequential(conv, norm, activation)

class BasicBlock1D(nn.Module):
    """Two-convolution residual block (kernel size 3) for 1-D feature maps.

    ``downsample``, when given, is applied to the block input so the shortcut
    matches the main path's shape before the two are added.
    """

    expansion = 1  # output channels = planes * expansion

    def __init__(self, in_planes, planes, stride=1, downsample=None):
        super().__init__()
        self.conv1 = nn.Conv1d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm1d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv1d(planes, planes, kernel_size=3, padding=1, bias=False)
        self.bn2 = nn.BatchNorm1d(planes)
        self.downsample = downsample

    def forward(self, x):
        # Main path: conv -> BN -> ReLU -> conv -> BN.
        main = self.conv1(x)
        main = self.relu(self.bn1(main))
        main = self.bn2(self.conv2(main))

        # Shortcut: identity unless a projection was supplied.
        shortcut = x if self.downsample is None else self.downsample(x)

        main += shortcut
        return self.relu(main)

class ResNet1D(nn.Module):
    """ResNet-style backbone for single-channel 1-D signals.

    Args:
        block: residual block class; must expose an ``expansion`` attribute
            and accept ``(in_planes, planes, stride, downsample)``.
        layers: number of blocks in each of the four stages.
        input_length: kept for interface compatibility. The classifier width
            no longer depends on it because global average pooling collapses
            the time axis.
        num_classes: size of the output logit vector.

    Input is ``(batch, 1, L)``; output is ``(batch, num_classes)``.
    """

    def __init__(self, block, layers, input_length, num_classes=5):
        super().__init__()
        self.inplanes = 64
        self.conv1 = nn.Conv1d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm1d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)

        # Four residual stages; every stage after the first halves the length.
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

        # After global average pooling the feature size is just the channel
        # count of the last stage. No dummy forward pass is run here: in
        # training mode it would contaminate the BatchNorm running statistics
        # with random noise, and it fails outright for short inputs where a
        # stage output shrinks to a single value per channel.
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, planes, blocks, stride=1):
        """Build one stage of ``blocks`` residual blocks with ``planes`` base channels."""
        downsample = None
        # A projection shortcut is needed whenever the first block changes
        # the temporal resolution or the channel count.
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                nn.Conv1d(self.inplanes, planes * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm1d(planes * block.expansion),
            )
        layers = [block(self.inplanes, planes, stride, downsample)]
        self.inplanes = planes * block.expansion
        layers.extend(block(self.inplanes, planes) for _ in range(1, blocks))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.maxpool(torch.relu(self.bn1(self.conv1(x))))
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = nn.functional.adaptive_avg_pool1d(x, 1)  # (batch, C, 1)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

def ResNet18_1D(input_length, num_classes=5):
    """Build the ResNet-18-style 1-D network: four stages of two BasicBlock1D each."""
    stage_depths = [2, 2, 2, 2]
    return ResNet1D(BasicBlock1D, stage_depths, input_length, num_classes)

# ---- InceptionTime building blocks (standard InceptionTime) ----
class InceptionModule(nn.Module):
    """Single Inception module for 1-D signals.

    A 1x1 bottleneck feeds three parallel convolutions (kernel sizes 39, 19
    and 9). A fourth branch max-pools the raw input and applies a 1x1
    convolution. The four branches are concatenated along the channel axis,
    so the output has ``4 * out_channels`` channels and the same length as
    the input.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        # Bottleneck width is a quarter of the input channels, but at least 1.
        bottleneck_channels = max(1, in_channels // 4)
        self.bottleneck = nn.Conv1d(in_channels, bottleneck_channels, kernel_size=1, bias=False)
        # Paddings are (kernel - 1) / 2 so every branch preserves the length.
        self.conv1 = nn.Conv1d(bottleneck_channels, out_channels, kernel_size=39, padding=19, bias=False)
        self.conv2 = nn.Conv1d(bottleneck_channels, out_channels, kernel_size=19, padding=9, bias=False)
        self.conv3 = nn.Conv1d(bottleneck_channels, out_channels, kernel_size=9, padding=4, bias=False)
        self.maxpool_conv = nn.Sequential(
            nn.MaxPool1d(kernel_size=3, stride=1, padding=1),
            nn.Conv1d(in_channels, out_channels, kernel_size=1, bias=False)
        )
        self.bn = nn.BatchNorm1d(out_channels * 4)
        self.relu = nn.ReLU()

    def forward(self, x):
        # The bottleneck is applied for every channel count. The previous
        # if/else on x.shape[1] had two identical branches and is removed.
        bx = self.bottleneck(x)
        branches = [
            self.conv1(bx),
            self.conv2(bx),
            self.conv3(bx),
            self.maxpool_conv(x),  # pooling branch sees the un-bottlenecked input
        ]
        y = torch.cat(branches, dim=1)
        return self.relu(self.bn(y))

class InceptionBlock(nn.Module):
    """One InceptionModule wrapped in a residual connection.

    The module emits ``4 * out_channels`` channels. When the input has a
    different channel count, the shortcut is projected with a 1x1 convolution
    followed by BatchNorm so the two tensors can be added.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.incep = InceptionModule(in_channels, out_channels)
        width = out_channels * 4
        if in_channels == width:
            # Shapes already agree: plain identity shortcut.
            self.residual = None
        else:
            self.residual = nn.Sequential(
                nn.Conv1d(in_channels, width, kernel_size=1, bias=False),
                nn.BatchNorm1d(width)
            )
        self.relu = nn.ReLU()

    def forward(self, x):
        branch = self.incep(x)
        shortcut = x if self.residual is None else self.residual(x)
        return self.relu(shortcut + branch)

class InceptionTime(nn.Module):
    """Stack of residual Inception blocks, global average pooling and a linear classifier.

    Args:
        input_length: kept for interface compatibility. The classifier width
            does not depend on it because the time axis is average-pooled
            to length 1.
        num_classes: size of the output logit vector.
        num_modules: number of InceptionBlock layers to stack.
        in_channels: channel count of the input signal.
        out_channels: per-branch width; each block outputs ``4 * out_channels``.

    Input is ``(batch, in_channels, L)``; output is ``(batch, num_classes)``.
    """

    def __init__(self, input_length, num_classes=5, num_modules=6, in_channels=1, out_channels=32):
        super().__init__()
        channels = in_channels
        modules = []
        for _ in range(num_modules):
            modules.append(InceptionBlock(channels, out_channels))
            channels = out_channels * 4  # every block widens to 4 * out_channels
        self.network = nn.Sequential(*modules)

        # Feature size after global pooling equals the final channel count.
        # The earlier dummy forward pass always used a 1-channel tensor (so
        # any in_channels != 1 crashed in the constructor), and in training
        # mode it wrote random-noise statistics into the BatchNorm buffers.
        self.fc = nn.Linear(channels, num_classes)

    def forward(self, x):
        x = self.network(x)
        x = nn.functional.adaptive_avg_pool1d(x, 1)  # (batch, C, 1)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

# ---- Mini Xception 1D (depthwise separable) ----
class SeparableConv1d(nn.Module):
    """Depthwise-separable 1-D convolution followed by BatchNorm and ReLU.

    The depthwise stage filters each input channel independently
    (``groups=in_ch``); the 1x1 pointwise stage then mixes channels and sets
    the output width.
    """

    def __init__(self, in_ch, out_ch, kernel_size=3, padding=1):
        super().__init__()
        self.depthwise = nn.Conv1d(
            in_ch, in_ch, kernel_size=kernel_size, padding=padding, groups=in_ch, bias=False
        )
        self.pointwise = nn.Conv1d(in_ch, out_ch, kernel_size=1, bias=False)
        self.bn = nn.BatchNorm1d(out_ch)
        self.relu = nn.ReLU()

    def forward(self, x):
        mixed = self.pointwise(self.depthwise(x))
        return self.relu(self.bn(mixed))

class MiniXception1D(nn.Module):
    """Small Xception-style classifier built from depthwise-separable 1-D convolutions.

    Args:
        input_length: kept for interface compatibility. The classifier width
            does not depend on it because the last stage is globally
            average-pooled.
        num_classes: size of the output logit vector.

    Input is ``(batch, 1, L)``; output is ``(batch, num_classes)``.
    """

    def __init__(self, input_length, num_classes=5):
        super().__init__()
        # Stem: plain convolution lifting the single input channel to 32.
        self.block1 = nn.Sequential(
            nn.Conv1d(1, 32, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm1d(32),
            nn.ReLU()
        )
        self.sep1 = SeparableConv1d(32, 64, kernel_size=3, padding=1)
        self.pool1 = nn.MaxPool1d(2)
        self.sep2 = SeparableConv1d(64, 128, kernel_size=3, padding=1)
        self.pool2 = nn.MaxPool1d(2)
        self.sep3 = SeparableConv1d(128, 256, kernel_size=3, padding=1)
        self.pool3 = nn.AdaptiveAvgPool1d(1)

        # pool3 collapses the time axis, so the flattened feature size is the
        # 256 channels produced by sep3. No dummy forward pass is run here:
        # in training mode it would write random-noise statistics into the
        # BatchNorm running buffers.
        self.fc = nn.Linear(256, num_classes)

    def forward(self, x):
        x = self.block1(x)
        x = self.pool1(self.sep1(x))
        x = self.pool2(self.sep2(x))
        x = self.pool3(self.sep3(x))  # (batch, 256, 1)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

# ===============================
# STEP 8: Utilities (train/validate/evaluate + save)
# ===============================
import time
from pathlib import Path
import json

def _run_epoch(model, loader, criterion, optimizer=None):
    """Run one pass over ``loader`` and return ``(mean_loss, accuracy)``.

    With an ``optimizer`` the model is put in training mode and updated after
    every batch; without one it is put in eval mode and gradients are disabled.
    """
    training = optimizer is not None
    model.train(training)
    total_loss = 0.0
    correct = 0
    seen = 0
    with torch.set_grad_enabled(training):
        for X, y in loader:
            X = X.to(device)
            y = y.to(device)
            if training:
                optimizer.zero_grad()
            outputs = model(X)
            loss = criterion(outputs, y)
            if training:
                loss.backward()
                optimizer.step()
            batch = X.size(0)
            # loss is a batch mean; weight it so the epoch mean is per-sample
            total_loss += loss.item() * batch
            _, preds = torch.max(outputs, 1)
            correct += (preds == y).sum().item()
            seen += batch
    seen = max(seen, 1)  # an empty loader yields 0.0 instead of dividing by zero
    return total_loss / seen, correct / seen


def train_one_model(model, model_name, num_epochs=20, lr=1e-3):
    """Train ``model`` with Adam and cross-entropy, then save weights and history.

    Uses the module-level ``train_loader``, ``val_loader``, ``device`` and
    ``RESULTS_ROOT``. Files written to ``RESULTS_ROOT/<model_name>/``:
      * ``<model_name>_final.pt``    - state dict after the last epoch
      * ``<model_name>_best.pt``     - state dict of the best-validation epoch
      * ``<model_name>_history.npy`` - per-epoch loss/accuracy dict

    Args:
        model: the network to train (moved to ``device``).
        model_name: used for the output directory and file names.
        num_epochs: number of passes over the training loader.
        lr: Adam learning rate.

    Returns:
        ``(model, history, model_dir)``. The returned model carries the
        final-epoch weights, not the best-validation ones.
    """
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    history = {"train_loss": [], "val_loss": [], "train_acc": [], "val_acc": []}

    best_val_acc = 0.0
    best_state = None

    for epoch in range(num_epochs):
        train_loss, train_acc = _run_epoch(model, train_loader, criterion, optimizer)
        val_loss, val_acc = _run_epoch(model, val_loader, criterion)

        history["train_loss"].append(train_loss)
        history["val_loss"].append(val_loss)
        history["train_acc"].append(train_acc)
        history["val_acc"].append(val_acc)

        print(f"[{model_name}] Epoch {epoch+1}/{num_epochs} - Train Loss: {train_loss:.4f} Acc: {train_acc*100:.2f}% | Val Loss: {val_loss:.4f} Acc: {val_acc*100:.2f}%")

        # Snapshot the best weights. state_dict() returns references to the
        # live parameter tensors, so they must be copied; otherwise later
        # optimizer steps overwrite the "best" snapshot with the final weights.
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            best_state = {
                key: tensor.detach().cpu().clone()
                for key, tensor in model.state_dict().items()
            }

    # save final & best
    model_dir = os.path.join(RESULTS_ROOT, model_name)
    Path(model_dir).mkdir(parents=True, exist_ok=True)

    # final model
    final_path = os.path.join(model_dir, f"{model_name}_final.pt")
    torch.save(model.state_dict(), final_path)

    # best checkpoint (absent only if validation accuracy never exceeded 0)
    if best_state is not None:
        best_path = os.path.join(model_dir, f"{model_name}_best.pt")
        torch.save(best_state, best_path)

    # save history (a dict, so np.load needs allow_pickle=True to read it back)
    hist_path = os.path.join(model_dir, f"{model_name}_history.npy")
    np.save(hist_path, history)

    return model, history, model_dir

def evaluate_and_save(model, model_name, model_dir):
    """Evaluate ``model`` on the module-level ``test_loader`` and save the results.

    Files written to ``model_dir``:
      * ``<model_name>_confusion_matrix.png``
      * ``<model_name>_classification_report.txt``
      * ``<model_name>_stats.json`` (confusion matrix and per-class accuracy)

    Returns:
        ``(all_labels, all_preds, cm, report)``: true labels and predictions
        as 1-D arrays, the confusion matrix, and the text report.
    """
    model.eval()
    pred_chunks = []
    label_chunks = []
    with torch.no_grad():
        for X, y in test_loader:
            X = X.to(device)
            y = y.to(device)
            outputs = model(X)
            _, preds = torch.max(outputs, 1)
            pred_chunks.append(preds.cpu().numpy())
            label_chunks.append(y.cpu().numpy())
    empty = np.array([], dtype=np.int64)
    all_preds = np.concatenate(pred_chunks) if pred_chunks else empty
    all_labels = np.concatenate(label_chunks) if label_chunks else empty

    # Pin the label set explicitly. Without it, a class that is missing from
    # both y_true and y_pred shrinks the matrix (misaligning the tick labels)
    # and makes classification_report reject target_names.
    label_ids = list(range(len(classes)))

    # confusion matrix
    cm = confusion_matrix(all_labels, all_preds, labels=label_ids)
    plt.figure(figsize=(6,5))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=classes, yticklabels=classes)
    plt.xlabel('Predicted'); plt.ylabel('True')
    plt.title(f"{model_name} Confusion Matrix")
    cm_path = os.path.join(model_dir, f"{model_name}_confusion_matrix.png")
    plt.savefig(cm_path, bbox_inches='tight', dpi=150)
    plt.close()

    # classification report
    report = classification_report(
        all_labels, all_preds, labels=label_ids, target_names=classes, digits=4,
        zero_division=0,  # classes that are never predicted score 0 without a warning
    )
    print(f"=== {model_name} Classification Report ===\n", report)
    report_path = os.path.join(model_dir, f"{model_name}_classification_report.txt")
    with open(report_path, "w") as f:
        f.write(report)

    # per-class accuracy (recall): correct predictions / true samples per class;
    # the epsilon keeps classes without test samples from dividing by zero
    per_class_acc = cm.diagonal() / (cm.sum(axis=1) + 1e-9)
    stats = {
        "confusion_matrix": cm.tolist(),
        "per_class_accuracy": per_class_acc.tolist()
    }
    stats_path = os.path.join(model_dir, f"{model_name}_stats.json")
    with open(stats_path, "w") as f:
        json.dump(stats, f, indent=2)

    return all_labels, all_preds, cm, report

def plot_history(history, model_name, model_dir):
    """Save side-by-side accuracy and loss curves to ``<model_dir>/<model_name>_acc_loss.png``.

    ``history`` must provide the ``train_acc``, ``val_acc``, ``train_loss``
    and ``val_loss`` sequences, one value per epoch.
    """
    # (history-key suffix, legend suffix, axis/title word) per panel, left to right
    panels = (("acc", "Acc", "Accuracy"), ("loss", "Loss", "Loss"))

    plt.figure(figsize=(12, 4))
    for slot, (key, short, word) in enumerate(panels, start=1):
        plt.subplot(1, 2, slot)
        plt.plot(history[f"train_{key}"], label=f"Train {short}")
        plt.plot(history[f"val_{key}"], label=f"Val {short}")
        plt.xlabel("Epoch")
        plt.ylabel(word)
        plt.title(f"{model_name} {word}")
        plt.legend()

    plot_path = os.path.join(model_dir, f"{model_name}_acc_loss.png")
    plt.savefig(plot_path, bbox_inches='tight', dpi=150)
    plt.close()

# ===============================
# STEP 9: Instantiate models and run training/eval for each
# ===============================
input_length = X_train.shape[1]
_common = dict(input_length=input_length, num_classes=len(classes))
models_to_run = {
    "CNN1D": CNN1D(**_common),
    "ResNet18_1D": ResNet18_1D(**_common),
    "InceptionTime": InceptionTime(num_modules=6, out_channels=32, **_common),
    "MiniXception1D": MiniXception1D(**_common),
}

# training parameters
NUM_EPOCHS = 20
LR = 1e-3

# Train, plot and evaluate the models one after another.
results_summary = {}
_banner = "=" * 60
for name, model in models_to_run.items():
    print("\n" + _banner)
    print("Training model:", name)
    print(_banner)

    model, history, model_dir = train_one_model(model, name, num_epochs=NUM_EPOCHS, lr=LR)
    plot_history(history, name, model_dir)
    all_labels, all_preds, cm, report = evaluate_and_save(model, name, model_dir)

    # Keep the raw test labels and predictions next to the other artefacts.
    for _suffix, _values in (("y_true", all_labels), ("y_pred", all_preds)):
        np.save(os.path.join(model_dir, f"{name}_{_suffix}.npy"), _values)

    _val_curve = history["val_acc"]
    results_summary[name] = {
        "model_dir": model_dir,
        "best_val_acc": max(_val_curve) if _val_curve else None,
    }

# One JSON file summarising where each model's results live and its best validation accuracy.
with open(os.path.join(RESULTS_ROOT, "results_summary.json"), "w") as f:
    json.dump(results_summary, f, indent=2)

print("All models finished. Results saved to:", RESULTS_ROOT)


Device: cuda
Mounted at /content/drive
Data path: /content/drive/MyDrive/ECG_Datasets/MIT-BIH
Results path: /content/drive/MyDrive/ECG_Datasets/Results
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m91.2/91.2 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m163.8/163.8 kB[0m [31m10.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.4/12.4 MB[0m [31m139.7 MB/s[0m eta [36m0:00:00[0m
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
google-colab 1.0.0 requires pandas==2.2.2, but you have pandas 2.3.3 which is incompatible.[0m[31m
[0mTotal records found: 48


Reading records: 100%|██████████| 48/48 [00:10<00:00,  4.52it/s]


Signals shape: (112619, 360) Labels shape: (112619,)
Train/Val/Test sizes: 81085 9010 22524

Training model: CNN1D
[CNN1D] Epoch 1/20 - Train Loss: 0.2865 Acc: 90.01% | Val Loss: 0.1515 Acc: 95.53%
[CNN1D] Epoch 2/20 - Train Loss: 0.1483 Acc: 94.81% | Val Loss: 0.1622 Acc: 94.88%
[CNN1D] Epoch 3/20 - Train Loss: 0.1109 Acc: 96.04% | Val Loss: 0.1385 Acc: 95.74%
[CNN1D] Epoch 4/20 - Train Loss: 0.0879 Acc: 97.01% | Val Loss: 0.0876 Acc: 97.58%
[CNN1D] Epoch 5/20 - Train Loss: 0.0782 Acc: 97.30% | Val Loss: 0.0748 Acc: 97.85%
[CNN1D] Epoch 6/20 - Train Loss: 0.0643 Acc: 97.77% | Val Loss: 0.0823 Acc: 97.86%
[CNN1D] Epoch 7/20 - Train Loss: 0.0563 Acc: 98.05% | Val Loss: 0.0804 Acc: 97.86%
[CNN1D] Epoch 8/20 - Train Loss: 0.0501 Acc: 98.33% | Val Loss: 0.0767 Acc: 98.15%
[CNN1D] Epoch 9/20 - Train Loss: 0.0463 Acc: 98.48% | Val Loss: 0.0698 Acc: 98.46%
[CNN1D] Epoch 10/20 - Train Loss: 0.0434 Acc: 98.57% | Val Loss: 0.0698 Acc: 98.48%
[CNN1D] Epoch 11/20 - Train Loss: 0.0395 Acc: 98.70% |

In [None]:
# Colab-only: mount Google Drive at /content/drive (no forced remount here).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Grad-CAM




In [None]:
# ======= Fixed Grad-CAM (no backward-hook conflict) =======
import torch, torch.nn.functional as F, numpy as np, matplotlib.pyplot as plt, random
from pathlib import Path
from IPython.display import display
from PIL import Image

MODEL_NAME = "ResNet18_1D"
RESULTS_ROOT = "/content/drive/MyDrive/ECG_Datasets/Results"
SAVE_DIR = Path(RESULTS_ROOT) / MODEL_NAME

# Pull the objects this cell needs from the live notebook session.
# NOTE(review): `model` is whatever the session last bound to that name;
# confirm it really is the network named by MODEL_NAME before trusting the
# saved figure's file name.
model = globals().get('model')
test_dataset = globals().get('test_dataset')
device = globals().get('device')
classes = globals().get('classes', ['N','S','V','F','Q'])

if model is None or test_dataset is None:
    raise RuntimeError("Model or test_dataset missing. Run the restore cell first.")

if device is None:
    # `device` was never defined in this session: keep the model where it
    # already lives (CPU if it has no parameters at all).
    _first_param = next(model.parameters(), None)
    device = _first_param.device if _first_param is not None else torch.device("cpu")

model = model.to(device)
model.eval()

# helper: find last Conv1d layer and its name
def find_last_conv1d(mod):
    """Return ``(module, qualified_name)`` for the last Conv1d in ``named_modules()`` order.

    Returns ``(None, None)`` when ``mod`` contains no Conv1d layer.
    """
    convs = [
        (layer, name)
        for name, layer in mod.named_modules()
        if isinstance(layer, torch.nn.Conv1d)
    ]
    return convs[-1] if convs else (None, None)

# Grad-CAM is computed at the last Conv1d that find_last_conv1d reports for the model.
target_layer, layer_name = find_last_conv1d(model)
if target_layer is None:
    raise RuntimeError("No Conv1d layer found in model for Grad-CAM.")

print("Using target layer:", layer_name)

# Grad-CAM using torch.autograd.grad (no backward hooks)
def gradcam_1d_nohook(model, input_tensor, target_class, target_layer):
    """Compute a 1-D Grad-CAM map for one input at ``target_layer``.

    Args:
        model: network to explain.
        input_tensor: (1,1,L) tensor on the model's device.
        target_class: index of the output logit to explain.
        target_layer: module whose output activations are weighted.

    Returns:
        ``(cam_norm, prob)``: ``cam_norm`` is a length-L array scaled to
        [0, 1]; ``prob`` is the softmax probability of ``target_class``.

    Raises:
        RuntimeError: if ``target_layer`` was not executed during the forward pass.
    """
    activations = []

    # forward hook to capture activations (DO NOT detach: gradients are
    # taken with respect to this tensor)
    def forward_hook(m, inp, out):
        activations.append(out)

    fh = target_layer.register_forward_hook(forward_hook)
    try:
        # enable_grad makes the function usable even when the caller is
        # inside a torch.no_grad() block.
        with torch.enable_grad():
            out = model(input_tensor)          # forward pass
            score = out[0, target_class]       # scalar tensor

            if len(activations) == 0:
                raise RuntimeError("Activation not captured by forward hook.")
            act = activations[0]               # shape (1, C, Lf)
            grads = torch.autograd.grad(outputs=score, inputs=act, retain_graph=False)[0]  # (1,C,Lf)
    finally:
        # Always detach the hook, also when the forward pass, the class
        # index or the gradient computation raises. A leaked hook would keep
        # appending activations on every later forward call.
        fh.remove()

    # channel weights: global average pooling over time dim
    weights = grads.mean(dim=2, keepdim=True)   # (1,C,1)
    # weighted sum, keeping only positive evidence
    cam = F.relu((weights * act).sum(dim=1, keepdim=True))  # (1,1,Lf)
    # upsample to input length
    cam_up = F.interpolate(cam, size=input_tensor.shape[-1], mode='linear', align_corners=False)
    cam_np = cam_up.squeeze().detach().cpu().numpy()
    # min-max normalize; the epsilon keeps a constant map from dividing by zero
    cam_norm = (cam_np - cam_np.min()) / (cam_np.max() - cam_np.min() + 1e-9)

    prob = float(torch.softmax(out.detach(), dim=1)[0, target_class].cpu().numpy())
    return cam_norm, prob

# pick representatives (light scan)
MAX_SCAN = 2000
n_test = len(test_dataset)
idxs = list(range(n_test))
if n_test > MAX_SCAN:
    # Only scan a random subset of the test set to keep this cell fast.
    idxs = random.sample(idxs, MAX_SCAN)

softmax = torch.nn.Softmax(dim=1)
# class index -> test-set index of its representative sample (None until found)
rep = dict.fromkeys(range(len(classes)))


def _every_class_covered():
    """True once each class has a representative index."""
    return None not in rep.values()


# Pass 1: for each class, take the first scanned sample that the model
# classifies correctly.
with torch.no_grad():
    for i in idxs:
        x, y = test_dataset[i]
        inp = x.unsqueeze(0).to(device)
        out = model(inp)
        probs = softmax(out).cpu().numpy()[0]
        pred, true = int(np.argmax(probs)), int(y.item())
        if pred == true and rep[true] is None:
            rep[true] = i
        if _every_class_covered():
            break

# Pass 2 (fallback): classes without a correct prediction get the first
# scanned sample of that class, whatever the model predicts for it.
if not _every_class_covered():
    for i in idxs:
        _, y = test_dataset[i]
        cls = int(y.item())
        if rep[cls] is None:
            rep[cls] = i
        if _every_class_covered():
            break

print("Representative examples (class:index):", rep)

# create combined figure: one row per class
fig, axes = plt.subplots(len(classes), 1, figsize=(10, 1.6*len(classes)))
# plt.subplots returns a bare Axes (not an array) when there is a single row
axes = np.atleast_1d(axes)
for cls_idx, cls_name in enumerate(classes):
    idx = rep[cls_idx]
    if idx is None:
        axes[cls_idx].text(0.5, 0.5, f"No sample for {cls_name}", ha='center')
        continue
    sample, y = test_dataset[idx]
    x = sample.squeeze().cpu().numpy()
    inp = sample.unsqueeze(0).to(device)

    # get model prediction
    with torch.no_grad():
        out = model(inp)
        pred = int(out.argmax(1).cpu().numpy()[0])

    # Grad-CAM is computed for the predicted class, not the true one
    cam, prob = gradcam_1d_nohook(model, inp, pred, target_layer)

    t = np.arange(len(x))
    ax = axes[cls_idx]
    ax.plot(t, x, color='k', linewidth=0.8)
    # Draw the CAM as a band just below the trace, scaled to the signal's range
    amp = x.max() - x.min() if x.max() != x.min() else 1.0
    base = x.min() - 0.12*amp
    overlay = base + cam * (0.25*amp)
    ax.fill_between(t, base, overlay, where=cam>0, color='orange', alpha=0.6)
    # ...and also colour the samples themselves by CAM value
    ax.scatter(t, x, c=cam, cmap='jet', s=5)
    ax.set_xlim(0, len(x))
    ax.set_yticks([])
    ax.set_title(f"{cls_name} — true:{classes[int(y)]} pred:{classes[pred]} p={prob:.2f}", fontsize=9)

plt.tight_layout()
# The results folder only exists if training was run for MODEL_NAME in this
# Drive; create it so savefig cannot fail on a missing directory.
SAVE_DIR.mkdir(parents=True, exist_ok=True)
combined_path = SAVE_DIR / f"{MODEL_NAME}_gradcam_combined.png"
plt.savefig(combined_path, dpi=200, bbox_inches='tight')
plt.close(fig)

# display and print
display(Image.open(str(combined_path)))
print("Saved combined:", combined_path)
print("\nSlide text (copy-paste):")
print("- Orange/red overlay = model importance (higher = more important).")
print("- Model focuses on QRS region for Normal, and on widened QRS for Ventricular beats.")
print("- Grad-CAM confirms model uses medically meaningful ECG features.")


RuntimeError: Model or test_dataset missing. Run the restore cell first.

In [None]:
# ================= GRAD-CAM FUNCTION (must run this first) =================

def gradcam_1d_nohook(model, input_tensor, target_class, target_layer):
    """Compute a 1-D Grad-CAM map for one input at ``target_layer``.

    input_tensor: (1,1,L) tensor on device
    returns: cam_norm (L,) scaled to [0, 1] and the softmax probability of
    target_class

    Raises RuntimeError if target_layer was not executed during the forward pass.
    """
    import torch
    import torch.nn.functional as F
    import numpy as np

    activations = []

    # forward hook to capture activations (not detached: gradients are taken
    # with respect to this tensor)
    def forward_hook(m, inp, out):
        activations.append(out)

    fh = target_layer.register_forward_hook(forward_hook)
    try:
        # enable_grad makes the function usable even when the caller is
        # inside a torch.no_grad() block.
        with torch.enable_grad():
            # forward pass
            out = model(input_tensor)
            score = out[0, target_class]

            if not activations:
                raise RuntimeError("Activation not captured by forward hook.")

            # gradients of score wrt activations
            act = activations[0]
            grads = torch.autograd.grad(outputs=score, inputs=act)[0]
    finally:
        # Always detach the hook, even on failure, so it cannot leak into
        # later forward passes of the model.
        fh.remove()

    # compute weights: average gradient over the time axis
    weights = grads.mean(dim=2, keepdim=True)  # (1,C,1)

    # weighted sum of feature maps, keeping only positive evidence
    cam = F.relu((weights * act).sum(dim=1, keepdim=True))

    # upsample to input size
    cam_up = F.interpolate(cam, size=input_tensor.shape[-1], mode='linear', align_corners=False)
    cam_np = cam_up.squeeze().detach().cpu().numpy()

    # normalize; the epsilon keeps a constant map from dividing by zero
    cam_norm = (cam_np - cam_np.min()) / (cam_np.max() - cam_np.min() + 1e-9)

    prob = float(torch.softmax(out.detach(), dim=1)[0, target_class].cpu().numpy())
    return cam_norm, prob


In [None]:
# ======= One self-contained Grad-CAM generation cell (run after model, test_dataset, device exist) =======
import os, random, math
from pathlib import Path
import numpy as np
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt
from PIL import Image

# ---------- User-editable settings ----------
_session = globals()

# Reuse values an earlier cell already defined; otherwise fall back to defaults.
MODEL_NAME = _session.get('MODEL_NAME', 'ResNet18_1D')
RESULTS_ROOT = _session.get('RESULTS_ROOT', '/content/drive/MyDrive/ECG_Datasets/Results')
SAVE_DIR = Path(RESULTS_ROOT) / MODEL_NAME
MAX_PER_CLASS = 5     # images saved per class
MAX_SCAN = 5000       # upper bound on scanned test samples (speed)
RANDOM_SEED = 42
# --------------------------------------------

for _seed_fn in (random.seed, np.random.seed, torch.manual_seed):
    _seed_fn(RANDOM_SEED)

# Objects that must already exist in the notebook session.
model, test_dataset, device = (
    _session.get(key) for key in ('model', 'test_dataset', 'device')
)
classes = _session.get('classes')
if classes is None:
    classes = ['N','S','V','F','Q']

if any(required is None for required in (model, test_dataset, device)):
    raise RuntimeError("Missing required objects: ensure 'model', 'test_dataset', and 'device' are defined in the session.")

# Make sure the output directory exists before anything is written to it.
SAVE_DIR.mkdir(parents=True, exist_ok=True)

# helper: find last Conv1d
def find_last_conv1d(mod):
    """Locate the last Conv1d of ``mod`` in ``named_modules()`` order.

    Returns ``(module, qualified_name)``, or ``(None, None)`` if there is none.
    """
    # Walk the module list backwards: the first Conv1d met is the last one overall.
    for name, layer in reversed(list(mod.named_modules())):
        if isinstance(layer, torch.nn.Conv1d):
            return layer, name
    return None, None

# Grad-CAM function (uses forward hook only, no backward hooks that persist)
def gradcam_1d_nohook(model, input_tensor, target_class, target_layer):
    """
    Grad-CAM for a single 1-D input, using a temporary forward hook.

    input_tensor: (1,1,L) tensor on device
    target_class: index of the class score to explain
    target_layer: module whose output activations are weighted
    returns: cam_norm (L,) scaled to [0,1], and the softmax probability of target_class
    raises: RuntimeError if target_layer produced no activation during the forward pass
    """
    captured = []
    # list.append returns None, so the hook leaves the layer output untouched
    handle = target_layer.register_forward_hook(
        lambda _module, _inputs, output: captured.append(output)
    )
    try:
        # gradients w.r.t. the activations are needed, so no torch.no_grad() here
        logits = model(input_tensor)
        class_score = logits[0, target_class]

        if not captured:
            raise RuntimeError("Activation not captured by forward hook.")
        feature_map = captured[0]

        # d(score)/d(activations); the graph is not needed afterwards
        (grad_map,) = torch.autograd.grad(
            outputs=class_score, inputs=feature_map, retain_graph=False, create_graph=False
        )

        # one weight per channel: gradient averaged over the time axis
        channel_weights = grad_map.mean(dim=2, keepdim=True)

        # weighted channel sum, negatives clipped
        heat = F.relu((channel_weights * feature_map).sum(dim=1, keepdim=True))

        # stretch back to the input length
        heat_full = F.interpolate(heat, size=input_tensor.shape[-1], mode='linear', align_corners=False)
        heat_np = heat_full.squeeze().detach().cpu().numpy()

        # min-max scale; the epsilon keeps a constant map from dividing by zero
        lo, hi = heat_np.min(), heat_np.max()
        cam_norm = (heat_np - lo) / (hi - lo + 1e-9)

        prob = float(torch.softmax(logits.detach(), dim=1)[0, target_class].cpu().numpy())
        return cam_norm, prob
    finally:
        # the hook must not outlive this call, whether or not it succeeded
        handle.remove()

# Resolve the layer whose activations Grad-CAM will use
target_layer, layer_name = find_last_conv1d(model)
if target_layer is None:
    raise RuntimeError("No Conv1d layer found in model for Grad-CAM. Make sure the model contains Conv1d layers.")
print("Using target layer for Grad-CAM:", layer_name)

# Choose which test samples to scan: everything, or a random subset of MAX_SCAN
n_test = len(test_dataset)
_all_idxs = list(range(n_test))
idxs = _all_idxs if n_test <= MAX_SCAN else random.sample(_all_idxs, MAX_SCAN)

softmax = torch.nn.Softmax(dim=1)
model.to(device)
model.eval()

# Parallel lists, one entry per scanned sample:
#   preds[k]     -> predicted class id
#   probs_all[k] -> numpy vector of class probabilities
#   labels[k]    -> true class id
#   scan_map[k]  -> index of that sample in test_dataset
preds, probs_all, labels, scan_map = [], [], [], []
with torch.no_grad():
    for ds_idx in idxs:
        signal, target = test_dataset[ds_idx]
        logits = model(signal.unsqueeze(0).to(device))   # add the batch dimension
        class_probs = softmax(logits).cpu().numpy()[0]
        preds.append(int(class_probs.argmax()))
        probs_all.append(class_probs)
        labels.append(int(target.item()))
        scan_map.append(ds_idx)

# Build per-class selection lists (prefer high-confidence correct predictions).
#
# Every scanned sample is filed under its TRUE class as (dataset_idx, confidence),
# where confidence is the probability the model assigned to the class it predicted.
# Correct and misclassified samples are kept apart so that correct ones always win.
# Candidates are deliberately NOT capped while collecting: capping in scan order
# would keep the first few hits rather than the most confident ones.
correct_by_class = {c: [] for c in range(len(classes))}
fallback_by_class = {c: [] for c in range(len(classes))}
for local_idx, orig_idx in enumerate(scan_map):
    true_cls = labels[local_idx]
    pred_cls = preds[local_idx]
    conf = float(probs_all[local_idx][pred_cls])
    bucket = correct_by_class if pred_cls == true_cls else fallback_by_class
    bucket[true_cls].append((orig_idx, conf))

# Rank by confidence (highest first) and keep at most MAX_PER_CLASS per class.
# Misclassified samples are only used to fill the slots that correct ones leave empty.
per_class_indices = {}
for cls in range(len(classes)):
    ranked = sorted(correct_by_class[cls], key=lambda t: t[1], reverse=True)
    if len(ranked) < MAX_PER_CLASS:
        ranked += sorted(fallback_by_class[cls], key=lambda t: t[1], reverse=True)
    per_class_indices[cls] = [picked_idx for picked_idx, _conf in ranked[:MAX_PER_CLASS]]

print("Collected examples per class (idx lists):")
for cls in range(len(classes)):
    print(f"  {classes[cls]}: {per_class_indices[cls]}")

# Generate and save images
def _draw_gradcam_panel(ax, x, cam):
    """Draw one signal with its Grad-CAM map on `ax`.

    x:   1-D numpy signal
    cam: array with one value per point of x; it sets the height of the orange
         band under the trace and the colour of the markers on the trace
    """
    t = np.arange(len(x))
    ax.plot(t, x, color='k', linewidth=0.8)
    # use unit amplitude for a flat signal so the band still has a height
    amp = x.max() - x.min() if x.max() != x.min() else 1.0
    base = x.min() - 0.12*amp            # band baseline sits below the trace
    overlay = base + cam * (0.25*amp)    # band height follows the CAM value
    ax.fill_between(t, base, overlay, where=cam>0, color='orange', alpha=0.6)
    ax.scatter(t, x, c=cam, cmap='jet', s=5)
    ax.set_xlim(0, len(x))
    ax.set_yticks([])


for cls_idx, cls_name in enumerate(classes):
    chosen_idxs = per_class_indices.get(cls_idx, [])
    if not chosen_idxs:
        print(f"No examples found for class {cls_name}, skipping.")
        continue

    # create directory for this class
    class_dir = SAVE_DIR / f"gradcam_{cls_name}"
    os.makedirs(class_dir, exist_ok=True)

    # Run the model and Grad-CAM ONCE per sample; the individual figures and the
    # grid below both draw from these cached results instead of recomputing them.
    rendered = []
    for idx in chosen_idxs:
        sample, y = test_dataset[idx]
        x = sample.squeeze().cpu().numpy()
        inp = sample.unsqueeze(0).to(device)

        # get model prediction (with no_grad for speed)
        with torch.no_grad():
            out = model(inp)
            pred = int(out.argmax(1).cpu().numpy()[0])

        # compute cam for the predicted class (this needs autograd)
        cam, prob = gradcam_1d_nohook(model, inp, pred, target_layer)
        rendered.append((idx, x, int(y), pred, cam, prob))

    # Save individual images
    for j, (idx, x, true_cls, pred, cam, prob) in enumerate(rendered):
        fig, ax = plt.subplots(1, 1, figsize=(10, 2.4))
        _draw_gradcam_panel(ax, x, cam)
        ax.set_title(f"{cls_name} — true:{classes[true_cls]} pred:{classes[pred]} p={prob:.2f}", fontsize=10)

        indiv_path = class_dir / f"{MODEL_NAME}_gradcam_{cls_name}_{j+1}_idx{idx}.png"
        fig.tight_layout()
        fig.savefig(indiv_path, dpi=200, bbox_inches='tight')
        plt.close(fig)

    # Create a combined grid for this class (1 row, ncols = len(chosen_idxs))
    ncols = len(rendered)
    fig, axes = plt.subplots(1, ncols, figsize=(4*ncols, 2.6))
    if ncols == 1:
        # plt.subplots returns a bare Axes for a 1x1 grid; wrap it so zip() works
        axes = [axes]
    for ax, (idx, x, true_cls, pred, cam, prob) in zip(axes, rendered):
        _draw_gradcam_panel(ax, x, cam)
        ax.set_title(f"idx:{idx} pred:{classes[pred]} p={prob:.2f}", fontsize=9)

    fig.suptitle(f"{MODEL_NAME} Grad-CAM examples — Class {cls_name}", fontsize=12)
    fig.tight_layout(rect=[0,0,1,0.95])   # leave room at the top for the suptitle
    combined_path = SAVE_DIR / f"{MODEL_NAME}_gradcam_{cls_name}_grid.png"
    fig.savefig(combined_path, dpi=200, bbox_inches='tight')
    plt.close(fig)

    print(f"Saved {len(chosen_idxs)} individual images + grid for class {cls_name} in {class_dir} and {combined_path}")

print("Done. All images saved under:", SAVE_DIR)


In [None]:
# ======= Minimal Grad-CAM (1–2 samples per class) =======
import os, random
from pathlib import Path
import numpy as np
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt

# ------------ SETTINGS ------------
MODEL_NAME = "ResNet18_1D"
RESULTS_ROOT = "/content/drive/MyDrive/ECG_Datasets/Results"
SAVE_DIR = Path(RESULTS_ROOT) / MODEL_NAME   # figures from this cell are written here
MAX_PER_CLASS = 2     # at most this many Grad-CAM figures per class
MAX_SCAN = 3000       # cap on how many test samples are looked at
os.makedirs(SAVE_DIR, exist_ok=True)
# -----------------------------------

softmax = torch.nn.Softmax(dim=1)
# `model` must already exist in the session; switch it to eval mode
model.eval()

# -------- Find last Conv1D layer --------
def find_last_conv1d(mod):
    """Return the last `torch.nn.Conv1d` yielded by `mod.named_modules()`, or None if there is none."""
    conv_layers = [
        layer
        for _name, layer in mod.named_modules()
        if isinstance(layer, torch.nn.Conv1d)
    ]
    return conv_layers[-1] if conv_layers else None

# Resolve the layer whose activations drive Grad-CAM. Stop here with a clear
# message when the model has no Conv1d, rather than failing later with an
# AttributeError when a hook is registered on None.
target_layer = find_last_conv1d(model)
if target_layer is None:
    raise RuntimeError("No Conv1d layer found in model for Grad-CAM. Make sure the model contains Conv1d layers.")
print("Target Conv Layer:", target_layer)

# -------- Grad-CAM Function --------
def gradcam_1d_nohook(model, input_tensor, target_class, target_layer):
    """Compute a 1-D Grad-CAM map for a single sample.

    Args:
        model: network whose output is indexed as out[0, target_class].
        input_tensor: single-sample input; its last dimension is the signal length.
        target_class: index of the class score to explain.
        target_layer: module inside `model` whose output activations are weighted.

    Returns:
        (cam_norm, prob): cam_norm is a numpy array, upsampled to the input
        length and min-max scaled to [0, 1]; prob is the softmax probability
        of `target_class` as a Python float.

    Raises:
        RuntimeError: if `target_layer` did not run during the forward pass.
    """
    activations = []

    def forward_hook(m, inp, out):
        activations.append(out)

    fh = target_layer.register_forward_hook(forward_hook)
    try:
        # gradients w.r.t. the activations are needed, so do not run this under no_grad
        out = model(input_tensor)
        if not activations:
            raise RuntimeError("Activation not captured by forward hook.")
        score = out[0, target_class]
        act = activations[0]
        grads = torch.autograd.grad(score, act)[0]
    finally:
        # always unregister, even when the forward/grad step fails, so repeated
        # calls never leave stale hooks on the layer
        fh.remove()

    # channel weights: gradient averaged over the time axis
    weights = grads.mean(dim=2, keepdim=True)
    cam = F.relu((weights * act).sum(dim=1, keepdim=True))
    cam_up = F.interpolate(cam, size=input_tensor.shape[-1], mode='linear', align_corners=False)
    cam_np = cam_up.squeeze().detach().cpu().numpy()
    # epsilon keeps a constant map from dividing by zero
    cam_norm = (cam_np - cam_np.min()) / (cam_np.max() - cam_np.min() + 1e-9)

    # detach first: `out` is still attached to autograd, and .numpy() raises on
    # tensors that require grad
    prob = float(torch.softmax(out.detach(), dim=1)[0, target_class].cpu().numpy())
    return cam_norm, prob

# -------- Select 1–2 samples per class --------
# Samples are grouped by the class the model PREDICTS for them.
classes = ["N","S","V","F","Q"]
per_class = {c: [] for c in range(len(classes))}

# candidate pool: the first MAX_SCAN test samples, visited in shuffled order
scan_limit = min(MAX_SCAN, len(test_dataset))
idxs = list(range(scan_limit))
random.shuffle(idxs)

for sample_idx in idxs:
    signal, _label = test_dataset[sample_idx]
    with torch.no_grad():
        logits = model(signal.unsqueeze(0).to(device))
    predicted = int(logits.argmax(1))

    bucket = per_class[predicted]
    if len(bucket) < MAX_PER_CLASS:
        bucket.append(sample_idx)

    # stop scanning once no class is still short of its quota
    if not any(len(picked) < MAX_PER_CLASS for picked in per_class.values()):
        break

print("Selected samples per class:", per_class)

# -------- Generate Grad-CAM Plots --------
for cls_idx, cls_name in enumerate(classes):
    for j, idx in enumerate(per_class[cls_idx]):
        sample, y = test_dataset[idx]
        x = sample.squeeze().cpu().numpy()
        inp = sample.unsqueeze(0).to(device)

        # predicted class for this sample; the CAM below explains that class
        with torch.no_grad():
            pred = int(model(inp).argmax(1))

        cam, prob = gradcam_1d_nohook(model, inp, pred, target_layer)

        # --- Plot ---
        span = x.max() - x.min()
        base = x.min() - 0.2*span          # band baseline, below the trace
        overlay = base + cam * 0.3*span    # band height follows the CAM value
        t = np.arange(len(x))

        fig, ax = plt.subplots(figsize=(10, 2.5))
        ax.plot(t, x, color='black', linewidth=1)
        ax.fill_between(t, base, overlay, color='orange', alpha=0.5)
        ax.set_title(f"{cls_name} — pred:{classes[pred]}  p={prob:.2f}   (idx:{idx})")
        ax.set_yticks([])
        ax.set_xlim(0, len(x))

        out_path = SAVE_DIR / f"GradCAM_{cls_name}_{j+1}.png"
        fig.savefig(out_path, dpi=200, bbox_inches='tight')
        plt.close(fig)

print("\nSaved 1–2 Grad-CAM images per class in:", SAVE_DIR)


In [None]:
MODEL_NAME = "ResNet18_1D"
# NOTE: despite its name, `model_dir` is the path of the checkpoint file itself
model_dir = f"/content/drive/MyDrive/ECG_Datasets/Results/{MODEL_NAME}/{MODEL_NAME}_best.pt"

# Instantiate the network, then load the saved state dict mapped onto `device`.
model = ResNet18_1D(input_length=X_train.shape[1], num_classes=5)
checkpoint_state = torch.load(model_dir, map_location=device)
model.load_state_dict(checkpoint_state)
model.to(device).eval()

