In [2]:
!pip install -q kaggle imbalanced-learn

import os

from getpass import getpass

# Kaggle credentials are secrets and must never be stored in the notebook.
# NOTE(review): the API key that was previously hardcoded in this cell has been
# exposed and should be revoked and regenerated in the Kaggle account settings.
#
# Values already present in the environment are reused; otherwise the user is
# prompted. The key is read with getpass so it is not echoed into the cell output.
if not os.environ.get('KAGGLE_USERNAME'):
    os.environ['KAGGLE_USERNAME'] = input("Kaggle username: ").strip()
if not os.environ.get('KAGGLE_KEY'):
    os.environ['KAGGLE_KEY'] = getpass("Kaggle API key: ").strip()

# Fail early with a clear message instead of letting the download step fail later.
if not (os.environ['KAGGLE_USERNAME'] and os.environ['KAGGLE_KEY']):
    raise RuntimeError("Kaggle credentials are missing; set KAGGLE_USERNAME and KAGGLE_KEY.")

print("Credentials set. Ready to download.")

Credentials set. Ready to download.
Credentials set. Ready to download.


In [3]:
!kaggle datasets download -d shayanfazeli/heartbeat
!unzip -o -q heartbeat.zip

print("Dataset downloaded and extracted successfully.")

Dataset URL: https://www.kaggle.com/datasets/shayanfazeli/heartbeat
License(s): unknown
Downloading heartbeat.zip to /content
  0% 0.00/98.8M [00:00<?, ?B/s]
100% 98.8M/98.8M [00:00<00:00, 1.35GB/s]
Dataset downloaded and extracted successfully.


In [4]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import classification_report
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler

# 1. Load Data (the CSV files carry no header row)
train = pd.read_csv('mitbih_train.csv', header=None)
test = pd.read_csv('mitbih_test.csv', header=None)

# 2. Separate features and labels: the last column holds the label,
#    every column before it is part of the signal.
X_train_raw, y_train = train.iloc[:, :-1].to_numpy(), train.iloc[:, -1].to_numpy()
X_test_raw, y_test = test.iloc[:, :-1].to_numpy(), test.iloc[:, -1].to_numpy()

# 3. Padding function
def pad_to_length(signal, target_len=200):
    """Return `signal` with exactly `target_len` leading-axis entries.

    Signals shorter than `target_len` are right-padded with zeros via
    `np.pad`; signals that are already long enough are sliced down to the
    first `target_len` entries (and returned as a slice of the input).
    """
    shortfall = target_len - len(signal)
    if shortfall <= 0:
        # Nothing to pad: keep only the first `target_len` samples.
        return signal[:target_len]
    return np.pad(signal, (0, shortfall), mode='constant')

# Bring every row to a fixed length of 200 samples.
X_train_pad = np.array([pad_to_length(beat, 200) for beat in X_train_raw])
X_test_pad = np.array([pad_to_length(beat, 200) for beat in X_test_raw])

# Collapse a stray singleton axis if the stacked arrays came out 3-D.
if X_train_pad.ndim == 3:
    X_train_pad = X_train_pad.squeeze()
if X_test_pad.ndim == 3:
    X_test_pad = X_test_pad.squeeze()

# 4. Balance the Training Data Dynamically
# Every class is moved to `target_count` samples: classes above it are randomly
# under-sampled, classes below it are over-sampled with SMOTE.
target_count = 20000

class_counts = pd.Series(y_train).value_counts().to_dict()
under_strategy = dict.fromkeys(
    (cls for cls, n_samples in class_counts.items() if n_samples > target_count),
    target_count,
)
over_strategy = dict.fromkeys(
    (cls for cls, n_samples in class_counts.items() if n_samples < target_count),
    target_count,
)

under = RandomUnderSampler(sampling_strategy=under_strategy, random_state=42)
over = SMOTE(sampling_strategy=over_strategy, random_state=42)

print("Balancing training data (this takes a moment)...")

