# PTB-XL 12-Lead ECG Dataset Download

This notebook downloads the PTB-XL dataset from PhysioNet for ECG analysis.

## Dataset Information
- **Records**: 21,837 ECG recordings (10 seconds each)
- **Patients**: 18,885 unique patients
- **Leads**: Standard 12-lead ECG (I, II, III, aVR, aVL, aVF, V1-V6)
- **Sampling Rates**: 100 Hz and 500 Hz available
- **Size**: ~8GB total (100Hz: ~2GB, 500Hz: ~4GB)
- **Format**: WFDB format with CSV metadata

## Requirements
1. Google Colab environment
2. Google Drive for persistent storage (recommended)
3. PhysioNet account (optional — the downloads in this notebook are unauthenticated; free registration at https://physionet.org/)

## License
PTB-XL is distributed under the Creative Commons Attribution 4.0 International (CC BY 4.0) license; see the PhysioNet project page for the authoritative terms.

## 🚀 Setup and Installation

In [None]:
# Attach Google Drive so downloaded data is kept in persistent storage
from google.colab import drive
import os

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)
print("✅ Google Drive mounted successfully")

# Folder on Drive that holds all project data; created on first run
DATA_DIR = os.path.join(DRIVE_MOUNT_POINT, "MyDrive", "ECG-LLM-Data")
os.makedirs(name=DATA_DIR, exist_ok=True)
print(f"📁 Data directory: {DATA_DIR}")

In [None]:
# Install required packages
# The leading `!` hands the line to the notebook's shell; --upgrade also
# bumps any of these packages that are already present in the runtime.
!pip install --upgrade wfdb scipy matplotlib seaborn tqdm requests

# NOTE(review): this message is printed unconditionally; pip's exit status
# is not checked here.
print("✅ All packages installed successfully")

## 📥 Download PTB-XL Dataset

In [None]:
import os
import sys
import requests
import zipfile
import pandas as pd
from pathlib import Path
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns

# Dataset folder lives under the shared data directory; create it if missing
PTB_XL_DIR = Path(DATA_DIR, "ptb_xl")
PTB_XL_DIR.mkdir(exist_ok=True, parents=True)

print(f"📍 PTB-XL will be stored at: {PTB_XL_DIR}")

In [None]:
def download_file(url, filepath, desc="Downloading"):
    """Download ``url`` to ``filepath`` with a progress bar.

    The payload is streamed into a sibling ``<name>.part`` file and renamed
    onto ``filepath`` only after the transfer has finished, so an interrupted
    download never leaves a truncated file that a later run would mistake
    for a complete one.

    Args:
        url: HTTP(S) address of the file to fetch.
        filepath: ``pathlib.Path`` of the destination file.
        desc: Label shown next to the progress bar.

    Returns:
        True if a non-empty ``filepath`` already exists or the download
        succeeded; False if the download failed. Failures are reported with
        ``print`` instead of being raised.
    """
    # A zero-byte target is treated as the residue of an aborted download,
    # not as a finished file.
    if filepath.exists() and filepath.stat().st_size > 0:
        print(f"✅ {filepath.name} already exists, skipping")
        return True

    partial_path = filepath.with_name(filepath.name + ".part")

    try:
        print(f"⬇️ Downloading {filepath.name}...")
        # (connect, read) timeouts so a stalled server cannot hang the cell;
        # the context manager releases the connection on every exit path.
        with requests.get(url, stream=True, timeout=(10, 60)) as response:
            response.raise_for_status()

            total_size = int(response.headers.get('content-length', 0))

            with open(partial_path, 'wb') as file, tqdm(
                desc=desc,
                # Unknown size -> None, so tqdm shows a counter rather than
                # a bar with a bogus total of 0.
                total=total_size or None,
                unit='B',
                unit_scale=True,
                unit_divisor=1024,
            ) as pbar:
                for chunk in response.iter_content(chunk_size=8192):
                    file.write(chunk)
                    pbar.update(len(chunk))

        # Publish the finished file in a single rename step.
        partial_path.replace(filepath)
        print(f"✅ Downloaded {filepath.name}")
        return True

    except Exception as e:
        print(f"❌ Failed to download {filepath.name}: {str(e)}")
        # Best-effort cleanup of the incomplete temporary file.
        try:
            partial_path.unlink()
        except OSError:
            pass
        return False

