# Loading the data

In [None]:
!pip install torch numpy pandas matplotlib scipy tensorflow onnx tqdm wfdb albumentations wfdb




#Load the data

In [None]:
import os
os.getcwd()

'/content'

In [None]:
# Function to load ECG data
def load_ecg_record(record_name, data_dir='/content/'):
    """
    Load ECG record from WFDB format.

    Args:
        record_name: Name of the record (e.g., '100')
        data_dir: Directory containing the data files

    Returns:
        record: WFDB record object
        signals: ECG signals
        annotations: ECG annotations
    """
    try:
        # Load record
        record = wfdb.rdrecord(f"{data_dir}{record_name}")

        # Load annotations
        ann = wfdb.rdann(f"{data_dir}{record_name}", 'atr')

        return record, record.p_signal, ann
    except Exception as e:
        print(f"❌ Error loading record {record_name}: {e}")
        return None, None, None

# List of available records (first few for demonstration)
record_names = ['100', '101', '102', '103', '104', '105', '106', '107', '108', '109', '111', '112', '113', '114', '115', '116', '117', '118', '119', '121', '122', '123', '124', '200', '201', '202', '203', '205', '207', '208', '209', '210', '212', '213', '214', '215', '217', '219', '220', '221', '222', '223', '228', '230', '231', '232', '233', '234']

print("📁 Available records:")
for name in record_names:
    print(f"   - {name}")

📁 Available records:
   - 100
   - 101
   - 102
   - 103
   - 104
   - 105
   - 106
   - 107
   - 108
   - 109
   - 111
   - 112
   - 113
   - 114
   - 115
   - 116
   - 117
   - 118
   - 119
   - 121
   - 122
   - 123
   - 124
   - 200
   - 201
   - 202
   - 203
   - 205
   - 207
   - 208
   - 209
   - 210
   - 212
   - 213
   - 214
   - 215
   - 217
   - 219
   - 220
   - 221
   - 222
   - 223
   - 228
   - 230
   - 231
   - 232
   - 233
   - 234


In [None]:
# Import required libraries
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import wfdb
import scipy
from scipy import signal
from scipy.stats import pearsonr
import warnings
warnings.filterwarnings('ignore')

# Set plotting style
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
plt.rcParams['figure.figsize'] = (12, 8)
plt.rcParams['font.size'] = 12

print("✅ Libraries imported successfully")

✅ Libraries imported successfully


In [None]:

# Load a sample record
record_name = '100'  # This is a common record with good signal quality
record, signals, annotations = load_ecg_record(record_name)

if record is not None:
    print(f"✅ Successfully loaded record {record_name}")
    print(f"   Duration: {len(signals) / record.fs / 60:.1f} minutes")
    print(f"   Sampling rate: {record.fs} Hz")
    print(f"   Number of leads: {signals.shape[1]}")
    print(f"   Signal shape: {signals.shape}")
    print(f"   Number of annotations: {len(annotations.sample)}")

    # Get lead names
    print(f"   Lead names: {record.sig_name}")