# Start from the padded data and apply each resampler only when it has work to do.
X_train_bal, y_train_bal = X_train_pad, y_train
if under_strategy:
    X_train_bal, y_train_bal = under.fit_resample(X_train_bal, y_train_bal)
if over_strategy:
    X_train_bal, y_train_bal = over.fit_resample(X_train_bal, y_train_bal)

print("\nBalanced Train shape:", X_train_bal.shape)
print("Class distribution in train:\n", pd.Series(y_train_bal).value_counts())

Balancing training data (this takes a moment)...

Balanced Train shape: (100000, 200)
Class distribution in train:
 0.0    20000
1.0    20000
2.0    20000
3.0    20000
4.0    20000
Name: count, dtype: int64


In [5]:
# 1. Create PyTorch Datasets and DataLoaders
def _as_dataset(features, targets):
    """Wrap a feature array and a label array as a TensorDataset (float32 inputs, int64 labels)."""
    return TensorDataset(
        torch.tensor(features, dtype=torch.float32),
        torch.tensor(targets, dtype=torch.long),
    )


train_dataset = _as_dataset(X_train_bal, y_train_bal)
test_dataset = _as_dataset(X_test_pad, y_test)

# Only the training batches are shuffled; the test order stays fixed.
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=64, shuffle=False)