def extract_zip(zip_path, extract_to):
    """Extract a zip archive into ``extract_to`` and delete the archive.

    Args:
        zip_path: ``pathlib.Path`` of the archive to unpack.
        extract_to: Directory the members are extracted into.

    Returns:
        True if every member was extracted, False otherwise. Failures are
        reported with ``print`` instead of being raised.

    A corrupt archive (``zipfile.BadZipFile``) is deleted so that the next
    download attempt fetches it again instead of skipping an unusable file.
    For any other failure the archive is left in place.
    """
    print(f"📦 Extracting {zip_path.name}...")
    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            # Extract member by member so progress can be displayed
            members = zip_ref.infolist()
            for member in tqdm(members, desc="Extracting"):
                zip_ref.extract(member, extract_to)

    except zipfile.BadZipFile as e:
        print(f"❌ Failed to extract {zip_path.name}: {str(e)}")
        # Keeping an unreadable archive would make every later run skip the
        # download and fail here again.
        try:
            zip_path.unlink()
            print(f"🗑️ Removed corrupt {zip_path.name}; download it again")
        except OSError:
            pass
        return False

    except Exception as e:
        print(f"❌ Failed to extract {zip_path.name}: {str(e)}")
        return False

    print(f"✅ Extracted to {extract_to}")

    # Remove zip to save space; a failed removal does not undo the extraction
    try:
        zip_path.unlink()
        print(f"🗑️ Removed {zip_path.name} to save space")
    except OSError as e:
        print(f"⚠️ Could not remove {zip_path.name}: {str(e)}")
    return True

In [None]:
# Download metadata files (small, always needed)
print("📋 Downloading metadata files...")

metadata_urls = {
    "ptbxl_database.csv": "https://physionet.org/files/ptb-xl/1.0.3/ptbxl_database.csv",
    "scp_statements.csv": "https://physionet.org/files/ptb-xl/1.0.3/scp_statements.csv"
}

# download_file signals failure through its return value, so collect the
# files that did not arrive instead of announcing success unconditionally.
failed_metadata = []
for filename, url in metadata_urls.items():
    if not download_file(url, PTB_XL_DIR / filename, f"Downloading {filename}"):
        failed_metadata.append(filename)

if failed_metadata:
    print(f"❌ Failed to download metadata files: {', '.join(failed_metadata)}")
else:
    print("✅ Metadata files downloaded")

In [None]:
# Download 100Hz ECG records (recommended for development - smaller size)
# NOTE(review): confirm that this archive URL is actually served by
# PhysioNet for the dataset version in use.
records_100_url = "https://physionet.org/files/ptb-xl/1.0.3/records100.zip"
records_100_zip = PTB_XL_DIR / "records100.zip"
records_100_dir = PTB_XL_DIR / "records100"

# extract_zip deletes the archive after a successful extraction, so a
# populated records100/ folder with no leftover zip means an earlier run
# already finished; skip the download instead of fetching ~2GB again.
already_extracted = (
    records_100_dir.is_dir()
    and any(records_100_dir.iterdir())
    and not records_100_zip.exists()
)

if already_extracted:
    print("✅ 100Hz records already extracted, skipping download")
else:
    print("📊 Downloading 100Hz ECG records (~2GB)...")
    print("⏱️ This may take 10-15 minutes depending on internet speed")

    if not download_file(records_100_url, records_100_zip, "100Hz Records"):
        print("❌ Failed to download 100Hz records")
    elif extract_zip(records_100_zip, PTB_XL_DIR):
        print("✅ 100Hz records ready")
    else:
        # Only report "ready" when the extraction itself succeeded
        print("❌ Failed to extract 100Hz records")