else:
    print("⚠️  Using simulated data for demonstration")
    # Create simulated ECG data for demonstration
    fs = 360  # Hz
    duration = 60  # seconds
    t = np.linspace(0, duration, duration * fs)

    # Simulate normal sinus rhythm
    heart_rate = 75  # BPM
    rr_interval = 60 / heart_rate
    r_peaks = np.arange(0, duration, rr_interval)

    ecg = np.zeros_like(t)
    for r_peak in r_peaks:
        r_idx = int(r_peak * fs)
        if r_idx < len(ecg):
            ecg[r_idx] = 1.0

            # Add QRS complex
            qrs_width = int(0.1 * fs)
            start_idx = max(0, r_idx - qrs_width // 2)
            end_idx = min(len(ecg), r_idx + qrs_width // 2)

            if start_idx < r_idx:
                ecg[start_idx:r_idx] = -0.1
            if r_idx < end_idx:
                ecg[r_idx:end_idx] = -0.2

    # Add noise and baseline wander
    baseline = 0.1 * np.sin(2 * np.pi * 0.1 * t)
    noise = 0.05 * np.random.randn(len(t))
    ecg = ecg + baseline + noise

    signals = ecg.reshape(-1, 1)
    record = type('Record', (), {'fs': fs, 'sig_name': ['MLII']})()
    annotations = type('Annotations', (), {'sample': np.array([1000, 2000, 3000])})()

✅ Successfully loaded record 100
   Duration: 30.1 minutes
   Sampling rate: 360 Hz
   Number of leads: 2
   Signal shape: (650000, 2)
   Number of annotations: 2274
   Lead names: ['MLII', 'V5']


# Preprocess Data (Windowing, Patching, and Linear Projection)


1.   Windowing
2.   Patching
3.   Linear Projection and Positional Encoding




In [None]:
import numpy as np


window_size = 1800  # 5 seconds at 360Hz
stride = 1800       # No overlap

ecg_signal = signals.flatten()
windows = [ecg_signal[i:i+window_size] for i in range(0, len(ecg_signal), stride)]


#Checking the size of each window
len(windows[0])


1800

In [None]:
import torch


window_size = 1800  # For example, 5 seconds at 360Hz = 1800 samples per window
patch_size = 20  # Each patch contains 20 samples

# Step 1: Segment the ECG signal into windows
windows = [ecg_signal[i:i+window_size] for i in range(0, len(ecg_signal), window_size)]
print(len(windows[0]))

# Step 2: Convert each window into patches
patches = []

for window in windows:
    # Divide the window into smaller patches (e.g., of size 20)
    window_patches = [window[i:i+patch_size] for i in range(0, len(window), patch_size)]
    patches.extend(window_patches)  # Add the patches to the main list

# Step 3: Convert patches to a PyTorch tensor
patches_tensor = torch.tensor(patches, dtype=torch.float32)

# Verify the shape of the patches tensor
print(patches_tensor.shape)  # Should print torch.Size([num_patches, patch_size])



1800
torch.Size([65000, 20])


In [None]:
import torch
import torch.nn as nn

class LinearProjection(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(LinearProjection, self).__init__()
        self.fc = nn.Linear(input_dim, output_dim)

    def forward(self, x):
        return self.fc(x)

# Example: linear projection from 20 samples (input) to 32-dimensional vectors (output)
patch_embedding = LinearProjection(20, 32)
embeddings = patch_embedding(torch.tensor(patches_tensor, dtype=torch.float32))



print(embeddings.shape)

torch.Size([65000, 32])


In [None]:
class TinyTransformer(nn.Module):
    def __init__(self, patch_size, embed_size, num_heads, num_layers, num_classes):
        super(TinyTransformer, self).__init__()

        self.patch_embedding = LinearProjection(patch_size, embed_size)
        # Changed positional encoding to match batch processing
        self.positional_encoding = nn.Parameter(torch.randn(1, 90, embed_size))  # 90 patches per window

        # Transformer Encoder Layers
        self.encoder_layers = nn.ModuleList([
            nn.TransformerEncoderLayer(
                d_model=embed_size,
                nhead=num_heads,
                dim_feedforward=128)
            for _ in range(num_layers)
        ])

        self.classifier = nn.Linear(embed_size, num_classes)

    def forward(self, x):
        # x shape: [batch_size, patch_size] -> [32, 20]

        # 1. Apply patch embedding
        x = self.patch_embedding(x)  # [32, 20] -> [32, embed_size]

        # 2. Reshape and add positional encoding
        x = x.unsqueeze(1)  # [32, embed_size] -> [32, 1, embed_size]
        x = x + self.positional_encoding[:, :1, :]  # Add positional encoding for first position

        # 3. Transformer expects [seq_len, batch_size, embed_size]
        x = x.transpose(0, 1)  # [1, 32, embed_size]

        # 4. Pass through transformer layers
        for layer in self.encoder_layers:
            x = layer(x)

        # 5. Get back to [batch_size, embed_size]
        x = x.transpose(0, 1)  # [32, 1, embed_size]
        x = x.squeeze(1)  # [32, embed_size]

        # 6. Final classification
        return self.classifier(x)

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np

class ECGDataset(Dataset):
    def __init__(self, signals, annotations, patch_size=20, window_size=1800, fs=360):
        """
        Args:
            signals (numpy.array or list): ECG signal (shape: [num_windows, window_size])
            annotations (wfdb.Annotation): Annotations corresponding to the signals
            patch_size (int): Size of each patch (default is 20)
            window_size (int): Size of each window (default is 1800)
            fs (int): Sampling frequency (default 360 Hz)
        """
        self.signals = signals
        self.annotations = annotations
        self.patch_size = patch_size
        self.window_size = window_size
        self.fs = fs  # Sampling frequency
        self.patches, self.labels = self._create_patches_and_labels()

    def _create_patches_and_labels(self):
        patches = []
        labels = []

        # Map annotation symbols to numerical labels
        annotation_labels = {
            'N': 0,  # Normal beat
            'V': 1,  # Premature ventricular contraction (PVC)
            'A': 2,  # Atrial premature beat
            'F': 3,  # Fusion beat
            'L': 4   # Left bundle branch block
            # Add other types as needed
        }

        # For each signal window
        for window_idx, window in enumerate(self.signals):
            window_start_sample = window_idx * self.window_size
            window_end_sample = window_start_sample + self.window_size

            # Find annotations that fall within this window
            window_labels = []

            for i, sample in enumerate(self.annotations.sample):
                if window_start_sample <= sample < window_end_sample:
                    # Map annotation to label
                    symbol = self.annotations.symbol[i]
                    if symbol in annotation_labels:
                        window_labels.append(annotation_labels[symbol])

            # If the window has any labels, use the majority as the label for the window
            if window_labels:
                label = max(set(window_labels), key=window_labels.count)  # Majority label in this window
            else:
                label = 0  # Default to normal if no annotations found in the window

            # Split the window into patches
            for i in range(0, len(window), self.patch_size):
                patch = window[i:i + self.patch_size]
                if len(patch) == self.patch_size:
                    patches.append(patch)
                    labels.append(label)

        return patches, labels
# Usage example:
# Assuming 'signals' is your ECG data loaded earlier
dataset = ECGDataset(signals, annotations = annotations)  # Flatten if multi-lead
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# Example of using the dataloader
for batch in dataloader:
    patches, labels = batch
    print(f"Batch patches shape: {patches.shape}")  # Should be [32, 20]
    print(f"Batch labels shape: {labels.shape}")    # Should be [32]
    break

TypeError: object of type 'ECGDataset' has no len()

In [None]:
# import torch.optim as optim

# model = TinyTransformer(patch_size=20, embed_size=32, num_heads=4, num_layers=2, num_classes=5)

# # Loss and optimizer
# criterion = nn.CrossEntropyLoss()
# optimizer = optim.Adam(model.parameters(), lr=0.001)

# epochs = 1

# # Training loop (simplified)
# for epoch in range(epochs):
#     for batch in dataloader:
#         ecg_data, labels = batch
#         optimizer.zero_grad()

#         # Forward pass
#         outputs = model(ecg_data)
#         loss = criterion(outputs, labels)

#         # Backpropagate and optimize
#         loss.backward()
#         optimizer.step()

#     print(f"Epoch {epoch}: Loss = {loss.item()}")


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from sklearn.metrics import accuracy_score, f1_score
import numpy as np

# 1. Prepare the Dataset (using the DataLoader we created earlier)
dataset = ECGDataset(signals.flatten())  # Assuming 'signals' is your ECG data

# Split into train and validation sets (80-20 split)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# 2. Initialize Model, Loss, and Optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TinyTransformer(
    patch_size=20,
    embed_size=32,
    num_heads=4,
    num_layers=2,
    num_classes=5
).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=3)

# 3. Training Loop
num_epochs = 10
best_val_loss = float('inf')

for epoch in range(num_epochs):
    # Training phase
    model.train()
    train_loss = 0.0
    all_preds = []
    all_labels = []

    for patches, labels in train_loader:
        patches, labels = patches.to(device), labels.to(device)

        # Forward pass
        outputs = model(patches)
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Track metrics
        train_loss += loss.item()
        _, preds = torch.max(outputs, 1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

    # Calculate training metrics
    train_loss /= len(train_loader)
    train_acc = accuracy_score(all_labels, all_preds)
    train_f1 = f1_score(all_labels, all_preds, average='weighted')

    # Validation phase
    model.eval()
    val_loss = 0.0
    val_preds = []
    val_labels = []

    with torch.no_grad():
        for patches, labels in val_loader:
            patches, labels = patches.to(device), labels.to(device)

            outputs = model(patches)
            loss = criterion(outputs, labels)

            val_loss += loss.item()
            _, preds = torch.max(outputs, 1)
            val_preds.extend(preds.cpu().numpy())
            val_labels.extend(labels.cpu().numpy())

    val_loss /= len(val_loader)
    val_acc = accuracy_score(val_labels, val_preds)
    val_f1 = f1_score(val_labels, val_preds, average='weighted')

    # Update learning rate
    scheduler.step(val_loss)

    # Save best model
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'best_model.pth')

    # Print epoch summary
    print(f"Epoch {epoch+1}/{num_epochs}:")
    print(f"  Train Loss: {train_loss:.4f} | Acc: {train_acc:.4f} | F1: {train_f1:.4f}")
    print(f"  Val Loss: {val_loss:.4f} | Acc: {val_acc:.4f} | F1: {val_f1:.4f}")
    print("-" * 60)

# 4. Save Final Model
torch.save(model.state_dict(), 'final_model.pth')
print("Training complete!")

Epoch 1/10:
  Train Loss: 1.6145 | Acc: 0.2073 | F1: 0.2072
  Val Loss: 1.6100 | Acc: 0.2024 | F1: 0.0841
------------------------------------------------------------
Epoch 2/10:
  Train Loss: 1.6095 | Acc: 0.2050 | F1: 0.2043
  Val Loss: 1.6111 | Acc: 0.2025 | F1: 0.0815
------------------------------------------------------------
Epoch 3/10:
  Train Loss: 1.6088 | Acc: 0.2016 | F1: 0.2007
  Val Loss: 1.6077 | Acc: 0.2075 | F1: 0.0805
------------------------------------------------------------
Epoch 4/10:
  Train Loss: 1.6080 | Acc: 0.2049 | F1: 0.2044
  Val Loss: 1.6079 | Acc: 0.2067 | F1: 0.1104
------------------------------------------------------------
Epoch 5/10:
  Train Loss: 1.6079 | Acc: 0.2034 | F1: 0.2026
  Val Loss: 1.6065 | Acc: 0.2090 | F1: 0.0947
------------------------------------------------------------
Epoch 6/10:
  Train Loss: 1.6077 | Acc: 0.2019 | F1: 0.1976
  Val Loss: 1.6071 | Acc: 0.2015 | F1: 0.0887
-----------------------------------------------------------

# Tiny Transformer Model


Model Components

Multi-head self-attention layer

Feed-forward network

Layer normalization + residual connections

Final classification layer