In [19]:
import os
import json
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import precision_recall_fscore_support
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")


Using device: cuda


In [20]:
class ECGCNN(nn.Module):
    def __init__(self, num_classes=5):
        super(ECGCNN, self).__init__()
        self.conv1 = nn.Conv1d(1, 32, kernel_size=5, stride=1, padding=2)
        self.bn1 = nn.BatchNorm1d(32)
        self.pool1 = nn.MaxPool1d(kernel_size=2, stride=2)

        self.conv2 = nn.Conv1d(32, 64, kernel_size=5, stride=1, padding=2)
        self.bn2 = nn.BatchNorm1d(64)
        self.pool2 = nn.MaxPool1d(kernel_size=2, stride=2)

        self.conv3 = nn.Conv1d(64, 128, kernel_size=5, stride=1, padding=2)
        self.bn3 = nn.BatchNorm1d(128)
        self.pool3 = nn.MaxPool1d(kernel_size=2, stride=2)

        # Assuming beat length 216 -> 108 -> 54 -> 27 after pooling
        self.fc1 = nn.Linear(128 * 27, 256)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(256, num_classes)

    def forward(self, x):
        x = x.unsqueeze(1)
        x = self.pool1(F.relu(self.bn1(self.conv1(x))))
        x = self.pool2(F.relu(self.bn2(self.conv2(x))))
        x = self.pool3(F.relu(self.bn3(self.conv3(x))))
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x


In [21]:
clean_data_path = '/kaggle/input/ecg-datasets/ecg_clean.csv'
df = pd.read_csv(clean_data_path)
print(f"Dataset loaded: {df.shape}")

if df['label'].dtype == 'object':
    le = LabelEncoder()
    df['label'] = le.fit_transform(df['label'])

feature_cols = [c for c in df.columns if c != 'label']
for c in feature_cols:
    df[c] = pd.to_numeric(df[c], errors='coerce')
df = df.dropna()

X = torch.tensor(df[feature_cols].values, dtype=torch.float32)
y = torch.tensor(df['label'].values, dtype=torch.long)

# Standardize per feature
X = (X - X.mean(0)) / (X.std(0) + 1e-8)

print(f"X shape: {X.shape}, y shape: {y.shape}")
print("Class dist:", np.bincount(y.numpy()))


Dataset loaded: (100033, 217)
X shape: torch.Size([100033, 216]), y shape: torch.Size([100033])
Class dist: [ 2546  8073 75028  7257  7129]


In [22]:
def add_awgn(signal, snr_db):
    signal_power = torch.mean(signal ** 2)
    noise_power = signal_power / (10 ** (snr_db / 10))
    noise = torch.randn_like(signal) * torch.sqrt(noise_power)
    return signal + noise

class NoisyDataset(torch.utils.data.Dataset):
    def __init__(self, X, y, add_noise_prob=0.7, snr_levels=(0,3,6,9,12,15,18,20)):
        self.X = X
        self.y = y
        self.add_noise_prob = add_noise_prob
        self.snr_levels = snr_levels

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        x = self.X[idx]
        y = self.y[idx]
        if torch.rand(1).item() < self.add_noise_prob:
            snr = float(np.random.choice(self.snr_levels))
            x = add_awgn(x, snr)
        return x, y


In [23]:
def evaluate_with_metrics(model, loader, device):
    model.eval()
    preds, labels = [], []
    with torch.no_grad():
        for xb, yb in loader:
            xb, yb = xb.to(device), yb.to(device)
            logits = model(xb)
            pred = torch.argmax(logits, 1)
            preds.append(pred.cpu().numpy())
            labels.append(yb.cpu().numpy())
    preds = np.concatenate(preds)
    labels = np.concatenate(labels)
    acc = (preds == labels).mean()
    prec, rec, f1, _ = precision_recall_fscore_support(labels, preds, average='weighted', zero_division=0)
    return {'accuracy': acc, 'precision': prec, 'recall': rec, 'f1_score': f1}


