In [1]:
import os
import numpy as np
import pandas as pd
from sklearn.metrics import recall_score, precision_score, f1_score, confusion_matrix

import torch
import torch.nn as nn
import torch.optim as optim

from torch.utils.data import Dataset, DataLoader, TensorDataset, random_split


In [2]:
# =========================
# 1. load data & Dataset define
# =========================
data_dir = r".\archive"
train_path = os.path.join(data_dir, "mitbih_train.csv")
test_path = os.path.join(data_dir, "mitbih_test.csv")

train_df = pd.read_csv(train_path, header=None)
test_df = pd.read_csv(test_path, header=None)

X_train = train_df.iloc[:, :-1].values
y_train = train_df.iloc[:, -1].values.astype(int)

X_test = test_df.iloc[:, :-1].values
y_test = test_df.iloc[:, -1].values.astype(int)

# Z-score per sample
X_train = (X_train - X_train.mean(axis=1, keepdims=True)) / (X_train.std(axis=1, keepdims=True) + 1e-8)
X_test = (X_test - X_test.mean(axis=1, keepdims=True)) / (X_test.std(axis=1, keepdims=True) + 1e-8)

# reshape: (samples, 1, 187)
X_train = X_train[:, np.newaxis, :]
X_test = X_test[:, np.newaxis, :]

X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)

X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)


In [3]:
# =========================
# 2. 定義 ECG CNN + Residual (5 層)
# =========================
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=5, dropout_prob=0.3):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size, padding=kernel_size//2)
        self.bn1 = nn.BatchNorm1d(out_channels)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size, padding=kernel_size//2)
        self.bn2 = nn.BatchNorm1d(out_channels)
        self.dropout = nn.Dropout(dropout_prob)
        
        # 如果 in_channels != out_channels，需要用 1x1 conv 對齊
        self.res_conv = nn.Conv1d(in_channels, out_channels, 1) if in_channels != out_channels else None

    def forward(self, x):
        residual = x
        if self.res_conv:
            residual = self.res_conv(x)
        
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out = self.dropout(out)
        out += residual
        out = self.relu(out)
        return out


class ECG_CNN_Residual(nn.Module):
    def __init__(self, num_classes=5, dropout_prob=0.3):
        super(ECG_CNN_Residual, self).__init__()
        # 5 層設計，兩兩一個 residual block
        self.block1 = ResidualBlock(1, 32, dropout_prob=dropout_prob)
        self.block2 = ResidualBlock(32, 64, dropout_prob=dropout_prob)
        self.block3 = ResidualBlock(64, 128, dropout_prob=dropout_prob)
        self.block4 = ResidualBlock(128, 256, dropout_prob=dropout_prob)
        self.block5 = ResidualBlock(256, 256, dropout_prob=dropout_prob)  # 最後一層保持 256 channel

        self.gap = nn.AdaptiveAvgPool1d(1)
        self.dropout = nn.Dropout(dropout_prob)
        self.fc = nn.Linear(256, num_classes)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.block4(x)
        x = self.block5(x)
        x = self.gap(x).squeeze(-1)
        x = self.dropout(x)
        x = self.fc(x)
        return x


In [4]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
save_dir = os.path.join(data_dir, "models")
os.makedirs(save_dir, exist_ok=True)
model_path = os.path.join(save_dir, "ECG_CNN_Residual.pt")


In [5]:
# =========================
# 3.Training
# =========================
#normal split
# dataset = TensorDataset(X_train_tensor, y_train_tensor)
# train_size = int(0.8 * len(dataset))
# val_size = len(dataset) - train_size
# train_dataset, val_dataset = random_split(dataset, [train_size, val_size])
#keep same seed for all model
np.random.seed(42)
#Stratified split
class_ranges = [
    (0, 72471),
    (72471, 74694),
    (74694, 80483),
    (80483, 81123),
    (81123, 87554)
]

train_indices = []
val_indices = []

for start, end in class_ranges:
    idx = np.arange(start, end)
    np.random.shuffle(idx)  # shuffle
    n_val = int(len(idx) * 0.2)
    val_indices.extend(idx[:n_val])
    train_indices.extend(idx[n_val:])
    
#  tensor
train_dataset = TensorDataset(X_train_tensor[train_indices], y_train_tensor[train_indices])
val_dataset = TensorDataset(X_train_tensor[val_indices], y_train_tensor[val_indices])

#data loader
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

model = ECG_CNN_Residual().to(device)
criterion = nn.CrossEntropyLoss()  
#optimizer = optim.Adam(model.parameters(), lr=1e-3)
optimizer = optim.AdamW(model.parameters(), lr=1e-3,weight_decay=1e-3)

num_epochs = 100
best_val_loss = float("inf")
patience_es = 15
no_improve_count = 0
for epoch in range(num_epochs):
    # ---- Training ----
    model.train()
    train_loss = 0
    for X, y in train_loader:
        X, y = X.to(device), y.to(device)
        optimizer.zero_grad()
        outputs = model(X)
        loss = criterion(outputs, y)
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * X.size(0)
    train_loss /= len(train_loader.dataset)

    # ---- Validation ----
    model.eval()
    val_loss = 0
    correct = 0
    with torch.no_grad():
        for X, y in val_loader:
            X, y = X.to(device), y.to(device)
            outputs = model(X)
            loss = criterion(outputs, y)
            val_loss += loss.item() * X.size(0)
            preds = outputs.argmax(dim=1)
            correct += (preds == y).sum().item()
    val_loss /= len(val_loader.dataset)
    val_acc = correct / len(val_loader.dataset)

    print(f"Epoch {epoch+1}/{num_epochs} - "
          f"Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")

    # ---- Save Best ----
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), model_path)
        print("Best model saved.")
        no_improve_count = 0
    else:
        no_improve_count += 1
        if no_improve_count >= patience_es:
            print(f"Early stopping at epoch {epoch+1}")
            break