In [None]:
# Optional: Download 500Hz ECG records (full resolution - larger size)
DOWNLOAD_500HZ = False  # Set to True if you need full resolution

if DOWNLOAD_500HZ:
    # NOTE(review): confirm that this archive URL is actually served by
    # PhysioNet for the dataset version in use.
    records_500_url = "https://physionet.org/files/ptb-xl/1.0.3/records500.zip"
    records_500_zip = PTB_XL_DIR / "records500.zip"
    records_500_dir = PTB_XL_DIR / "records500"

    # extract_zip deletes the archive after a successful extraction, so a
    # populated records500/ folder with no leftover zip means an earlier run
    # already finished; skip the download instead of fetching ~4GB again.
    already_extracted_500 = (
        records_500_dir.is_dir()
        and any(records_500_dir.iterdir())
        and not records_500_zip.exists()
    )

    if already_extracted_500:
        print("✅ 500Hz records already extracted, skipping download")
    else:
        print("📊 Downloading 500Hz ECG records (~4GB)...")
        print("⏱️ This may take 20-30 minutes depending on internet speed")
        print("⚠️ Warning: This will use significant storage space")

        if not download_file(records_500_url, records_500_zip, "500Hz Records"):
            print("❌ Failed to download 500Hz records")
        elif extract_zip(records_500_zip, PTB_XL_DIR):
            print("✅ 500Hz records ready")
        else:
            # Only report "ready" when the extraction itself succeeded
            print("❌ Failed to extract 500Hz records")
else:
    print("ℹ️ Skipping 500Hz records (set DOWNLOAD_500HZ = True to download)")

## 🔍 Verify Dataset

In [None]:
# Verify dataset integrity
print("🔍 Verifying dataset integrity...")

# Metadata table
db_path = PTB_XL_DIR / "ptbxl_database.csv"
if not db_path.exists():
    print("❌ Database file not found")
else:
    df = pd.read_csv(db_path)
    recording_dates = df['recording_date']
    print(f"✅ Database file: {len(df):,} records found")
    print(f"   - Unique patients: {df['patient_id'].nunique():,}")
    print(f"   - Date range: {recording_dates.min()} to {recording_dates.max()}")

# Diagnostic statement table
scp_path = PTB_XL_DIR / "scp_statements.csv"
if not scp_path.exists():
    print("❌ SCP statements file not found")
else:
    scp_df = pd.read_csv(scp_path)
    print(f"✅ SCP statements: {len(scp_df):,} diagnostic codes")

# 100Hz signal files: count the .dat files and list the sub-folders
records_100 = PTB_XL_DIR / "records100"
if not records_100.exists():
    print("❌ 100Hz records directory not found")
else:
    count_100 = sum(1 for _dat in records_100.rglob("*.dat"))
    print(f"✅ 100Hz ECG files: {count_100:,} .dat files")

    folders_100 = [entry.name for entry in records_100.iterdir() if entry.is_dir()]
    print(f"   - Folders: {sorted(folders_100)}")

# 500Hz signal files are optional, so their absence is only informational
records_500 = PTB_XL_DIR / "records500"
if not records_500.exists():
    print("ℹ️ 500Hz records not downloaded (optional)")
else:
    count_500 = sum(1 for _dat in records_500.rglob("*.dat"))
    print(f"✅ 500Hz ECG files: {count_500:,} .dat files")

print("\n🎉 Dataset verification completed!")

## 📊 Explore Dataset

In [None]:
# Load and explore the dataset
import wfdb
import numpy as np

# Load database
df = pd.read_csv(PTB_XL_DIR / "ptbxl_database.csv")
scp_df = pd.read_csv(PTB_XL_DIR / "scp_statements.csv")