In [24]:
def train_one_fold(train_loader, val_loader, device, num_epochs=30, patience=10):
    model = ECGCNN(num_classes=5).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

    best_val_f1 = -1.0
    best_state = None
    patience_ctr = 0
    history = {'train_loss': [], 'train_acc': [], 'val_acc': [], 'val_f1': []}

    for epoch in range(num_epochs):
        model.train()
        total, correct, loss_sum = 0, 0, 0.0
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device)
            optimizer.zero_grad()
            logits = model(xb)
            loss = criterion(logits, yb)
            loss.backward()
            optimizer.step()
            loss_sum += loss.item()
            pred = torch.argmax(logits, 1)
            correct += (pred == yb).sum().item()
            total += yb.size(0)

        train_acc = correct / total
        history['train_loss'].append(loss_sum / max(1, len(train_loader)))
        history['train_acc'].append(train_acc)

        # Validation
        val_metrics = evaluate_with_metrics(model, val_loader, device)
        history['val_acc'].append(val_metrics['accuracy'])
        history['val_f1'].append(val_metrics['f1_score'])

        print(f"  Epoch {epoch+1:2d}/30 - "
              f"Loss: {history['train_loss'][-1]:.4f} - "
              f"Train Acc: {train_acc:.4f} - "
              f"Val Acc: {val_metrics['accuracy']:.4f} - "
              f"Val F1: {val_metrics['f1_score']:.4f}")

        if val_metrics['f1_score'] > best_val_f1:
            best_val_f1 = val_metrics['f1_score']
            best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}
            patience_ctr = 0
        else:
            patience_ctr += 1
            if patience_ctr >= patience:
                print(f"  ✓ Early stopping at epoch {epoch+1}")
                break

    model.load_state_dict(best_state)
    model = model.to(device)
    return model, history, best_val_f1


In [25]:
def train_noise_aug_with_kfold(X, y, k=5, num_epochs=30, patience=10, batch_size=64, add_noise_prob=0.7):
    skf = StratifiedKFold(n_splits=k, shuffle=True, random_state=42)
    fold_results, histories = [], []

    print("\n" + "="*70)
    print(f"NOISE-AUGMENTED CNN TRAINING WITH {k}-FOLD CV")
    print("="*70 + "\n")

    for fold, (tr_idx, va_idx) in enumerate(skf.split(X, y)):
        print("\n" + "="*70)
        print(f"FOLD {fold+1}/{k}")
        print("="*70)

        X_tr, X_va = X[tr_idx], X[va_idx]
        y_tr, y_va = y[tr_idx], y[va_idx]

        train_ds = NoisyDataset(X_tr, y_tr, add_noise_prob=add_noise_prob)
        val_ds   = TensorDataset(X_va, y_va)

        train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
        val_loader   = DataLoader(val_ds, batch_size=batch_size, shuffle=False)

        model, hist, best_f1 = train_one_fold(train_loader, val_loader, device, num_epochs, patience)
        val_metrics = evaluate_with_metrics(model, val_loader, device)

        print(f"\nFold {fold+1} Results:")
        print(f"  Accuracy:  {val_metrics['accuracy']:.4f}")
        print(f"  Precision: {val_metrics['precision']:.4f}")
        print(f"  Recall:    {val_metrics['recall']:.4f}")
        print(f"  F1-Score:  {val_metrics['f1_score']:.4f}")

        fold_results.append({
            'fold': fold+1,
            'accuracy': val_metrics['accuracy'],
            'precision': val_metrics['precision'],
            'recall': val_metrics['recall'],
            'f1_score': val_metrics['f1_score'],
        })
        histories.append(hist)

        torch.save(model.state_dict(), f'/kaggle/working/aug_model_fold_{fold+1}.pth')

    avg = {
        'accuracy_mean': float(np.mean([r['accuracy'] for r in fold_results])),
        'accuracy_std': float(np.std([r['accuracy'] for r in fold_results])),
        'precision_mean': float(np.mean([r['precision'] for r in fold_results])),
        'precision_std': float(np.std([r['precision'] for r in fold_results])),
        'recall_mean': float(np.mean([r['recall'] for r in fold_results])),
        'recall_std': float(np.std([r['recall'] for r in fold_results])),
        'f1_score_mean': float(np.mean([r['f1_score'] for r in fold_results])),
        'f1_score_std': float(np.std([r['f1_score'] for r in fold_results])),
    }

    pd.DataFrame(fold_results).to_csv('/kaggle/working/kfold_results.csv', index=False)
    with open('/kaggle/working/avg_metrics.json', 'w') as f:
        json.dump(avg, f, indent=2)

    print("\n" + "="*70)
    print("CROSS-VALIDATION SUMMARY (NOISE-AUGMENTED)")
    print("="*70)
    print(f"Accuracy:  {avg['accuracy_mean']:.4f} ± {avg['accuracy_std']:.4f}")
    print(f"Precision: {avg['precision_mean']:.4f} ± {avg['precision_std']:.4f}")
    print(f"Recall:    {avg['recall_mean']:.4f} ± {avg['recall_std']:.4f}")
    print(f"F1-Score:  {avg['f1_score_mean']:.4f} ± {avg['f1_score_std']:.4f}")
    print("="*70 + "\n")

    return fold_results, avg, histories


