In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Lazily enumerate every file below the Kaggle input directory, then print
# each full path on its own line (same traversal order as os.walk yields).
input_file_paths = (
    os.path.join(root, name)
    for root, _, names in os.walk('/kaggle/input')
    for name in names
)
for path in input_file_paths:
    print(path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
!pip install numpy pandas scipy scikit-learn torch torchaudio matplotlib torch_geometric wfdb tqdm


In [None]:
import os
import ast
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

import wfdb
from scipy.signal import butter, lfilter, resample

from torch_geometric.nn import GCNConv, global_mean_pool
from torch_geometric.data import Data as GraphData


In [None]:
# Load metadata
# DATA_DIR is the root of the PTB-XL release inside the Kaggle input mount;
# both CSV files are read from directly beneath it.
DATA_DIR = '/kaggle/input/ptb-xl-dataset/ptb-xl-a-large-publicly-available-electrocardiography-dataset-1.0.1'
df = pd.read_csv(os.path.join(DATA_DIR, 'ptbxl_database.csv'))
# index_col=0 turns the first CSV column into the index, so SCP codes can be
# looked up with `code in scp_statements.index` / `scp_statements.loc[code, ...]`.
scp_statements = pd.read_csv(os.path.join(DATA_DIR, 'scp_statements.csv'), index_col=0)

# Use only diagnostic SCP codes
def extract_dominant_superclass(scp_dict):
    """Map one record's SCP-code dictionary to a single diagnostic superclass.

    Only codes that appear in the module-level ``scp_statements`` table with
    ``diagnostic == 1`` are considered. Among those, the code with the largest
    value in ``scp_dict`` wins (the first one encountered on ties), and its
    ``diagnostic_class`` entry is returned.

    Args:
        scp_dict: Mapping of SCP code -> numeric value for one record.

    Returns:
        The ``diagnostic_class`` of the dominant code, or ``'NORM'`` when no
        diagnostic code is present or the class entry is missing (NaN).
    """
    # NOTE(review): records without any diagnostic statement are labelled
    # 'NORM' — confirm this default is intended rather than dropping them.
    diagnostic_codes = {}
    for code, value in scp_dict.items():
        if code not in scp_statements.index:
            continue
        if scp_statements.loc[code, 'diagnostic'] == 1:
            diagnostic_codes[code] = value

    if len(diagnostic_codes) == 0:
        return 'NORM'

    dominant_code = max(diagnostic_codes, key=lambda code: diagnostic_codes[code])
    superclass = scp_statements.loc[dominant_code, 'diagnostic_class']
    if pd.isna(superclass):
        return 'NORM'
    return superclass

# The scp_codes column is stored as the string form of a dict; parse it, then
# collapse every record to a single diagnostic superclass.
df['scp_codes'] = df['scp_codes'].apply(ast.literal_eval)
df['diagnostic_superclass'] = df['scp_codes'].apply(extract_dominant_superclass)

# Keep only the desired 5 superclasses.
# .copy() detaches the filtered frame from the original so the `class_id`
# assignment below does not write into a slice (SettingWithCopyWarning).
valid_superclasses = ['NORM', 'MI', 'STTC', 'CD', 'HYP']
df = df[df['diagnostic_superclass'].isin(valid_superclasses)].copy()

# Encode labels: class ids follow the sorted order of le.classes_.
le = LabelEncoder()
df['class_id'] = le.fit_transform(df['diagnostic_superclass'])

print("\nClass mapping:")
for idx, cls in enumerate(le.classes_):
    print(f"{idx}: {cls}")

print("\nDistribution:")
print(df['diagnostic_superclass'].value_counts())


In [None]:
# A purely random record-level split can place ECGs of the same patient in
# both the training and the test set, which inflates the reported test score.
# PTB-XL documents a `strat_fold` column (folds 1-10) that keeps each patient
# inside one fold and recommends folds 1-8 / 9 / 10 for train / val / test.
# NOTE(review): the column name is taken from the dataset documentation —
# confirm it is present in this copy of ptbxl_database.csv.
if 'strat_fold' in df.columns:
    train_df = df[df['strat_fold'] <= 8]
    val_df = df[df['strat_fold'] == 9]
    test_df = df[df['strat_fold'] == 10]
else:
    # Fallback: stratified random split (80/20, then 90/10 of the remainder).
    train_df, test_df = train_test_split(
        df,
        test_size=0.2,
        stratify=df['class_id'],
        random_state=42
    )

    train_df, val_df = train_test_split(
        train_df,
        test_size=0.1,
        stratify=train_df['class_id'],
        random_state=42
    )

print("\nTrain distribution:\n", train_df['diagnostic_superclass'].value_counts())
print("\nValidation distribution:\n", val_df['diagnostic_superclass'].value_counts())
print("\nTest distribution:\n", test_df['diagnostic_superclass'].value_counts())


In [None]:
def butter_bandpass(lowcut, highcut, fs, order=1):
    """Design a Butterworth band-pass filter.

    Args:
        lowcut: Lower cut-off frequency, in the same unit as ``fs``.
        highcut: Upper cut-off frequency, in the same unit as ``fs``.
        fs: Sampling frequency of the signal the filter will be applied to.
        order: Filter order passed to ``scipy.signal.butter``.

    Returns:
        The ``(b, a)`` coefficient pair produced by ``scipy.signal.butter``.
    """
    # butter() expects the band edges as fractions of the Nyquist frequency.
    nyquist = 0.5 * fs
    normalized_band = [lowcut / nyquist, highcut / nyquist]
    return butter(order, normalized_band, btype='band')

def bandpass_filter(data, lowcut=0.5, highcut=45.0, fs=500.0, order=1):
    """Band-pass filter ``data`` with a Butterworth filter.

    The coefficients come from ``butter_bandpass`` and are applied with
    ``scipy.signal.lfilter`` using its default axis.

    Args:
        data: Array-like signal to filter.
        lowcut: Lower cut-off frequency (default 0.5).
        highcut: Upper cut-off frequency (default 45.0).
        fs: Sampling frequency of ``data`` (default 500.0).
        order: Butterworth filter order (default 1).

    Returns:
        The filtered signal as returned by ``lfilter``.
    """
    numerator, denominator = butter_bandpass(lowcut, highcut, fs, order)
    filtered = lfilter(numerator, denominator, data)
    return filtered

def preprocess_signal(signal, target_fs=100, fs=500):
    """Filter, resample and standardise a multi-lead signal.

    Steps:
        1. Band-pass filter every lead with ``bandpass_filter`` at rate ``fs``.
        2. If ``fs`` differs from ``target_fs``, resample along the time axis
           to ``int(n_samples * target_fs / fs)`` samples.
        3. Z-score each lead independently (mean 0, std 1 along time); a small
           epsilon keeps constant leads from dividing by zero.

    Args:
        signal: 2-D array-like shaped ``(n_leads, n_samples)``.
        target_fs: Sampling rate of the returned signal (default 100).
        fs: Sampling rate of ``signal`` (default 500, the value that was
            previously hard-coded).

    Returns:
        ``np.float32`` array of shape ``(n_leads, n_resampled_samples)``.

    Raises:
        ValueError: If ``signal`` is not 2-D.
    """
    signal = np.asarray(signal)
    if signal.ndim != 2:
        raise ValueError(
            f"signal must be 2-D (n_leads, n_samples), got shape {signal.shape}"
        )

    # lfilter operates along the last axis, so a single call filters every
    # lead; no per-lead Python loop is required.
    filtered = bandpass_filter(signal, fs=fs)

    if fs != target_fs:
        n_out = int(filtered.shape[1] * target_fs / fs)
        filtered = resample(filtered, n_out, axis=1)

    lead_mean = filtered.mean(axis=1, keepdims=True)
    lead_std = filtered.std(axis=1, keepdims=True)
    normalized = (filtered - lead_mean) / (lead_std + 1e-6)
    return normalized.astype(np.float32)


In [None]:
class PTBXLDataset(Dataset):
    """Map-style dataset that loads one record from disk per item.

    Each item is read with ``wfdb.rdrecord`` from the path stored in the
    ``filename_hr`` column (relative to ``data_dir``), run through
    ``preprocess_signal`` and paired with the ``class_id`` column.

    Args:
        df: DataFrame with at least ``filename_hr`` and ``class_id`` columns.
            Its index is reset so items are addressed positionally.
        data_dir: Directory the ``filename_hr`` paths are relative to.
    """

    def __init__(self, df, data_dir):
        self.data_dir = data_dir
        self.df = df.reset_index(drop=True)

    def __len__(self):
        """Return the number of records in the dataset."""
        return len(self.df.index)

    def __getitem__(self, idx):
        """Return ``(signal_tensor, label_tensor)`` for the record at ``idx``."""
        entry = self.df.iloc[idx]
        record_path = os.path.join(self.data_dir, entry['filename_hr'])
        # p_signal is transposed before preprocessing, which iterates rows as
        # leads — presumably wfdb returns (n_samples, n_leads); TODO confirm.
        leads = wfdb.rdrecord(record_path).p_signal.T
        features = torch.tensor(preprocess_signal(leads))
        target = torch.tensor(entry['class_id'])
        return features, target


In [None]:
class HybridModel(nn.Module):
    """CNN + Transformer classifier for multi-channel 1-D signals.

    Pipeline (B = batch size, T = input length):
        1. Two Conv1d + ReLU layers lift the input channels to 64 feature
           maps; ``AdaptiveAvgPool1d`` then fixes the temporal length at 128
           regardless of T.
        2. A 2-layer Transformer encoder (d_model=64, 4 heads) attends over
           the 128 time steps.
        3. The encoded sequence is mean-pooled over time and mapped to
           ``n_classes`` logits by a small MLP.

    Args:
        n_classes: Number of output logits.
        in_channels: Number of input channels; defaults to 12.
    """

    def __init__(self, n_classes, in_channels=12):
        super().__init__()
        self.cnn = nn.Sequential(
            nn.Conv1d(in_channels, 32, kernel_size=7, padding=3), nn.ReLU(),
            nn.Conv1d(32, 64, kernel_size=5, padding=2), nn.ReLU(),
            nn.AdaptiveAvgPool1d(128)  # [B, 64, 128]
        )
        # batch_first=True lets the encoder consume [B, seq, feature] directly,
        # so no permute round-trip is needed around it. Parameter names and
        # shapes are unaffected, so existing state_dicts still load.
        # NOTE(review): no positional encoding is added, so the encoder itself
        # is order-agnostic over the 128 steps — confirm this is intended.
        encoder_layer = nn.TransformerEncoderLayer(d_model=64, nhead=4, batch_first=True)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=2)

        self.fc = nn.Sequential(
            nn.Linear(64, 32), nn.ReLU(),
            nn.Linear(32, n_classes)
        )

    def forward(self, x):
        """Return logits of shape [B, n_classes] for input of shape [B, in_channels, T]."""
        x = self.cnn(x)            # [B, 64, 128]
        x = x.transpose(1, 2)      # [B, 128, 64] → batch, sequence_len, features
        x = self.transformer(x)    # [B, 128, 64]
        x = x.mean(dim=1)          # [B, 64] (mean over sequence_len)
        return self.fc(x)          # [B, n_classes]


