In [4]:
# ==========================================
# 1. ENVIRONMENT SETUP
# ==========================================
# Install the third-party packages this notebook relies on for data
# processing, image handling, and deep learning. `-q` keeps pip's output
# quiet. The leading `!` runs the command in a shell, so this line only
# works inside an IPython/Jupyter/Colab session, not as plain Python.
!pip install -q kaggle numpy pandas opencv-python matplotlib scikit-learn torch torchvision tqdm wfdb albumentations

import os
import zipfile
import pandas as pd
import numpy as np
import cv2
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from tqdm.auto import tqdm
from sklearn.model_selection import train_test_split
from google.colab import files

# Pick the compute device: the GPU when CUDA is usable, otherwise the CPU.
_device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(_device_name)
print(f"Using device: {device}")

# ==========================================
# 2. KAGGLE AUTHENTICATION
# ==========================================
# IMPORTANT: You must have a 'kaggle.json' file from your Kaggle account.
# Account -> Create New API Token.
print("Please upload your kaggle.json file:")
uploaded = files.upload()

if 'kaggle.json' not in uploaded:
    raise FileNotFoundError("kaggle.json not found. Please upload the correct file.")

!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

# Verify the CLI works
!kaggle competitions list | grep "physionet-ecg-image-digitization"

# ==========================================
# 3. DATASET DOWNLOAD
# ==========================================
COMPETITION_NAME = "physionet-ecg-image-digitization"

print(f"Downloading dataset for {COMPETITION_NAME}...")

# Ensure any partially downloaded or existing zip is removed to allow a fresh download
zip_file_path = f"{COMPETITION_NAME}.zip"
if os.path.exists(zip_file_path):
    print(f"Removing existing {zip_file_path} before re-downloading.")
    !rm {zip_file_path}

# Re-attempt download
!kaggle competitions download -c {COMPETITION_NAME}

# Unzip the data
print("Extracting files...")
# Check if the zip file exists after download attempt
if not os.path.exists(zip_file_path):
    raise FileNotFoundError(f"Download failed: {zip_file_path} not found after kaggle download command.")

with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall("data")

# Clean up zip
!rm {zip_file_path}

print("Directory structure:")
!ls -R data | head -n 20

# ==========================================
# 4. DATA EXPLORATION
# ==========================================
# We check the train.csv and test.csv and visualize some ECG images.
# In this competition, the goal is to extract time-series (digitization).
# Load both metadata tables, then report their row counts and a preview of
# the training table.
test_meta = pd.read_csv('data/test.csv')
train_meta = pd.read_csv('data/train.csv')

print(f"Train samples: {train_meta.shape[0]}")
print(f"Test samples: {test_meta.shape[0]}")
print(train_meta.head())

def plot_samples(df, num=3):
    """Display the first ``num`` training images listed in ``df`` side by side.

    Each image is looked up as ``data/train_images/<base_id>.png`` with a
    ``.jpg`` fallback. Rows whose image file is missing or cannot be decoded
    are skipped, leaving their subplot slot empty.

    Args:
        df: DataFrame with a ``base_id`` column.
        num: Maximum number of rows to show. Clamped to ``len(df)`` so a short
            frame no longer raises ``IndexError``.
    """
    num = min(num, len(df))
    if num <= 0:
        return  # Nothing to draw.

    plt.figure(figsize=(15, 5))
    # Read the id column directly: taking a whole row with ``df.iloc[i]`` can
    # upcast integer ids to float when the frame mixes int and float columns.
    for i, img_id in enumerate(df['base_id'].iloc[:num]):
        # Note: The actual path might differ slightly based on unzip folder structure
        img_path = f"data/train_images/{img_id}.png"
        if not os.path.exists(img_path):
            img_path = f"data/train_images/{img_id}.jpg"
        if not os.path.exists(img_path):
            continue

        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread signals an unreadable file by returning None rather
            # than raising; skip it instead of crashing in cvtColor.
            continue

        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        plt.subplot(1, num, i + 1)
        plt.imshow(img)
        plt.title(f"ID: {img_id}")
        plt.axis('off')
    plt.show()

# Preview a few training images, but only when the expected image folder was
# produced by the extraction step.
# Note: In the actual competition, paths are often 'data/train_images/'
train_images_present = os.path.exists('data/train_images')
if train_images_present:
    plot_samples(train_meta)