In [26]:
fold_results, avg_metrics, histories = train_noise_aug_with_kfold(
    X, y, 
    k=5,
    num_epochs=30,
    patience=10,
    batch_size=64,
    add_noise_prob=0.7
)

print(" Noise-augmented k-fold training complete.")



NOISE-AUGMENTED CNN TRAINING WITH 5-FOLD CV


FOLD 1/5
  Epoch  1/30 - Loss: 0.1269 - Train Acc: 0.9636 - Val Acc: 0.9866 - Val F1: 0.9861
  Epoch  2/30 - Loss: 0.0701 - Train Acc: 0.9795 - Val Acc: 0.9886 - Val F1: 0.9883
  Epoch  3/30 - Loss: 0.0587 - Train Acc: 0.9822 - Val Acc: 0.9898 - Val F1: 0.9895
  Epoch  4/30 - Loss: 0.0527 - Train Acc: 0.9836 - Val Acc: 0.9901 - Val F1: 0.9899
  Epoch  5/30 - Loss: 0.0474 - Train Acc: 0.9853 - Val Acc: 0.9911 - Val F1: 0.9909
  Epoch  6/30 - Loss: 0.0432 - Train Acc: 0.9875 - Val Acc: 0.9927 - Val F1: 0.9926
  Epoch  7/30 - Loss: 0.0391 - Train Acc: 0.9883 - Val Acc: 0.9922 - Val F1: 0.9922
  Epoch  8/30 - Loss: 0.0384 - Train Acc: 0.9881 - Val Acc: 0.9924 - Val F1: 0.9923
  Epoch  9/30 - Loss: 0.0357 - Train Acc: 0.9891 - Val Acc: 0.9922 - Val F1: 0.9921
  Epoch 10/30 - Loss: 0.0352 - Train Acc: 0.9893 - Val Acc: 0.9936 - Val F1: 0.9935
  Epoch 11/30 - Loss: 0.0323 - Train Acc: 0.9898 - Val Acc: 0.9936 - Val F1: 0.9935
  Epoch 12/30 - Loss

In [27]:
# Select best fold by F1
best_fold_idx = int(np.argmax([r['f1_score'] for r in fold_results])) + 1
print(f"Best fold (noise-aug): {best_fold_idx}")

best_model = ECGCNN(num_classes=5).to(device)
best_model.load_state_dict(torch.load(f'/kaggle/working/aug_model_fold_{best_fold_idx}.pth'))

def evaluate_on_noisy_snr(model, X_clean, y_clean, snr, batch_size=64):
    noisy = torch.stack([add_awgn(x, snr) for x in X_clean], dim=0)
    loader = DataLoader(TensorDataset(noisy, y_clean), batch_size=batch_size, shuffle=False)
    return evaluate_with_metrics(model, loader, device)

snr_levels = [0,3,6,9,12,15,18,20]
rows = []
for snr in snr_levels:
    m = evaluate_on_noisy_snr(best_model, X, y, snr)
    rows.append({
        'snr_db': int(snr),
        'accuracy': float(m['accuracy']),
        'precision': float(m['precision']),
        'recall': float(m['recall']),
        'f1_score': float(m['f1_score'])
    })
    print(f"SNR {snr} dB -> acc={m['accuracy']:.4f}  f1={m['f1_score']:.4f}")

snr_df = pd.DataFrame(rows)
snr_df.to_csv('/kaggle/working/snr_evaluation_results.csv', index=False)
print("✅ SNR evaluation saved to snr_evaluation_results.csv")