In [None]:
# import torch
# import torch.nn as nn

# class HybridModel(nn.Module):
#     def __init__(self, n_classes):
#         super(HybridModel, self).__init__()
        
#         # CNN Feature Extractor
#         self.cnn = nn.Sequential(
#             nn.Conv1d(12, 32, kernel_size=7, padding=3), 
#             nn.BatchNorm1d(32), 
#             nn.ReLU(),
#             nn.Conv1d(32, 64, kernel_size=5, padding=2), 
#             nn.BatchNorm1d(64),
#             nn.ReLU(),
#             nn.AdaptiveAvgPool1d(128)  # [B, 64, 128]
#         )

#         # Positional Encoding (optional but recommended for Transformers)
#         self.pos_embedding = nn.Parameter(torch.randn(128, 1, 64))  # [Seq_len, 1, d_model]

#         # Transformer Encoder
#         encoder_layer = nn.TransformerEncoderLayer(
#             d_model=64, nhead=4, dim_feedforward=256, dropout=0.1, batch_first=False
#         )
#         self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=2)

#         # Final classification head
#         self.fc = nn.Sequential(
#             nn.Linear(64, 32),
#             nn.ReLU(),
#             nn.Dropout(0.3),
#             nn.Linear(32, n_classes)
#         )

#     def forward(self, x):
#         x = self.cnn(x)                  # [B, 64, 128]
#         x = x.permute(2, 0, 1)           # [128, B, 64]