# ==========================================
# 5. PREPROCESSING PIPELINE
# ==========================================
# We define a PyTorch Dataset.
# Digitization is a regression task where we map image pixels to signal values.
# For a baseline, we'll resize images to 256x256 and predict a fixed-size vector.

class ECGDigitizationDataset(Dataset):
    """Map rows of a metadata frame to ECG images (and placeholder targets).

    Args:
        df: DataFrame with a ``base_id`` column naming each image.
        img_dir: Directory containing ``<base_id>.png`` or ``<base_id>.jpg``.
        transform: Optional callable applied to the RGB image array.
        is_test: When true, items are ``(image, base_id)``; otherwise they are
            ``(image, target)``.
    """

    # Extensions tried, in order, when resolving an image file.
    _IMAGE_EXTENSIONS = (".png", ".jpg")

    def __init__(self, df, img_dir, transform=None, is_test=False):
        self.df = df
        self.img_dir = img_dir
        self.transform = transform
        self.is_test = is_test

    def __len__(self):
        """Return the number of metadata rows."""
        return len(self.df)

    def _find_image_path(self, img_id):
        """Return the first existing image path for ``img_id``.

        Raises:
            FileNotFoundError: If no file with a supported extension exists.
        """
        for ext in self._IMAGE_EXTENSIONS:
            candidate = os.path.join(self.img_dir, f"{img_id}{ext}")
            if os.path.exists(candidate):
                return candidate
        raise FileNotFoundError(
            f"No image found for base_id {img_id} in {self.img_dir!r} "
            f"(tried {', '.join(self._IMAGE_EXTENSIONS)})."
        )

    def __getitem__(self, idx):
        """Load, convert to RGB, and transform the image at position ``idx``.

        Raises:
            FileNotFoundError: If the image file does not exist.
            OSError: If the file exists but cannot be decoded.
        """
        # Read the id from its column rather than from a whole-row Series:
        # ``df.iloc[idx]`` upcasts integer ids to float (7 -> 7.0) when the
        # frame mixes int and float columns, which breaks the file name.
        img_id = self.df['base_id'].iloc[idx]
        img_path = self._find_image_path(img_id)

        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread returns None instead of raising on unreadable files.
            raise OSError(f"Could not decode image file: {img_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        if self.transform:
            image = self.transform(image)

        if self.is_test:
            return image, img_id

        # Ground Truth: In a real baseline, you would load the .mat or .csv signals.
        # Here we create a dummy target to illustrate the training loop.
        # In this competition, signals are 12 leads.
        target = torch.zeros(12, 100)  # Dummy: 12 leads, 100 points each
        return image, target

# Image preprocessing shared by the training and test datasets.
transform = transforms.Compose(
    [
        # Convert the incoming array to a PIL image so Resize can handle it.
        transforms.ToPILImage(),
        # Squash every scan to a fixed 256x256 input.
        transforms.Resize(size=(256, 256)),
        # Convert back to a tensor.
        transforms.ToTensor(),
        # Per-channel normalization with fixed mean/std constants.
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

# ==========================================
# 6. BASELINE MODEL
# ==========================================
# A simple ResNet18 backbone with a regression head.
class ECGBaselineModel(nn.Module):
    """ResNet18 backbone whose final layer regresses a 2-D signal grid.

    Args:
        output_size: ``(rows, cols)`` shape of each prediction; the final
            linear layer emits ``rows * cols`` values that ``forward``
            reshapes to ``(batch, rows, cols)``.
    """

    def __init__(self, output_size=(12, 100)):
        super().__init__()
        # ``pretrained=True`` is deprecated in recent torchvision releases in
        # favor of the ``weights=`` enum. IMAGENET1K_V1 is the checkpoint the
        # old flag selected. Fall back to the legacy flag on versions that
        # predate the enum.
        weights_enum = getattr(models, "ResNet18_Weights", None)
        if weights_enum is not None:
            self.backbone = models.resnet18(weights=weights_enum.IMAGENET1K_V1)
        else:
            self.backbone = models.resnet18(pretrained=True)

        # Swap the classification head for a regression head of the right width.
        num_features = self.backbone.fc.in_features
        self.backbone.fc = nn.Linear(num_features, output_size[0] * output_size[1])
        self.output_size = output_size

    def forward(self, x):
        """Return predictions reshaped to ``(batch, rows, cols)``."""
        x = self.backbone(x)
        x = x.view(-1, self.output_size[0], self.output_size[1])
        return x

# Build the network on the selected device, with an MSE objective and an
# Adam optimizer over all of its parameters.
model = ECGBaselineModel()
model = model.to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-4)

# ==========================================
# 7. TRAINING & EVALUATION
# ==========================================
# Hold out 10% of the metadata rows. NOTE: val_df is not consumed by the loop
# below; it is kept so a validation pass can be added later.
train_df, val_df = train_test_split(train_meta, test_size=0.1, random_state=42)

# Set actual image directories found in your 'data' folder
# (Adjust these based on the actual zip extraction names)
TRAIN_IMG_DIR = "data/train_images"
VAL_IMG_DIR = "data/train_images"
TEST_IMG_DIR = "data/test_images"

# Check if directories exist, if not, try common subfolders.
# Validation rows are split off the training metadata, so their images live
# in the training folder: keep VAL_IMG_DIR in step with TRAIN_IMG_DIR instead
# of leaving it pointing at the directory that was just found missing.
if not os.path.exists(TRAIN_IMG_DIR):
    TRAIN_IMG_DIR = "data/images/train"
    VAL_IMG_DIR = TRAIN_IMG_DIR
    TEST_IMG_DIR = "data/images/test"

train_ds = ECGDigitizationDataset(train_df, TRAIN_IMG_DIR, transform=transform)
train_loader = DataLoader(train_ds, batch_size=16, shuffle=True)

print("Starting training baseline...")
model.train()
for epoch in range(1):  # Reduced epochs for baseline execution
    pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}")
    for imgs, targets in pbar:
        imgs, targets = imgs.to(device), targets.to(device)

        # Standard optimization step: clear gradients, forward, backward, update.
        optimizer.zero_grad()
        outputs = model(imgs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        # Show the most recent batch loss on the progress bar.
        pbar.set_postfix(loss=loss.item())

# Persist the trained weights next to the notebook.
torch.save(model.state_dict(), "ecg_baseline.pth")

# ==========================================
# 8. SUBMISSION PREPARATION
# ==========================================
# The submission expects: id (base_id_rowid_lead) and value.
# Lead II is 10s, others 2.5s.
leads = ["I", "II", "III", "aVR", "aVL", "aVF", "V1", "V2", "V3", "V4", "V5", "V6"]

print("Generating predictions on test data...")
test_ds = ECGDigitizationDataset(test_meta, TEST_IMG_DIR, transform=transform, is_test=True)
test_loader = DataLoader(test_ds, batch_size=8, shuffle=False)

model.eval()
submission_data = []

with torch.no_grad():
    for imgs, img_ids in tqdm(test_loader):
        imgs = imgs.to(device)
        preds = model(imgs).cpu().numpy()  # Shape: (B, 12, 100)

        # When base_id is numeric, the DataLoader's default collation batches
        # the ids into a tensor; convert to plain Python scalars so the id
        # string reads "123_0_I" rather than "tensor(123)_0_I".
        if torch.is_tensor(img_ids):
            img_ids = img_ids.tolist()

        for base_id, sample_pred in zip(img_ids, preds):
            for lead_name, signal_points in zip(leads, sample_pred):
                # For baseline, we take the 100 predicted points per lead and
                # emit one row per point.
                # In competition, Lead II has 10*fs points, others 2.5*fs.
                # The column is named "value" to match the format stated in
                # the header above (previously written as "target").
                # NOTE(review): confirm against the official sample submission.
                submission_data.extend(
                    {"id": f"{base_id}_{row_id}_{lead_name}", "value": val}
                    for row_id, val in enumerate(signal_points)
                )

submission_df = pd.DataFrame(submission_data)
submission_df.to_csv("submission.csv", index=False)

print("\nSubmission file preview:")
print(submission_df.head())
print(f"\nFinal submission saved to submission.csv. Total rows: {len(submission_df)}")

# ==========================================
# 9. FINAL NOTES
# ==========================================
# 1. This is a structural baseline.
# 2. To improve: Load real ground truth signals using 'wfdb' or from provided CSVs.
# 3. Use actual 'fs' (sampling frequency) to determine the number of points per lead.
# 4. Use more complex backbones (ResNet50, EfficientNet) and longer sequences.

Using device: cuda
Please upload your kaggle.json file:


Saving kaggle.json to kaggle.json
https://www.kaggle.com/competitions/physionet-ecg-image-digitization                2026-01-22 23:59:00  Research            50,000 Usd        823            True  
Downloading dataset for physionet-ecg-image-digitization...
^C
Extracting files...


FileNotFoundError: Download failed: physionet-ecg-image-digitization.zip not found after kaggle download command.