Best fold (noise-aug): 3
SNR 0 dB -> acc=0.9872  f1=0.9869
SNR 3 dB -> acc=0.9936  f1=0.9935
SNR 6 dB -> acc=0.9957  f1=0.9956
SNR 9 dB -> acc=0.9968  f1=0.9968
SNR 12 dB -> acc=0.9975  f1=0.9975
SNR 15 dB -> acc=0.9978  f1=0.9978
SNR 18 dB -> acc=0.9980  f1=0.9980
SNR 20 dB -> acc=0.9980  f1=0.9980
✅ SNR evaluation saved to snr_evaluation_results.csv


In [28]:
# Cell 10: Evaluate on Noisy Test Sets at Different SNR Levels
snr_levels = [0, 3, 6, 9, 12, 15, 18, 20]
baseline_snr_results = []

print(f"\n{'SNR (dB)':<10} {'Accuracy':<12} {'Precision':<12} {'Recall':<12} {'F1-Score':<12}")
print("-"*70)

for snr in snr_levels:
    # Check if pre-made noisy CSV exists
    noisy_csv_path = f'/kaggle/input/ecg-datasets/ecg_noisy_{snr}db.csv'
    
    if os.path.exists(noisy_csv_path):
        # Load pre-made noisy dataset
        df_noisy = pd.read_csv(noisy_csv_path)
        
        if df_noisy['label'].dtype == 'object':
            le = LabelEncoder()
            df_noisy['label'] = le.fit_transform(df_noisy['label'])
        
        feature_cols = [col for col in df_noisy.columns if col != 'label']
        for col in feature_cols:
            df_noisy[col] = pd.to_numeric(df_noisy[col], errors='coerce')
        df_noisy = df_noisy.dropna()
        
        X_noisy = torch.tensor(df_noisy.drop('label', axis=1).values, dtype=torch.float32)
        y_noisy = torch.tensor(df_noisy['label'].values, dtype=torch.long)
        X_noisy = (X_noisy - X_noisy.mean(dim=0)) / (X_noisy.std(dim=0) + 1e-8)
    else:
        # Generate noisy data on-the-fly from clean data
        X_noisy = torch.stack([add_awgn(x, snr) for x in X], dim=0)
        y_noisy = y.clone()
    
    # Create loader
    noisy_dataset = TensorDataset(X_noisy, y_noisy)
    noisy_loader = DataLoader(noisy_dataset, batch_size=64, shuffle=False)
    
    # Evaluate
    metrics = evaluate_with_metrics(best_baseline_model, noisy_loader, device)
    
    baseline_snr_results.append({
        'snr_db': snr,
        'baseline_accuracy': metrics['accuracy'],
        'baseline_precision': metrics['precision'],
        'baseline_recall': metrics['recall'],
        'baseline_f1_score': metrics['f1_score']
    })
    
    print(f"{snr:<10} {metrics['accuracy']:<12.4f} {metrics['precision']:<12.4f} "
          f"{metrics['recall']:<12.4f} {metrics['f1_score']:<12.4f}")

print("-"*70)



SNR (dB)   Accuracy     Precision    Recall       F1-Score    
----------------------------------------------------------------------
0          0.7537       0.7252       0.7537       0.7064      
3          0.8014       0.8001       0.8014       0.7814      
6          0.8638       0.8731       0.8638       0.8608      
9          0.9135       0.9240       0.9135       0.9157      
12         0.9407       0.9508       0.9407       0.9438      
15         0.9577       0.9673       0.9577       0.9611      
18         0.9738       0.9806       0.9738       0.9761      
20         0.9836       0.9866       0.9836       0.9846      
----------------------------------------------------------------------


In [33]:
# Cell 11: Save Baseline SNR Results (FIXED)
baseline_snr_df = pd.DataFrame(baseline_snr_results)
baseline_snr_df.to_csv('/kaggle/working/baseline_snr_results.csv', index=False)