print("📈 Dataset Overview:")
print(f"Total records: {len(df):,}")
print(f"Unique patients: {df['patient_id'].nunique():,}")
print(f"Age range: {df['age'].min():.0f} - {df['age'].max():.0f} years")
print(f"Gender distribution:")
print(df['sex'].value_counts())

# The metadata table is not guaranteed to carry explicit sampling-rate or
# duration columns; indexing them blindly raises KeyError and aborts the
# cell, so fall back to what can be derived when they are absent.
if 'fs_hz' in df.columns:
    print(f"\nSampling rates available: {df['fs_hz'].unique()}")
else:
    # Assumes filename_lr / filename_hr point at the 100 Hz / 500 Hz
    # recordings respectively — TODO confirm against the dataset docs.
    sampling_rates = [
        rate
        for column, rate in (('filename_lr', 100), ('filename_hr', 500))
        if column in df.columns
    ]
    print(f"\nSampling rates available: {sampling_rates} Hz (inferred from filename columns)")

if 'length_s' in df.columns:
    print(f"Record lengths: {df['length_s'].unique()} seconds")
else:
    # Nominal duration as stated in this notebook's dataset description
    print("Record lengths: 10 seconds (nominal; no 'length_s' column in metadata)")

In [None]:
# Visualize dataset distribution
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Age distribution
axes[0,0].hist(df['age'].dropna(), bins=30, alpha=0.7, color='skyblue')
axes[0,0].set_title('Age Distribution')
axes[0,0].set_xlabel('Age (years)')
axes[0,0].set_ylabel('Count')

# Gender distribution
# value_counts() orders by frequency, not by code, so derive each label from
# the code it belongs to instead of assuming a fixed order.
# Assumes the PTB-XL encoding 0 = male, 1 = female — TODO confirm against
# the dataset documentation. Unknown codes are shown as-is.
gender_counts = df['sex'].value_counts()
sex_labels = {0: 'Male', 1: 'Female'}
pie_labels = [sex_labels.get(code, str(code)) for code in gender_counts.index]
axes[0,1].pie(gender_counts.values, labels=pie_labels, autopct='%1.1f%%', startangle=90)
axes[0,1].set_title('Gender Distribution')

# Recording dates over time
df['recording_date'] = pd.to_datetime(df['recording_date'])
df['year'] = df['recording_date'].dt.year
year_counts = df['year'].value_counts().sort_index()
axes[1,0].bar(year_counts.index, year_counts.values, alpha=0.7, color='lightgreen')
axes[1,0].set_title('Recordings by Year')
axes[1,0].set_xlabel('Year')
axes[1,0].set_ylabel('Count')
axes[1,0].tick_params(axis='x', rotation=45)

# Heart rate distribution (if available)
if 'heart_rate' in df.columns:
    axes[1,1].hist(df['heart_rate'].dropna(), bins=30, alpha=0.7, color='salmon')
    axes[1,1].set_title('Heart Rate Distribution')
    axes[1,1].set_xlabel('Heart Rate (bpm)')
    axes[1,1].set_ylabel('Count')
else:
    # Placeholder panel so the 2x2 grid stays complete
    axes[1,1].text(0.5, 0.5, 'Heart Rate\nData Not Available', 
                   ha='center', va='center', transform=axes[1,1].transAxes, fontsize=14)
    axes[1,1].set_title('Heart Rate Distribution')

plt.tight_layout()
plt.show()

## 🔬 Load Sample ECG

In [None]:
# Load a sample ECG record
print("📡 Loading sample ECG record...")

# Get first record path
sample_record = df.iloc[0]

# filename_lr is expected to already start with "records100/" (TODO confirm
# for the dataset version in use); prepending the folder again would yield
# ".../records100/records100/..." and the record would never be found.
# Only add the folder when the stored path lacks it.
relative_record = Path(sample_record['filename_lr'])
if relative_record.parts[:1] != ("records100",):
    relative_record = Path("records100") / relative_record
