# Task 1: Arrhythmia Classification Using 1D CNN

This notebook builds an end-to-end ECG arrhythmia classifier using a 1D Convolutional Neural Network (PyTorch). It downloads the Heartbeat Dataset from Google Drive, preprocesses ECG beats, trains and validates a CNN, and reports Accuracy, Precision, Recall, F1-score, and a Confusion Matrix with plots. Run on a GPU runtime in Colab for best performance.

In [None]:
# Setup: installs for Colab (safe to rerun)
# Only packages that cannot be imported are handed to pip, so rerunning the
# cell is a no-op once everything is present.
import importlib.util
import sys, subprocess, pkgutil

required = [
    'gdown','numpy','pandas','scikit-learn','matplotlib','seaborn','wfdb','neurokit2',
    'torch','torchvision','torchaudio'
]

# pip distribution name -> importable module name, for packages where the two
# differ. Without this mapping 'scikit-learn' never matches 'sklearn' and pip
# would be invoked on every run.
IMPORT_NAMES = {'scikit-learn': 'sklearn'}

missing = [
    pkg for pkg in required
    if importlib.util.find_spec(IMPORT_NAMES.get(pkg, pkg)) is None
]

if missing:
    # One pip invocation for all missing packages instead of one per package.
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-q', *missing])

import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader

# Use the GPU when PyTorch reports CUDA as available, otherwise fall back to the CPU.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
DEVICE  # bare expression so the notebook displays the selected device

## Download dataset from Google Drive

- The dataset is shared through a public Google Drive link. If access is restricted, authenticate in Colab first.
- We download the zip via `gdown` and extract.
- If Colab Drive mount is preferred, you can skip `gdown` and copy the file manually.

In [None]:
import os, zipfile, pathlib, sys
from pathlib import Path

# Local layout: data_ecg/ecg_heartbeat.zip is the downloaded archive and
# data_ecg/extracted/ receives its contents.
DATA_DIR = Path('data_ecg')
DATA_DIR.mkdir(exist_ok=True)
ZIP_PATH = DATA_DIR / 'ecg_heartbeat.zip'
EXTRACT_DIR = DATA_DIR / 'extracted'
EXTRACT_DIR.mkdir(exist_ok=True)

# Google Drive file id from the provided link
GDRIVE_ID = '1xAs-CjlpuDqUT2EJUVR5cPuqTUdw2uQg'

# Best-effort download + extraction: failures are reported, not raised, so the
# user can place the archive at ZIP_PATH manually and rerun the cell.
try:
    import gdown
    if not ZIP_PATH.exists():
        url = f'https://drive.google.com/uc?id={GDRIVE_ID}'
        # NOTE(review): some gdown releases report failure by returning None
        # rather than raising - treat that as an error explicitly.
        if gdown.download(url, str(ZIP_PATH), quiet=False) is None:
            raise RuntimeError('gdown did not produce a file (check sharing permissions)')
    # Extract
    with zipfile.ZipFile(ZIP_PATH, 'r') as zf:
        zf.extractall(EXTRACT_DIR)
except zipfile.BadZipFile as e:
    # A truncated or non-zip download would otherwise be reused forever,
    # because the exists() check above skips the download on every rerun.
    if ZIP_PATH.exists():
        ZIP_PATH.unlink()
    print('Download or extraction failed:', e)
    print('Removed the corrupt archive; rerun this cell to download it again, or copy a valid file to', ZIP_PATH)
except Exception as e:
    print('Download or extraction failed:', e)
    print('If in Colab, try authenticating Google Drive and copy the file to', ZIP_PATH)

# Preview the first few extracted paths (displayed by the notebook).
list(EXTRACT_DIR.glob('**/*'))[:10]

## Preprocessing

- Load ECG beats and labels from extracted directory.
- Normalize each beat to zero mean, unit variance.
- Pad/trim to a fixed length if needed.
- Create train/validation/test splits.

In [None]:
import numpy as np
import pandas as pd
from pathlib import Path

# Heuristic loader for common heartbeat dataset formats (e.g., Kaggle heartbeat, MIT-BIH beat slices)
# Expecting structure like: EXTRACT_DIR/<class_name>/*.csv or a master CSV with label column.