Epoch 1/100 - Train Loss: 0.2417 | Val Loss: 0.1803 | Val Acc: 0.9484
Best model saved.
Epoch 2/100 - Train Loss: 0.1567 | Val Loss: 0.1711 | Val Acc: 0.9562
Best model saved.
Epoch 3/100 - Train Loss: 0.1382 | Val Loss: 0.1421 | Val Acc: 0.9613
Best model saved.
Epoch 4/100 - Train Loss: 0.1240 | Val Loss: 0.1170 | Val Acc: 0.9679
Best model saved.
Epoch 5/100 - Train Loss: 0.1136 | Val Loss: 0.1186 | Val Acc: 0.9681
Epoch 6/100 - Train Loss: 0.1048 | Val Loss: 0.1047 | Val Acc: 0.9704
Best model saved.
Epoch 7/100 - Train Loss: 0.0972 | Val Loss: 0.1051 | Val Acc: 0.9718
Epoch 8/100 - Train Loss: 0.0932 | Val Loss: 0.0919 | Val Acc: 0.9738
Best model saved.
Epoch 9/100 - Train Loss: 0.0884 | Val Loss: 0.1081 | Val Acc: 0.9705
Epoch 10/100 - Train Loss: 0.0850 | Val Loss: 0.0940 | Val Acc: 0.9748
Epoch 11/100 - Train Loss: 0.0814 | Val Loss: 0.0804 | Val Acc: 0.9772
Best model saved.
Epoch 12/100 - Train Loss: 0.0783 | Val Loss: 0.0785 | Val Acc: 0.9782
Best model saved.
Epoch 13/100 

In [5]:
# =========================
# evaluation
# =========================
test_ds = TensorDataset(X_test_tensor, y_test_tensor)
test_loader = DataLoader(test_ds, batch_size=128, shuffle=False)

# load model
model = ECG_CNN_Residual().to(device)
model.load_state_dict(torch.load(model_path))
model.eval()
#eval
preds, labels = [], []
with torch.no_grad():
    for xb, yb in test_loader:
        xb, yb = xb.to(device), yb.to(device)
        out = model(xb)
        pred = torch.argmax(out, dim=1)
        preds.extend(pred.cpu().numpy())
        labels.extend(yb.cpu().numpy())
#save csv
csv_path = os.path.join(save_dir, "test_pred.csv")
pd.DataFrame({"y_true": labels, "y_pred": preds}).to_csv(csv_path, index=False)
print(f"Evaluation CSV saved at {csv_path}")


Evaluation CSV saved at .\archive\models\test_pred.csv


In [6]:
# =========================
# confusion matrix & metrics
# =========================
cm = confusion_matrix(labels, preds, labels=list(range(5)))

print("===== Confusion Matrix =====")
print(cm)

# 計算各類 one-vs-all 指標
metrics_per_class = {"recall": [], "specificity": [], "precision": [], "f1": []}
class_counts = cm.sum(axis=1)
total_samples = class_counts.sum()
weights = class_counts / total_samples

for i in range(5):
    TP = cm[i, i]
    FP = cm[:, i].sum() - TP
    FN = cm[i, :].sum() - TP
    TN = cm.sum() - (TP + FP + FN)

    recall_i = TP / (TP + FN + 1e-8)
    specificity_i = TN / (TN + FP + 1e-8)
    precision_i = TP / (TP + FP + 1e-8)
    f1_i = 2 * recall_i * precision_i / (recall_i + precision_i + 1e-8)

    metrics_per_class["recall"].append(recall_i)
    metrics_per_class["specificity"].append(specificity_i)
    metrics_per_class["precision"].append(precision_i)
    metrics_per_class["f1"].append(f1_i)

macro_avg_metrics = {k: np.mean(v) for k, v in metrics_per_class.items()}
weighted_avg_metrics = {k: np.sum(np.array(v) * weights) for k, v in metrics_per_class.items()}

print("\n===== Per-Class Metrics =====")
for k, v in metrics_per_class.items():
    print(f"{k}: {np.round(v, 4)}")
print("\n===== Macro-Average Metrics =====")
for k, v in macro_avg_metrics.items():
    print(f"{k}: {v:.4f}")
print("\n===== Weighted-Average Metrics =====")
for k, v in weighted_avg_metrics.items():
    print(f"{k}: {v:.4f}")

===== Confusion Matrix =====
[[18051    42    19     3     3]
 [  138   411     6     1     0]
 [   53     4  1363    27     1]
 [   14     1    11   136     0]
 [   14     2     3     0  1589]]

===== Per-Class Metrics =====
recall: [0.9963 0.7392 0.9413 0.8395 0.9882]
specificity: [0.942  0.9977 0.9981 0.9986 0.9998]
precision: [0.988  0.8935 0.9722 0.8144 0.9975]
f1: [0.9921 0.8091 0.9565 0.8267 0.9928]

===== Macro-Average Metrics =====
recall: 0.9009
specificity: 0.9872
precision: 0.9331
f1: 0.9154

===== Weighted-Average Metrics =====
recall: 0.9844
specificity: 0.9518
precision: 0.9840
f1: 0.9840