# Calculate and save summary statistics
summary_stats = {
    'model': 'Baseline CNN (Clean Data Only - K-Fold CV)',
    'k_folds': 5,
    'best_fold': int(best_fold),  # ← Convert to Python int
    'snr_levels': [int(x) for x in snr_levels],  # ← Convert to Python int list
    'clean_data_performance': {
        'accuracy_mean': float(avg_metrics['accuracy_mean']),  # ← Convert to Python float
        'accuracy_std': float(avg_metrics['accuracy_std']),
        'f1_score_mean': float(avg_metrics['f1_score_mean']),
        'f1_score_std': float(avg_metrics['f1_score_std'])
    },
    'noisy_data_performance': {
        'avg_accuracy': float(baseline_snr_df['baseline_accuracy'].mean()),
        'avg_precision': float(baseline_snr_df['baseline_precision'].mean()),
        'avg_recall': float(baseline_snr_df['baseline_recall'].mean()),
        'avg_f1_score': float(baseline_snr_df['baseline_f1_score'].mean()),
        'worst_accuracy_snr': int(baseline_snr_df.loc[baseline_snr_df['baseline_accuracy'].idxmin(), 'snr_db']),
        'worst_accuracy_value': float(baseline_snr_df['baseline_accuracy'].min()),
        'best_accuracy_snr': int(baseline_snr_df.loc[baseline_snr_df['baseline_accuracy'].idxmax(), 'snr_db']),
        'best_accuracy_value': float(baseline_snr_df['baseline_accuracy'].max())
    }
}

with open('/kaggle/working/baseline_summary.json', 'w') as f:
    json.dump(summary_stats, f, indent=2)



In [32]:
# Cell 12: Display Complete Summary
print("\n" + "="*70)
print("COMPLETE BASELINE CNN SUMMARY")
print("="*70)

print("\n1. K-Fold Cross-Validation Results (Clean Data):")
print(f"   Accuracy:  {avg_metrics['accuracy_mean']:.4f} ± {avg_metrics['accuracy_std']:.4f}")
print(f"   Precision: {avg_metrics['precision_mean']:.4f} ± {avg_metrics['precision_std']:.4f}")
print(f"   Recall:    {avg_metrics['recall_mean']:.4f} ± {avg_metrics['recall_std']:.4f}")
print(f"   F1-Score:  {avg_metrics['f1_score_mean']:.4f} ± {avg_metrics['f1_score_std']:.4f}")

print("\n2. Average Performance on Noisy Data (0-20 dB SNR):")
print(f"   Accuracy:  {summary_stats['noisy_data_performance']['avg_accuracy']:.4f}")
print(f"   Precision: {summary_stats['noisy_data_performance']['avg_precision']:.4f}")
print(f"   Recall:    {summary_stats['noisy_data_performance']['avg_recall']:.4f}")
print(f"   F1-Score:  {summary_stats['noisy_data_performance']['avg_f1_score']:.4f}")

print("\n3. Performance Range on Noisy Data:")
print(f"   Best:  {summary_stats['noisy_data_performance']['best_accuracy_value']:.4f} at SNR {summary_stats['noisy_data_performance']['best_accuracy_snr']} dB")
print(f"   Worst: {summary_stats['noisy_data_performance']['worst_accuracy_value']:.4f} at SNR {summary_stats['noisy_data_performance']['worst_accuracy_snr']} dB")
print(f"   Degradation: {(summary_stats['noisy_data_performance']['best_accuracy_value'] - summary_stats['noisy_data_performance']['worst_accuracy_value']) * 100:.2f}%")

print("\n4. Best Fold Model Used:")
print(f"   Fold: {best_fold}")
print(f"   F1-Score: {fold_results[best_fold_idx]['f1_score']:.4f}")




COMPLETE BASELINE CNN SUMMARY

1. K-Fold Cross-Validation Results (Clean Data):
   Accuracy:  0.9945 ± 0.0004
   Precision: 0.9945 ± 0.0004
   Recall:    0.9945 ± 0.0004
   F1-Score:  0.9944 ± 0.0004

2. Average Performance on Noisy Data (0-20 dB SNR):
   Accuracy:  0.8985
   Precision: 0.9010
   Recall:    0.8985
   F1-Score:  0.8913

3. Performance Range on Noisy Data:
   Best:  0.9836 at SNR 20 dB
   Worst: 0.7537 at SNR 0 dB
   Degradation: 22.99%

4. Best Fold Model Used:
   Fold: 1
   F1-Score: 0.9946