def discover_files(root: Path):
    """Recursively collect data files below *root*.

    Returns a ``(csv_paths, npy_paths)`` pair of lists holding every
    ``*.csv`` and ``*.npy`` path found at any depth under ``root``.
    """
    found = {ext: list(root.glob(f'**/*.{ext}')) for ext in ('csv', 'npy')}
    return found['csv'], found['npy']

# Load every beat into a single (samples, length) matrix X with labels y.
# Order of operations: read files -> standardize each beat -> pad/trim all
# beats to a common length -> stack. Stacking last is what allows files with
# different beat lengths to be combined.
csvs, npys = discover_files(EXTRACT_DIR)
print('Found CSVs:', len(csvs), 'NPYs:', len(npys))

# Each X_list entry is a 2-D (samples, length) array; each y_list entry holds
# the matching labels. Lengths may differ between entries at this stage.
X_list, y_list = [], []
label_to_id = {}  # not used below; kept so the name stays defined for later cells

if csvs:
    # Case 1: directory per class or master CSV with label column
    for csv_path in csvs:
        try:
            df = pd.read_csv(csv_path)
        except (OSError, ValueError) as err:
            # pandas' parser / empty-data errors and decoding errors are all
            # ValueError subclasses; report the skip instead of hiding it.
            print('Skipping unreadable CSV:', csv_path, '-', err)
            continue
        # First matching column name wins.
        label_col = next(
            (cand for cand in ['label', 'class', 'Class', 'y', 'target'] if cand in df.columns),
            None,
        )
        if label_col is not None:
            if df.shape[0] == 0:
                # Header-only file: there are no samples, and re-reading it
                # without a header would turn the column names into a beat.
                continue
            # Treat rows as samples
            X_list.append(np.nan_to_num(df.drop(columns=[label_col]).values))
            y_list.append(df[label_col].values)
        else:
            # Single-sample CSVs in class directories
            # Infer label from parent directory name
            label_name = csv_path.parent.name
            arr = pd.read_csv(csv_path, header=None).values.flatten()
            X_list.append(arr[None, :])
            y_list.append(np.array([label_name]))
elif npys:
    # NPY files are only used when no CSV was found; label = parent directory name.
    for npy_path in npys:
        arr = np.load(npy_path)
        if arr.ndim == 1:
            arr = arr[None, :]
        elif arr.ndim > 2:
            # Collapse trailing axes so every entry is (samples, length).
            arr = arr.reshape(arr.shape[0], -1)
        label_name = npy_path.parent.name
        X_list.append(arr)
        y_list.append(np.array([label_name]*arr.shape[0]))
else:
    raise RuntimeError('No CSV or NPY files found. Please verify dataset structure after extraction.')

if not X_list:
    raise RuntimeError('Data files were found but no samples could be loaded from them.')

# Standardize each sample (zero mean, unit variance per beat). Done before
# padding so the zero padding does not distort a beat's statistics.
X_list = [np.nan_to_num(part) for part in X_list]
X_list = [
    (part - part.mean(axis=1, keepdims=True)) / (part.std(axis=1, keepdims=True) + 1e-6)
    for part in X_list
]

# Common length = 95th percentile of the per-sample lengths. When every beat
# already has the same length this is exactly that length.
sample_lengths = np.concatenate([np.full(part.shape[0], part.shape[1]) for part in X_list])
max_len = int(np.percentile(sample_lengths, 95))

# Trim longer beats, right-pad shorter ones with zeros, then stack.
X = np.vstack([
    part[:, :max_len] if part.shape[1] >= max_len
    else np.pad(part, ((0, 0), (0, max_len - part.shape[1])))
    for part in X_list
])
y = np.concatenate(y_list)

# Encode labels to integers
classes = sorted(np.unique(y))
class_to_id = {c:i for i,c in enumerate(classes)}
y_ids = np.array([class_to_id[c] for c in y], dtype=np.int64)