#         # Add positional encoding
#         x = x + self.pos_embedding       # [128, B, 64]

#         x = self.transformer(x)          # [128, B, 64]
#         x = x.permute(1, 0, 2).mean(dim=1)  # Global average pooling [B, 64]
#         out = self.fc(x)                 # [B, n_classes]
#         return out


In [None]:
def run_epoch(model, loader, optimizer, criterion, device, train=True):
    """Run one full pass over ``loader`` and return mean loss and accuracy.

    Args:
        model: Module mapping an input batch to logits of shape [B, n_classes].
        loader: Iterable yielding ``(inputs, targets)`` batches.
        optimizer: Optimizer used to update ``model``; only touched when
            ``train`` is true.
        criterion: Callable ``criterion(logits, targets)`` returning a scalar
            loss averaged over the batch.
        device: Device the batches are moved to.
        train: If true, the model is put in training mode and updated;
            otherwise it is put in eval mode and autograd is disabled.

    Returns:
        ``(mean_loss, accuracy)`` where the loss is weighted by batch size and
        accuracy is the fraction of correct argmax predictions.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    model.train(bool(train))
    total_loss, correct, total = 0.0, 0, 0

    # Gradients are only needed while training; disabling autograd during
    # evaluation avoids building a graph for every validation batch.
    with torch.set_grad_enabled(bool(train)):
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            if train:
                optimizer.zero_grad()
            logits = model(x)
            loss = criterion(logits, y)
            if train:
                loss.backward()
                optimizer.step()

            batch_size = y.size(0)
            # criterion returns a batch mean; re-weight by batch size so the
            # epoch mean is correct even when the last batch is smaller.
            total_loss += loss.item() * batch_size
            correct += (logits.argmax(1) == y).sum().item()
            total += batch_size

    if total == 0:
        raise ValueError("run_epoch received an empty loader; no metrics can be computed")
    return total_loss / total, correct / total


In [None]:
# Dataset root shared by all three splits, and the batch size shared by all loaders.
DATA_DIR = '/kaggle/input/ptb-xl-dataset/ptb-xl-a-large-publicly-available-electrocardiography-dataset-1.0.1'
BATCH_SIZE = 16

train_ds = PTBXLDataset(train_df, DATA_DIR)
val_ds = PTBXLDataset(val_df, DATA_DIR)
test_ds = PTBXLDataset(test_df, DATA_DIR)

# Only the training loader shuffles; evaluation order stays deterministic.
train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"\nUsing device: {device}")

# Seed torch so weight initialisation and batch shuffling are repeatable
# (same seed value as the random_state used for the data split).
torch.manual_seed(42)

model = HybridModel(len(le.classes_)).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# Halve the learning rate after 2 epochs without validation-loss improvement.
# The `verbose` argument is deprecated in recent PyTorch releases, so it is
# not passed; read optimizer.param_groups[0]['lr'] to inspect the current rate.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2)
criterion = nn.CrossEntropyLoss()


In [None]:
# Tunable settings for the training run.
NUM_EPOCHS = 60
CHECKPOINT_PATH = 'best_model.pt'

best_val_loss = float('inf')
history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}

for epoch in range(NUM_EPOCHS):
    print(f"\nEpoch {epoch+1}")
    train_loss, train_acc = run_epoch(model, train_loader, optimizer, criterion, device, train=True)
    val_loss, val_acc = run_epoch(model, val_loader, optimizer, criterion, device, train=False)

    history['train_loss'].append(train_loss)
    history['train_acc'].append(train_acc)
    history['val_loss'].append(val_loss)
    history['val_acc'].append(val_acc)

    # The plateau scheduler is driven by the validation loss.
    scheduler.step(val_loss)

    print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f}")
    print(f"Val Loss:   {val_loss:.4f} | Val Acc:   {val_acc:.4f}")
    # Report the learning rate so scheduler reductions are visible in the log.
    print(f"LR: {optimizer.param_groups[0]['lr']:.2e}")

    # Checkpoint only when the validation loss improves, so the file on disk
    # always holds the best weights seen so far.
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), CHECKPOINT_PATH)


In [None]:
# Training curves: loss on the left panel, accuracy on the right, one point per epoch.
fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 5))

loss_ax.plot(history['train_loss'], label='Train Loss')
loss_ax.plot(history['val_loss'], label='Val Loss')
loss_ax.set_title("Loss")
loss_ax.set_xlabel("Epoch")
loss_ax.legend()

acc_ax.plot(history['train_acc'], label='Train Acc')
acc_ax.plot(history['val_acc'], label='Val Acc')
acc_ax.set_title("Accuracy")
acc_ax.set_xlabel("Epoch")
acc_ax.legend()

plt.show()


In [None]:
# Restore the best checkpoint. map_location remaps the stored tensors onto the
# current device, so a checkpoint written on GPU also loads in a CPU session.
model.load_state_dict(torch.load('best_model.pt', map_location=device))
model.eval()

y_true, y_pred = [], []
with torch.no_grad():
    for x, y in tqdm(test_loader, desc="Testing"):
        logits = model(x.to(device))
        y_pred.extend(logits.argmax(1).cpu().tolist())
        y_true.extend(y.tolist())

test_acc = np.mean(np.array(y_true) == np.array(y_pred))
print(f"\nFinal Test Accuracy: {test_acc:.4f}")
print("\nClassification Report:")
# Passing `labels` keeps target_names aligned with the class ids even if a
# class never occurs in y_true or y_pred; zero_division=0 reports 0 for such
# classes instead of emitting undefined-metric warnings.
print(classification_report(
    y_true,
    y_pred,
    labels=list(range(len(le.classes_))),
    target_names=le.classes_,
    zero_division=0,
))