record_path = str(PTB_XL_DIR / relative_record)

print(f"Loading: {sample_record['filename_lr']}")
print(f"Patient: {sample_record['patient_id']}, Age: {sample_record['age']}, Sex: {sample_record['sex']}")

try:
    # Load ECG signal (samples x leads array plus a dict of header fields)
    signal, fields = wfdb.rdsamp(record_path)
    
    print(f"✅ Signal loaded successfully")
    print(f"   - Shape: {signal.shape} (samples × leads)")
    print(f"   - Sampling rate: {fields['fs']} Hz")
    print(f"   - Duration: {signal.shape[0] / fields['fs']:.1f} seconds")
    print(f"   - Lead names: {fields['sig_name']}")
    
    # Plot the 12-lead ECG
    fig, axes = plt.subplots(4, 3, figsize=(15, 12))
    axes = axes.flatten()
    
    time_axis = np.arange(signal.shape[0]) / fields['fs']
    
    # zip stops at the shorter sequence, so a record with more signals than
    # panels cannot index past the 4x3 grid.
    for i, (ax, lead_name) in enumerate(zip(axes, fields['sig_name'])):
        ax.plot(time_axis, signal[:, i], linewidth=0.8)
        ax.set_title(f'Lead {lead_name}')
        ax.set_xlabel('Time (s)')
        ax.set_ylabel('Amplitude (mV)')
        ax.grid(True, alpha=0.3)
    
    plt.suptitle(f'12-Lead ECG - Patient {sample_record["patient_id"]}', fontsize=16)
    plt.tight_layout()
    plt.show()
    
except Exception as e:
    print(f"❌ Failed to load ECG: {str(e)}")
    print("   Make sure the records100 directory was downloaded and extracted correctly")

## 📋 Summary and Next Steps

In [None]:
# Final report: where the data lives, how big it is, and what to do next
print("🎉 PTB-XL Dataset Download Complete!")
print("=" * 50)
print(f"📁 Dataset location: {PTB_XL_DIR}")

# Sum the size of every regular file below the dataset folder
dataset_bytes = sum(
    entry.stat().st_size for entry in PTB_XL_DIR.rglob('*') if entry.is_file()
)
print(f"💾 Total size: ~{dataset_bytes / (1024**3):.2f} GB")
print()

print("📊 Available files:")
print(f"   ✅ ptbxl_database.csv - {len(df):,} ECG records metadata")
print(f"   ✅ scp_statements.csv - {len(scp_df):,} diagnostic codes")

# Signal folders are listed only when present on disk
summary_dir_100 = PTB_XL_DIR / "records100"
if summary_dir_100.exists():
    count_100 = sum(1 for _dat in summary_dir_100.rglob("*.dat"))
    print(f"   ✅ records100/ - {count_100:,} ECG files @ 100Hz")

summary_dir_500 = PTB_XL_DIR / "records500"
if summary_dir_500.exists():
    count_500 = sum(1 for _dat in summary_dir_500.rglob("*.dat"))
    print(f"   ✅ records500/ - {count_500:,} ECG files @ 500Hz")

# Static closing text, emitted as one newline-joined block
closing_lines = [
    "",
    "🚀 Next Steps:",
    "1. Use this dataset in your ECG analysis pipeline",
    "2. Load ECG records with: wfdb.rdsamp(record_path)",
    "3. Access metadata with: pd.read_csv('ptbxl_database.csv')",
    "4. Implement signal processing and feature extraction",
    "5. Train ML models for ECG classification",
    "",
    "📖 Useful resources:",
    "- PTB-XL Paper: https://www.nature.com/articles/s41597-020-0495-6",
    "- WFDB Documentation: https://wfdb.readthedocs.io/",
    "- PhysioNet: https://physionet.org/content/ptb-xl/",
]
print("\n".join(closing_lines))