def pad_trim(arr, L):
    """Force the last axis of *arr* to have length *L*.

    Args:
        arr: NumPy array of any rank >= 1; the last axis is the one resized.
        L: Target length of the last axis.

    Returns:
        ``arr`` itself when the length already matches, a sliced view keeping
        the first ``L`` entries when it is longer, or a new zero-filled array
        of the same dtype with ``arr`` copied into the leading positions when
        it is shorter.
    """
    current = arr.shape[-1]
    if current == L:
        return arr
    if current > L:
        return arr[..., :L]
    # Build the output from the leading shape so 1-D and N-D inputs work,
    # not only 2-D ones.
    out = np.zeros(arr.shape[:-1] + (L,), dtype=arr.dtype)
    out[..., :current] = arr
    return out

# Promote a lone 1-D beat to a single-row batch, then enforce the common length.
X = pad_trim(X[None, :] if X.ndim == 1 else X, max_len)

print('Data shape:', X.shape, 'Num classes:', len(classes))

In [None]:
# Train/Val/Test split
# Stratified 70/15/15 split: hold out 30% first, then halve that hold-out
# into validation and test. The fixed seed keeps the split reproducible.
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y_ids,
    test_size=0.3,
    stratify=y_ids,
    random_state=42,
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp,
    test_size=0.5,
    stratify=y_temp,
    random_state=42,
)

# Split sizes (displayed by the notebook).
tuple(len(part) for part in (X_train, X_val, X_test))

In [None]:
class ECGDataset(Dataset):
    """In-memory dataset of ECG beats and their integer class labels.

    ``X`` is converted to a float32 tensor and ``y`` to a long tensor. A 2-D
    ``X`` of shape (N, L) gains a channel axis and is stored as (N, 1, L);
    any other rank is stored unchanged.
    """

    def __init__(self, X, y):
        signals = torch.tensor(X, dtype=torch.float32)
        # Insert the channel axis only for flat (N, L) input.
        self.X = signals.unsqueeze(1) if signals.ndim == 2 else signals
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        sample, target = self.X[idx], self.y[idx]
        return sample, target

# Wrap each split in a dataset.
train_ds, val_ds, test_ds = (
    ECGDataset(features, targets)
    for features, targets in ((X_train, y_train), (X_val, y_val), (X_test, y_test))
)

# Only the training loader shuffles; validation and test use larger batches
# and keep a fixed order.
train_loader = DataLoader(train_ds, batch_size=128, shuffle=True, num_workers=2)
val_loader, test_loader = (
    DataLoader(split, batch_size=256, shuffle=False, num_workers=2)
    for split in (val_ds, test_ds)
)

# Dataset sizes (displayed by the notebook).
tuple(len(split) for split in (train_ds, val_ds, test_ds))

