# Phase 0‚Äì5A: SSL Data Preparation & Windowing

**Goal**: Prepare datasets for self-supervised learning (SSL) pretraining on 4,417 unlabeled PPG signals, then convert to Phase 5A windowed format (617K √ó 1,250 samples).

**Findings from Phase -1**:
- ‚ùå Zero overlap between waveform subject IDs (52-4833) and MIMIC clinical CSVs (10001-44228)
- 4,417 PPG segments available (75K samples @ 125 Hz each)
- 130 unique subjects, all "Excellent" quality (mean SQI=0.958)
- No clinical labels available ‚Üí use self-supervised pretraining

**Approach**: 
- **Phase 0**: Create train/val/test splits and compute wavelet-denoised ground truth
- **Phase 5A**: Generate overlapping 10-sec (1,250-sample) windows from denoised signals via stride-500 sliding windows
- Train denoising autoencoder on 617K windowed training examples
- Validate on 617K windowed validation examples
- Preserve subject-level splits to prevent patient biometric leakage in Phase 8

**Outputs**:
- **Phase 0**: ssl_pretraining_data.parquet, ssl_validation_data.parquet, ssl_test_data.parquet, denoised_signal_index.json, denoised_signals/*.npy
- **Phase 5A**: mimic_windows.npy (617K √ó 1,250 array), mimic_windows_metadata.parquet (window-level metadata)

## Setup and Configuration


In [1]:
import os
import sys
import json
from pathlib import Path
import numpy as np
import pandas as pd
from datetime import datetime
import logging
from typing import Tuple, Dict, List

# Setup paths - use absolute path to ensure correct directory
NOTEBOOK_DIR = Path(__file__).parent if '__file__' in dir() else Path.cwd()
PROJECT_ROOT = Path(r"c:\Developments\cardiometabolic-risk-colab").resolve()
os.chdir(PROJECT_ROOT)
sys.path.insert(0, str(PROJECT_ROOT / "colab_src"))

# Directories (absolute paths)
DATA_DIR = PROJECT_ROOT / "data" / "processed"
OUTPUT_DIR = DATA_DIR
DENOISED_SIGNALS_DIR = DATA_DIR / "denoised_signals"

# Create output directories
DENOISED_SIGNALS_DIR.mkdir(parents=True, exist_ok=True)

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

print("‚úÖ Setup complete")
print(f"   Project root: {PROJECT_ROOT}")
print(f"   Data dir: {DATA_DIR}")
print(f"   Denoised signals dir: {DENOISED_SIGNALS_DIR}")


‚úÖ Setup complete
   Project root: C:\Developments\cardiometabolic-risk-colab
   Data dir: C:\Developments\cardiometabolic-risk-colab\data\processed
   Denoised signals dir: C:\Developments\cardiometabolic-risk-colab\data\processed\denoised_signals


## Step 1: Load Sprint 1 Signal Data


In [2]:
# Load signal metadata
signal_metadata_path = DATA_DIR / "sprint1_metadata.parquet"
signal_metadata_df = pd.read_parquet(signal_metadata_path)

# Load signal waveforms (if available as numpy array)
signal_array_path = DATA_DIR / "sprint1_signals.npy"
if signal_array_path.exists():
    signals = np.load(signal_array_path)
    print(f"‚úÖ Loaded signal array: {signals.shape}")
else:
    signals = None
    print(f"‚ö†Ô∏è  Signal array not found. Will be loaded individually from batches.")

print(f"\n‚úÖ Signal metadata loaded")
print(f"   Rows: {len(signal_metadata_df)}")
print(f"   Columns: {list(signal_metadata_df.columns)}")
print(f"\n   Summary statistics:")
print(f"   - Subjects: {signal_metadata_df['subject_id'].nunique()}")
print(f"   - Mean SQI: {signal_metadata_df['sqi_score'].mean():.3f}")
print(f"   - Mean SNR (dB): {signal_metadata_df['snr_db'].mean():.2f}")
print(f"\n   Sample rows:")
print(signal_metadata_df.head(3))


‚úÖ Loaded signal array: (4417, 75000)

‚úÖ Signal metadata loaded
   Rows: 4417
   Columns: ['record_name', 'subject_id', 'segment_idx', 'fs', 'sqi_score', 'quality_grade', 'snr_db', 'perfusion_index', 'channel_name', 'global_segment_idx', 'batch_num']

   Summary statistics:
   - Subjects: 130
   - Mean SQI: 0.958
   - Mean SNR (dB): 40.66

   Sample rows:
                record_name subject_id  segment_idx   fs  sqi_score  \
0  p00/p000052/3533390_0004    p000052            0  125   0.893482   
1  p00/p000052/3533390_0004    p000052            1  125   0.888996   
2  p00/p000052/3238451_0005    p000052            0  125   0.888845   

  quality_grade     snr_db  perfusion_index channel_name  global_segment_idx  \
0     Excellent  39.255102     3.938813e+06        PLETH                   0   
1     Excellent  38.742355     3.663113e+06        PLETH                   1   
2     Excellent  38.725109     1.637093e+06        PLETH                   2   

   batch_num  
0        1.0  
1  

## Step 2: Create Train/Val/Test Splits


In [3]:
from sklearn.model_selection import train_test_split

# PHASE 5A: SSL Pretraining - Train/Val only (no test set)
# Rationale: For SSL reconstruction, we don't need a test set since no clinical claims are made.
# Phase 8 transfer learning will use VitalDB's own test set for clinical validation.
# This maximizes training data for the denoising autoencoder.
np.random.seed(42)

total_segments = len(signal_metadata_df)
print(f"üìä Creating subject-level Train/Val splits from {total_segments} segments\n")
print(f"   Note: No test set needed for SSL pretraining (clinical testing in Phase 8 uses VitalDB)\n")

# Get unique subjects and their quality metrics
subject_quality = signal_metadata_df.groupby('subject_id').agg({
    'sqi_score': 'mean',
    'snr_db': 'mean'
}).reset_index()

subject_quality = subject_quality.sort_values('sqi_score', ascending=False).reset_index(drop=True)
print(f"   Unique subjects: {len(subject_quality)}")

# Split subjects (not segments) - 90% train / 10% val for SSL
n_subjects = len(subject_quality)
n_val_subjects = max(2, int(0.10 * n_subjects))   # ~10% of subjects for validation
n_train_subjects = n_subjects - n_val_subjects

print(f"   Target split: {n_train_subjects} train, {n_val_subjects} val subjects (90/10)")

# Take highest quality subjects for validation (best examples for monitoring)
val_subjects = subject_quality.iloc[:n_val_subjects]['subject_id'].values
train_subjects = subject_quality.iloc[n_val_subjects:]['subject_id'].values

# Filter signal_metadata_df by subject groups
train_df = signal_metadata_df[signal_metadata_df['subject_id'].isin(train_subjects)].copy()
val_df = signal_metadata_df[signal_metadata_df['subject_id'].isin(val_subjects)].copy()
test_df = pd.DataFrame()  # Empty dataframe for compatibility with downstream code

print(f"\n‚úÖ Subject-level Train/Val split created:")
print(f"   Train: {len(train_df):5} segments from {len(train_subjects):3} subjects ({100*len(train_df)/total_segments:.1f}%)")
print(f"   Val:   {len(val_df):5} segments from {len(val_subjects):3} subjects ({100*len(val_df)/total_segments:.1f}%)")

# Quality metrics for each split
print(f"\nüìà Quality metrics by split:")
for split_name, split_df in [("Train", train_df), ("Val", val_df)]:
    print(f"\n   {split_name}:")
    print(f"      Mean SQI:  {split_df['sqi_score'].mean():.3f} ¬± {split_df['sqi_score'].std():.3f}")
    print(f"      Mean SNR:  {split_df['snr_db'].mean():.2f} ¬± {split_df['snr_db'].std():.2f} dB")
    print(f"      Subjects:  {split_df['subject_id'].nunique()}")

# Verify ZERO subject overlap
overlap_train_val = set(train_df['subject_id']) & set(val_df['subject_id'])
assert len(overlap_train_val) == 0, f"Train-val subject overlap: {overlap_train_val}"
print(f"\n‚úÖ ZERO subject overlap between train/val (perfect isolation for SSL)")


üìä Creating subject-level Train/Val splits from 4417 segments

   Note: No test set needed for SSL pretraining (clinical testing in Phase 8 uses VitalDB)

   Unique subjects: 130
   Target split: 117 train, 13 val subjects (90/10)

‚úÖ Subject-level Train/Val split created:
   Train:  4290 segments from 117 subjects (97.1%)
   Val:     127 segments from  13 subjects (2.9%)

üìà Quality metrics by split:

   Train:
      Mean SQI:  0.956 ¬± 0.053
      Mean SNR:  40.60 ¬± 3.93 dB
      Subjects:  117

   Val:
      Mean SQI:  1.000 ¬± 0.000
      Mean SNR:  42.72 ¬± 2.58 dB
      Subjects:  13

‚úÖ ZERO subject overlap between train/val (perfect isolation for SSL)


## Step 3: Compute Wavelet-Denoised Ground Truth


In [6]:
# Import signal processing modules
from signal_processing.denoising import WaveletDenoiser

# Initialize denoising processor
denoiser = WaveletDenoiser(wavelet='db4', level=5, threshold_method='soft')

print("üîÑ Computing wavelet-denoised ground truth for all segments...\n")

# Ensure denoised signals directory exists
DENOISED_SIGNALS_DIR.mkdir(parents=True, exist_ok=True)

# Track denoised signals and create index
denoised_index = {}
denoised_count = 0

# Process all segments
for idx, row in signal_metadata_df.iterrows():
    segment_id = int(row['global_segment_idx'])  # Ensure it's a python int, not numpy int
    record_name = row['record_name']
    
    # Get original signal (either from loaded array or load batch file)
    if signals is not None:
        signal = signals[idx]
    else:
        # Load from signal_batches if available
        batch_dir = DATA_DIR / "signal_batches"
        if batch_dir.exists():
            # Try to find the signal file
            batch_files = list(batch_dir.glob(f"batch_*.npy"))
            if batch_files:
                # For now, skip if can't find individual signal
                print(f"   ‚ö†Ô∏è  Signal file not found for idx {idx}, skipping")
                continue
    
    # Denoise using wavelet decomposition
    denoised_signal = denoiser.denoise(signal)
    
    # Save denoised signal with proper path handling
    filename = f"{segment_id:06d}.npy"
    denoised_path = DENOISED_SIGNALS_DIR / filename
    np.save(str(denoised_path), denoised_signal.astype(np.float32))
    
    # Track in index
    denoised_index[segment_id] = filename
    denoised_count += 1
    
    if (denoised_count + 1) % 500 == 0:
        print(f"   Processed {denoised_count}/{len(signal_metadata_df)} segments")

print(f"\n‚úÖ Wavelet denoising complete")
print(f"   Denoised signals: {denoised_count}")
print(f"   Saved to: {DENOISED_SIGNALS_DIR}")

# Save index as JSON for fast lookup
index_path = DATA_DIR / "denoised_signal_index.json"
with open(str(index_path), 'w') as f:
    json.dump(denoised_index, f, indent=2)
print(f"   Index saved to: {index_path}")

üîÑ Computing wavelet-denoised ground truth for all segments...

   Processed 499/4417 segments
   Processed 999/4417 segments
   Processed 1499/4417 segments
   Processed 1999/4417 segments
   Processed 2499/4417 segments
   Processed 2999/4417 segments
   Processed 3499/4417 segments
   Processed 3999/4417 segments

‚úÖ Wavelet denoising complete
   Denoised signals: 4417
   Saved to: C:\Developments\cardiometabolic-risk-colab\data\processed\denoised_signals
   Index saved to: C:\Developments\cardiometabolic-risk-colab\data\processed\denoised_signal_index.json


## Step 4: Save Data Splits as Parquet Files


In [4]:
# Add segment_id column for tracking
train_df['segment_id'] = train_df['global_segment_idx']
val_df['segment_id'] = val_df['global_segment_idx']

# Save parquet files (Train/Val only)
train_path = OUTPUT_DIR / "ssl_pretraining_data.parquet"
val_path = OUTPUT_DIR / "ssl_validation_data.parquet"

train_df.to_parquet(train_path)
val_df.to_parquet(val_path)

print("‚úÖ Data splits saved to parquet:")
print(f"   Train: {train_path}")
print(f"   Val:   {val_path}")

# Verify files
print(f"\nüìã Verification:")
print(f"   Train parquet size: {train_path.stat().st_size / 1024**2:.2f} MB")
print(f"   Val parquet size:   {val_path.stat().st_size / 1024**2:.2f} MB")


‚úÖ Data splits saved to parquet:
   Train: C:\Developments\cardiometabolic-risk-colab\data\processed\ssl_pretraining_data.parquet
   Val:   C:\Developments\cardiometabolic-risk-colab\data\processed\ssl_validation_data.parquet

üìã Verification:
   Train parquet size: 0.19 MB
   Val parquet size:   0.01 MB


## Phase 5A: Generate Windowed Data (617K √ó 1,250 samples)

In [10]:
from data_pipeline.generate_mimic_windows import MIMICWindowGenerator

print("="*80)
print("PHASE 5A: Generate Overlapping Windows from Denoised Signals")
print("="*80)

# Initialize window generator with batch-saving for memory efficiency
print("\nüì• Initializing window generator...")
denoised_index_path = DATA_DIR / "denoised_signal_index.json"
generator = MIMICWindowGenerator(
    signal_dir=DENOISED_SIGNALS_DIR,
    denoised_index_path=denoised_index_path,
    window_length=1250,  # 10 seconds @ 125 Hz
    stride=500           # 50% overlap
)

print(f"   Window length: {generator.window_length} samples (10 sec @ 125 Hz)")
print(f"   Stride: {generator.stride} samples (50% overlap)")
print(f"   Total signals indexed: {len(generator.signal_index)}")

# Generate windows with batch-saving
print("\nüîÑ Generating windows with memory-mapped batch saving...")
windows_path = OUTPUT_DIR / "mimic_windows.npy"
windows_meta_path = OUTPUT_DIR / "mimic_windows_metadata.parquet"

total_windows, total_kept = generator.generate_windows(
    output_array_path=windows_path,
    output_metadata_path=windows_meta_path,
    quality_metadata_path=DATA_DIR / "sprint1_metadata.parquet",
    batch_size=1000
)

# Load results for verification
print(f"\n‚úÖ Window generation complete!")
print(f"   Total windows generated: {total_windows:,}")
print(f"   Total windows kept: {total_kept:,}")

# Load and verify the output
windows_array = np.load(windows_path)
windows_metadata = pd.read_parquet(windows_meta_path)

print(f"\nüìä Generated data statistics:")
print(f"   Window array shape: {windows_array.shape}")
print(f"   Array dtype: {windows_array.dtype}")
print(f"   Array size: {windows_array.nbytes / 1024**3:.2f} GB")
print(f"   Metadata records: {len(windows_metadata)}")

# Verify output files exist
print(f"\nüìã Output files:")
if windows_path.exists():
    print(f"   ‚úÖ Windows array: {windows_path.stat().st_size / 1024**3:.2f} GB")
if windows_meta_path.exists():
    print(f"   ‚úÖ Metadata: {windows_meta_path.stat().st_size / 1024**2:.2f} MB")

2026-01-15 09:18:10,635 - INFO - Loaded index with 4417 signals
2026-01-15 09:18:10,728 - INFO - Loaded quality metadata with 4417 rows
2026-01-15 09:18:10,729 - INFO - Scanning signals to estimate output size...


PHASE 5A: Generate Overlapping Windows from Denoised Signals

üì• Initializing window generator...
   Window length: 1250 samples (10 sec @ 125 Hz)
   Stride: 500 samples (50% overlap)
   Total signals indexed: 4417

üîÑ Generating windows with memory-mapped batch saving...


2026-01-15 09:19:40,768 - INFO - Estimated total windows: 653,716
2026-01-15 09:19:40,769 - INFO - Creating memory-mapped output array: (653,716, 1250)
Generating windows:   0%|          | 0/4417 [00:00<?, ?it/s]2026-01-15 09:19:40,935 - INFO - Flushed 1,000 windows to disk...
Generating windows:   0%|          | 7/4417 [00:00<01:25, 51.29it/s]2026-01-15 09:19:41,068 - INFO - Flushed 2,000 windows to disk...
Generating windows:   0%|          | 14/4417 [00:00<01:25, 51.72it/s]2026-01-15 09:19:41,202 - INFO - Flushed 3,000 windows to disk...
Generating windows:   0%|          | 21/4417 [00:00<01:24, 51.94it/s]2026-01-15 09:19:41,336 - INFO - Flushed 4,000 windows to disk...
Generating windows:   1%|          | 28/4417 [00:00<01:24, 51.75it/s]2026-01-15 09:19:41,474 - INFO - Flushed 5,000 windows to disk...
Generating windows:   1%|          | 34/4417 [00:00<01:29, 49.02it/s]2026-01-15 09:19:41,606 - INFO - Flushed 6,000 windows to disk...
Generating windows:   1%|          | 41/4417 [00


‚úÖ Window generation complete!
   Total windows generated: 653,716
   Total windows kept: 653,716

üìä Generated data statistics:
   Window array shape: (653716, 1250)
   Array dtype: float32
   Array size: 3.04 GB
   Metadata records: 653716

üìã Output files:
   ‚úÖ Windows array: 3.04 GB
   ‚úÖ Metadata: 2.95 MB


In [7]:
print("\n" + "="*80)
print("PHASE 0 COMPLETION SUMMARY")
print("="*80)

print(f"\n‚úÖ DATA SPLITS CREATED (Train/Val only):")
print(f"   Training:   {len(train_df):5} segments ({100*len(train_df)/total_segments:.1f}%)")
print(f"   Validation: {len(val_df):5} segments ({100*len(val_df)/total_segments:.1f}%)")
print(f"   Test:       None (SSL pretraining only, no clinical claims)")

print(f"\n‚úÖ QUALITY ASSURANCE:")
print(f"   Total unique subjects: {len(signal_metadata_df['subject_id'].unique())}")
print(f"   Train unique subjects: {len(train_df['subject_id'].unique())}")
print(f"   Val unique subjects:   {len(val_df['subject_id'].unique())}")
print(f"   ‚úÖ Zero subject overlap between train/val")

print(f"\n‚úÖ GROUND TRUTH PREPARATION:")
print(f"   Wavelet denoised signals: {denoised_count}")
print(f"   Index file: {index_path}")
print(f"   Denoised dir: {DENOISED_SIGNALS_DIR}")

print(f"\n‚úÖ PHASE 0 OUTPUT FILES:")
print(f"   1. ssl_pretraining_data.parquet ({train_path.stat().st_size / 1024**2:.2f} MB)")
print(f"   2. ssl_validation_data.parquet ({val_path.stat().st_size / 1024**2:.2f} MB)")
print(f"   3. denoised_signal_index.json ({index_path.stat().st_size / 1024:.2f} KB)")
print(f"   4. denoised_signals/*.npy ({DENOISED_SIGNALS_DIR.stat().st_size / 1024**2:.2f} MB total)")

print(f"\n" + "="*80)
print("PHASE 0 COMPLETE ‚úÖ")
print("Proceeding to Phase 5A: Generate windowed data")
print("="*80)


PHASE 0 COMPLETION SUMMARY

‚úÖ DATA SPLITS CREATED (Train/Val only):
   Training:    4290 segments (97.1%)
   Validation:   127 segments (2.9%)
   Test:       None (SSL pretraining only, no clinical claims)

‚úÖ QUALITY ASSURANCE:
   Total unique subjects: 130
   Train unique subjects: 117
   Val unique subjects:   13
   ‚úÖ Zero subject overlap between train/val

‚úÖ GROUND TRUTH PREPARATION:
   Wavelet denoised signals: 4417
   Index file: C:\Developments\cardiometabolic-risk-colab\data\processed\denoised_signal_index.json
   Denoised dir: C:\Developments\cardiometabolic-risk-colab\data\processed\denoised_signals

‚úÖ PHASE 0 OUTPUT FILES:
   1. ssl_pretraining_data.parquet (0.19 MB)
   2. ssl_validation_data.parquet (0.01 MB)
   3. denoised_signal_index.json (106.76 KB)
   4. denoised_signals/*.npy (1.00 MB total)

PHASE 0 COMPLETE ‚úÖ
Proceeding to Phase 5A: Generate windowed data


In [11]:
# Verify split statistics using original train/val dataframes
# Windows metadata doesn't have split column - it's based on source signal's original split
print(f"\nüìä Window split distribution (from source signals):")
windows_per_split = {
    'train': 0,
    'val': 0,
}

train_signal_ids = set(train_df['global_segment_idx'].astype(int).values)
val_signal_ids = set(val_df['global_segment_idx'].astype(int).values)

print(f"   Train signal IDs: {len(train_signal_ids)} segments")
print(f"   Val signal IDs:   {len(val_signal_ids)} segments")

for source_signal_id in windows_metadata['source_signal_id'].unique():
    n_windows_from_signal = (windows_metadata['source_signal_id'] == source_signal_id).sum()
    
    # Convert to int for comparison (source_signal_id is stored as string in metadata)
    source_signal_id_int = int(source_signal_id)
    
    if source_signal_id_int in train_signal_ids:
        windows_per_split['train'] += n_windows_from_signal
    elif source_signal_id_int in val_signal_ids:
        windows_per_split['val'] += n_windows_from_signal

for split, count in windows_per_split.items():
    pct = 100 * count / len(windows_metadata) if len(windows_metadata) > 0 else 0
    print(f"   {split}: {count:,} windows ({pct:.1f}%)")

# Verify subject-level grouping
if 'subject_id' in windows_metadata.columns:
    train_subject_ids = set(train_df['subject_id'].unique())
    val_subject_ids = set(val_df['subject_id'].unique())
    
    # Filter windows by their source signal's split assignment
    train_windows = windows_metadata[windows_metadata['source_signal_id'].astype(int).isin(train_signal_ids)]
    val_windows = windows_metadata[windows_metadata['source_signal_id'].astype(int).isin(val_signal_ids)]
    
    train_subjects_in_windows = train_windows['subject_id'].nunique()
    val_subjects_in_windows = val_windows['subject_id'].nunique()
    
    print(f"\nüë• Subject-level integrity (prevents patient leakage):")
    print(f"   Train unique subjects in windows: {train_subjects_in_windows} (from {len(train_subject_ids)} subjects)")
    print(f"   Val unique subjects in windows:   {val_subjects_in_windows} (from {len(val_subject_ids)} subjects)")
    
    # Check for overlap
    overlap_train_val = train_subject_ids & val_subject_ids
    
    if len(overlap_train_val) == 0:
        print(f"   ‚úÖ No subject overlap between train/val")


üìä Window split distribution (from source signals):
   Train signal IDs: 4290 segments
   Val signal IDs:   127 segments
   train: 634,920 windows (97.1%)
   val: 18,796 windows (2.9%)

üë• Subject-level integrity (prevents patient leakage):
   Train unique subjects in windows: 4290 (from 117 subjects)
   Val unique subjects in windows:   127 (from 13 subjects)
   ‚úÖ No subject overlap between train/val


In [12]:
# Recalculate subject counts from split dataframes and window metadata
train_signal_ids = set(train_df['global_segment_idx'].astype(int).values)
val_signal_ids = set(val_df['global_segment_idx'].astype(int).values)

train_windows = windows_metadata[windows_metadata['source_signal_id'].astype(int).isin(train_signal_ids)]
val_windows = windows_metadata[windows_metadata['source_signal_id'].astype(int).isin(val_signal_ids)]

train_subjects_in_windows = train_windows['subject_id'].nunique()
val_subjects_in_windows = val_windows['subject_id'].nunique()

print(f"   Train subjects: {train_subjects_in_windows} (from {len(train_df['subject_id'].unique())} subjects)")
print(f"   Val subjects: {val_subjects_in_windows} (from {len(val_df['subject_id'].unique())} subjects)")

# Recalculate split distribution
windows_per_split = {'train': 0, 'val': 0}
for source_signal_id in windows_metadata['source_signal_id'].unique():
    n_windows = (windows_metadata['source_signal_id'] == source_signal_id).sum()
    source_signal_id_int = int(source_signal_id)
    if source_signal_id_int in train_signal_ids:
        windows_per_split['train'] += n_windows
    elif source_signal_id_int in val_signal_ids:
        windows_per_split['val'] += n_windows

   Train subjects: 4290 (from 117 subjects)
   Val subjects: 127 (from 13 subjects)