# 2. Define the Lightweight 1D CNN
class LightECGNet_V2(nn.Module):
    """Lightweight 1D CNN classifier for fixed-length single-channel signals.

    The network stacks three Conv1d -> BatchNorm1d -> ReLU -> MaxPool1d(2)
    stages (16, 32 and 64 channels) followed by a 128-unit fully connected
    layer with dropout and a final linear layer producing the class logits.

    Args:
        num_classes: Number of output logits.
        input_len: Number of samples in each input signal. The default of 200
            reproduces the original fixed `64 * 25` flattened feature size.

    Raises:
        ValueError: If `input_len` is too short to survive the three pooling stages.
    """

    def __init__(self, num_classes=5, input_len=200):
        super().__init__()
        self.conv1 = nn.Conv1d(1, 16, kernel_size=7, padding=3)
        self.bn1 = nn.BatchNorm1d(16)
        self.pool1 = nn.MaxPool1d(2)

        self.conv2 = nn.Conv1d(16, 32, kernel_size=5, padding=2)
        self.bn2 = nn.BatchNorm1d(32)
        self.pool2 = nn.MaxPool1d(2)

        self.conv3 = nn.Conv1d(32, 64, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm1d(64)
        self.pool3 = nn.MaxPool1d(2)

        # The convolutions keep the length unchanged (padding = kernel_size // 2),
        # while each MaxPool1d(2) halves it, rounding down.
        pooled_len = input_len // 2 // 2 // 2
        if pooled_len < 1:
            raise ValueError(
                f"input_len must be at least 8 to pass three pooling stages, got {input_len}"
            )

        self.fc1 = nn.Linear(64 * pooled_len, 128)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Compute class logits for a batch of signals.

        Args:
            x: Tensor of shape (batch, input_len); a channel axis is inserted here.

        Returns:
            Tensor of shape (batch, num_classes) with unnormalised logits.
        """
        x = x.unsqueeze(1)  # (batch, length) -> (batch, 1, length) for Conv1d
        # BatchNorm is applied before the ReLU in every stage.
        x = self.pool1(F.relu(self.bn1(self.conv1(x))))
        x = self.pool2(F.relu(self.bn2(self.conv2(x))))
        x = self.pool3(F.relu(self.bn3(self.conv3(x))))

        # flatten(1) keeps the batch axis and also works on non-contiguous tensors.
        x = x.flatten(1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x


# Seed PyTorch's global RNG before the model is created so that weight
# initialisation (and anything else drawing from the global RNG, such as the
# shuffling of a DataLoader without its own generator) repeats across runs.
# 42 matches the random_state used for the resamplers.
# NOTE: some GPU kernels can still introduce small run-to-run differences.
torch.manual_seed(42)

# Prefer the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

model = LightECGNet_V2(num_classes=5).to(device)


Using device: cuda


In [6]:
# Cross-entropy on the raw logits, optimised with Adam.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

epochs = 15

print("Starting training...")
for epoch in range(epochs):
    # ---- optimisation pass over the training loader ----
    model.train()
    total_loss = 0
    for signals, labels in train_loader:
        signals = signals.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        loss = criterion(model(signals), labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Mean of the per-batch losses for this epoch.
    avg_loss = total_loss / len(train_loader)

    # ---- accuracy on the test loader, without gradient tracking ----
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for signals, labels in test_loader:
            signals = signals.to(device)
            labels = labels.to(device)

            predicted = model(signals).argmax(dim=1)
            correct += int((predicted == labels).sum())
            total += len(labels)

    accuracy = 100 * correct / total
    print(f"Epoch {epoch+1}/{epochs} | Loss: {avg_loss:.4f} | Test Acc: {accuracy:.2f}%")

Starting training...
Epoch 1/15 | Loss: 0.2398 | Test Acc: 93.52%
Epoch 2/15 | Loss: 0.1215 | Test Acc: 95.30%
Epoch 3/15 | Loss: 0.0931 | Test Acc: 96.41%
Epoch 4/15 | Loss: 0.0774 | Test Acc: 94.66%
Epoch 5/15 | Loss: 0.0671 | Test Acc: 96.58%
Epoch 6/15 | Loss: 0.0581 | Test Acc: 97.62%
Epoch 7/15 | Loss: 0.0526 | Test Acc: 97.25%
Epoch 8/15 | Loss: 0.0470 | Test Acc: 97.73%
Epoch 9/15 | Loss: 0.0437 | Test Acc: 96.87%
Epoch 10/15 | Loss: 0.0418 | Test Acc: 97.42%
Epoch 11/15 | Loss: 0.0373 | Test Acc: 97.78%
Epoch 12/15 | Loss: 0.0350 | Test Acc: 97.33%
Epoch 13/15 | Loss: 0.0322 | Test Acc: 97.70%
Epoch 14/15 | Loss: 0.0311 | Test Acc: 96.80%
Epoch 15/15 | Loss: 0.0289 | Test Acc: 97.33%


In [7]:
# google.colab is only importable inside Colab. Guard the import so that the
# evaluation and the checkpoint save below still run in other environments.
try:
    from google.colab import files
except ImportError:
    files = None

# Collect predictions and ground-truth labels over the whole test loader.
model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for signals, labels in test_loader:
        signals = signals.to(device)
        outputs = model(signals)
        _, preds = torch.max(outputs, 1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.numpy())

target_names = ['N (Normal)', 'S (AFib-like)', 'V (PVC-like)', 'F (Fusion)', 'Q (Unknown)']
print("\nClassification Report on Test Set:")
# Passing `labels` explicitly keeps target_names aligned with class indices 0..4
# even if some class never shows up in the labels or predictions.
print(classification_report(
    all_labels,
    all_preds,
    labels=list(range(len(target_names))),
    target_names=target_names,
))

# Save and download the model
checkpoint_path = 'light_ecg_cnn_balanced.pth'
torch.save(model.state_dict(), checkpoint_path)
print(f"\nModel weights saved locally as {checkpoint_path}")

# The browser download is a Colab convenience; skip it elsewhere.
if files is not None:
    files.download(checkpoint_path)
else:
    print("Not running in Google Colab; skipping browser download.")


Classification Report on Test Set:
               precision    recall  f1-score   support

   N (Normal)       0.99      0.98      0.99     18118
S (AFib-like)       0.67      0.86      0.75       556
 V (PVC-like)       0.91      0.96      0.93      1448
   F (Fusion)       0.68      0.85      0.75       162
  Q (Unknown)       0.98      0.99      0.99      1608

     accuracy                           0.97     21892
    macro avg       0.84      0.93      0.88     21892
 weighted avg       0.98      0.97      0.97     21892


Model weights saved locally as light_ecg_cnn_balanced.pth


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>