In [None]:
class ECG1DCNN(nn.Module):
    """Compact 1D CNN classifier for single-channel ECG beats.

    ``feature`` is three Conv1d -> BatchNorm1d -> ReLU -> MaxPool1d(2) stages
    (1 -> 32 -> 64 -> 128 channels, kernel sizes 7/5/3 with length-preserving
    padding). ``classifier`` global-average-pools over time and applies a
    two-layer MLP with dropout that outputs ``num_classes`` logits.
    """

    # (in_channels, out_channels, kernel_size) for each convolutional stage.
    _STAGES = ((1, 32, 7), (32, 64, 5), (64, 128, 3))

    def __init__(self, num_classes: int):
        super().__init__()

        conv_layers = []
        for in_ch, out_ch, kernel in self._STAGES:
            conv_layers += [
                # padding = kernel // 2 keeps the sequence length unchanged.
                nn.Conv1d(in_ch, out_ch, kernel_size=kernel, padding=kernel // 2),
                nn.BatchNorm1d(out_ch),
                nn.ReLU(),
                nn.MaxPool1d(2),
            ]
        self.feature = nn.Sequential(*conv_layers)

        head_layers = [
            nn.AdaptiveAvgPool1d(1),
            nn.Flatten(),
            nn.Dropout(0.3),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, x):
        """Map a (batch, 1, length) tensor to (batch, num_classes) logits."""
        return self.classifier(self.feature(x))

# Build the network with one output per class and move it to the chosen device.
model = ECG1DCNN(num_classes=len(classes))
model = model.to(DEVICE)

# Cross-entropy on raw logits, optimised with Adam.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# Architecture summary (displayed by the notebook).
model

In [None]:
def train_one_epoch(model, loader, optimizer, criterion, device):
    """Run one optimisation pass over *loader*.

    Puts the model in training mode and performs one optimizer step per
    batch. Returns ``(mean_loss, accuracy)``, both averaged over the number
    of samples seen.
    """
    model.train()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    for inputs, targets in loader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        optimizer.zero_grad(set_to_none=True)
        outputs = model(inputs)
        batch_loss = criterion(outputs, targets)
        batch_loss.backward()
        optimizer.step()

        batch_size = inputs.size(0)
        # Weight by batch size so a smaller final batch is averaged correctly.
        loss_sum += batch_loss.item() * batch_size
        n_correct += (outputs.argmax(dim=1) == targets).sum().item()
        n_seen += batch_size
    return loss_sum / n_seen, n_correct / n_seen

def evaluate(model, loader, criterion, device):
    """Score *model* on *loader* without tracking gradients.

    Puts the model in eval mode and returns
    ``(mean_loss, accuracy, predictions, labels)``; the last two are NumPy
    arrays concatenated over all batches in loader order.
    """
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    pred_chunks = []
    label_chunks = []
    with torch.no_grad():
        for inputs, targets in loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            outputs = model(inputs)
            batch_size = inputs.size(0)
            loss_sum += criterion(outputs, targets).item() * batch_size
            batch_preds = outputs.argmax(dim=1)
            n_correct += (batch_preds == targets).sum().item()
            n_seen += batch_size
            pred_chunks.append(batch_preds.cpu().numpy())
            label_chunks.append(targets.cpu().numpy())
    mean_loss = loss_sum / n_seen
    accuracy = n_correct / n_seen
    return mean_loss, accuracy, np.concatenate(pred_chunks), np.concatenate(label_chunks)

# Train for a fixed number of epochs and keep the weights of the epoch with
# the highest validation accuracy.
EPOCHS = 20
best_val_acc = 0
best_state = None

for epoch in range(1, EPOCHS+1):
    tr_loss, tr_acc = train_one_epoch(model, train_loader, optimizer, criterion, DEVICE)
    val_loss, val_acc, _, _ = evaluate(model, val_loader, criterion, DEVICE)
    print(f'Epoch {epoch:02d}: train_loss={tr_loss:.4f} train_acc={tr_acc:.4f} val_loss={val_loss:.4f} val_acc={val_acc:.4f}')
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        # state_dict() tensors share storage with the live parameters, and
        # .cpu() is a no-op for tensors already on the CPU. Force a real copy
        # so later epochs cannot overwrite the snapshot in place.
        best_state = {k: v.detach().to('cpu', copy=True) for k, v in model.state_dict().items()}

# Restore the best snapshot (skipped if validation accuracy never exceeded 0).
if best_state is not None:
    model.load_state_dict({k:v.to(DEVICE) for k,v in best_state.items()})


In [None]:
# Final evaluation on test set
te_loss, te_acc, te_preds, te_labels = evaluate(model, test_loader, criterion, DEVICE)
print('Test loss:', te_loss, 'Test acc:', te_acc)

# scikit-learn measures the length of each target name, so numeric class
# labels must be converted to strings first.
class_names = [str(c) for c in classes]
# Pass the full label set explicitly so the report and matrix keep one
# row/column per class even if a class is missing from the test split.
label_ids = list(range(len(classes)))

print(classification_report(
    te_labels, te_preds,
    labels=label_ids, target_names=class_names, digits=4, zero_division=0,
))
cm = confusion_matrix(te_labels, te_preds, labels=label_ids)

plt.figure(figsize=(6,5))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.title('Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.tight_layout()
plt.show()

## Notes and References

- We used a compact 1D CNN suited for ECG beats. Optionally, add a bidirectional LSTM after the convolutional stack for sequential context.
- Handle class imbalance via class weights or focal loss if needed.
- Inspiration: publicly available tutorials on CNN-based arrhythmia detection (various PyTorch/TensorFlow guides on Medium and Kaggle).
