# vitalDSP Large File Processing Tutorial

**Version:** 1.0  
**Date:** October 17, 2025  
**Phase:** 4 (Optimization & Testing)

This notebook demonstrates how to process large physiological signal files using vitalDSP's Phase 1-3 implementation.

## What You'll Learn

1. Loading OUCRU CSV files (small and large)
2. Quality screening with parallel processing
3. Multi-path signal processing
4. Performance monitoring and optimization
5. Complete end-to-end workflows

## Setup

In [None]:
# Imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import json
import time
from pathlib import Path

# vitalDSP imports
from vitalDSP.utils.data_processing.data_loader import (
    DataLoader,
    DataFormat,
    load_oucru_csv
)
from vitalDSP.utils.core_infrastructure.processing_pipeline import (
    StandardProcessingPipeline,
    OptimizedStandardProcessingPipeline
)
from vitalDSP.utils.core_infrastructure.quality_screener import (
    SignalQualityScreener,
    QualityScreeningConfig
)

# Configure matplotlib
%matplotlib inline
plt.style.use('seaborn-v0_8-darkgrid')
plt.rcParams['figure.figsize'] = (12, 6)

print("âœ… Setup complete!")

## Part 1: Creating Test Data

First, let's create sample OUCRU CSV files for demonstration.

In [None]:
def create_oucru_test_file(filepath, duration_sec=60, fs=250, signal_type='ecg'):
    """
    Create a test OUCRU CSV file with realistic signal.
    
    Args:
        filepath: Output file path
        duration_sec: Duration in seconds
        fs: Sampling rate (Hz)
        signal_type: 'ecg' or 'ppg'
    """
    print(f"Creating test file: {filepath}")
    print(f"  Duration: {duration_sec}s ({duration_sec/60:.1f} min)")
    print(f"  Sampling rate: {fs} Hz")
    
    test_data = []
    
    for i in range(duration_sec):
        # Generate realistic signal
        t = np.arange(fs) / fs
        
        if signal_type == 'ecg':
            # Simplified ECG: dominant frequency ~1.2 Hz (72 bpm)
            signal = (
                np.sin(2 * np.pi * 1.2 * t) * 0.8 +
                np.sin(2 * np.pi * 2.4 * t) * 0.2 +
                np.random.randn(fs) * 0.1
            )
        else:  # ppg
            # Simplified PPG: dominant frequency ~1.0 Hz (60 bpm)
            signal = (
                np.sin(2 * np.pi * 1.0 * t) * 0.6 +
                np.sin(2 * np.pi * 2.0 * t) * 0.3 +
                np.random.randn(fs) * 0.05
            )
        
        test_data.append({
            'timestamp': f'2024-01-01 00:{i//60:02d}:{i%60:02d}',
            'signal': json.dumps(signal.tolist()),
            'sampling_rate': fs
        })
    
    # Save to CSV
    df = pd.DataFrame(test_data)
    df.to_csv(filepath, index=False)
    
    file_size_mb = filepath.stat().st_size / (1024 * 1024)
    print(f"  File size: {file_size_mb:.1f} MB")
    print(f"  Total samples: {duration_sec * fs:,}")
    print("âœ… File created\n")

# Create test files
test_dir = Path('test_data')
test_dir.mkdir(exist_ok=True)

# Small file (1 minute, ~5 MB)
small_file = test_dir / 'ecg_1min_250hz.csv'
# create_oucru_test_file(small_file, duration_sec=60, fs=250, signal_type='ecg')

# Medium file (5 minutes, ~25 MB) -> test large file 24hrs
medium_file = test_dir / 'ecg_24hrs_250hz.csv'
# create_oucru_test_file(medium_file, duration_sec=300*24*12, fs=250, signal_type='ecg')

# PPG file (2 minutes, ~10 MB)
ppg_file = test_dir / 'ppg_2min_100hz.csv'
# create_oucru_test_file(ppg_file, duration_sec=120, fs=100, signal_type='ppg')

## Part 2: Loading Data

### 2.1 Simple Loading (Small Files)

In [None]:
# Method 1: Using convenience function
print("Loading with convenience function...")
start_time = time.time()

signal, metadata = load_oucru_csv(small_file)

load_time = time.time() - start_time

print(f"\nðŸ“Š Loading Results:")
print(f"  Time: {load_time:.2f}s")
print(f"  Samples: {len(signal):,}")
print(f"  Sampling rate: {metadata['sampling_rate']} Hz")
print(f"  Duration: {metadata['duration_seconds']:.1f}s")
print(f"  Format: {metadata['format']}")

# Visualize
plt.figure(figsize=(12, 4))
plt.plot(signal[:1000], linewidth=0.5)
plt.title('First 1000 Samples of ECG Signal')
plt.xlabel('Sample')
plt.ylabel('Amplitude')
plt.grid(True, alpha=0.3)
plt.show()

In [None]:
# Method 2: Using DataLoader class (more control)
print("Loading with DataLoader...")
start_time = time.time()

loader = DataLoader(small_file, format=DataFormat.OUCRU_CSV)
data = loader.load(time_column='timestamp')

load_time = time.time() - start_time

print(f"\nðŸ“Š Loading Results:")
print(f"  Time: {load_time:.2f}s")
print(f"  DataFrame shape: {data.shape}")
print(f"  Columns: {list(data.columns)}")
print(f"  Metadata keys: {list(loader.metadata.keys())}")

# Show first few rows
print("\nFirst 5 rows:")
display(data.head())

### 2.2 Loading Larger Files (Automatic Streaming)

In [None]:
# Load medium file (streaming will be automatic if >100MB)
print("Loading medium file...")
start_time = time.time()

loader = DataLoader(medium_file, format=DataFormat.OUCRU_CSV)
data = loader.load(time_column='timestamp')  # Automatic optimization

load_time = time.time() - start_time

signal = data['signal'].values
fs = loader.sampling_rate

print(f"\nðŸ“Š Loading Results:")
print(f"  Time: {load_time:.2f}s")
print(f"  Format used: {loader.metadata['format']}")
print(f"  Samples: {len(signal):,}")
print(f"  Duration: {len(signal)/fs:.1f}s ({len(signal)/fs/60:.1f} min)")
print(f"  Memory efficient: {'Yes' if 'streaming' in loader.metadata['format'] else 'No'}")

# Plot entire signal
plt.figure(figsize=(14, 5))
plt.plot(signal[::10], linewidth=0.3, alpha=0.7)  # Decimate for visualization
plt.title(f'5-Minute ECG Signal ({len(signal):,} samples @ {fs} Hz)')
plt.xlabel('Sample (decimated 10x)')
plt.ylabel('Amplitude')
plt.grid(True, alpha=0.3)
plt.show()

### 2.3 Custom Chunk Size (Advanced)

In [None]:
# Force streaming with custom chunk size
print("Loading with custom chunk size...")

loader = DataLoader(medium_file, format=DataFormat.OUCRU_CSV)
data = loader.load(time_column='timestamp',chunk_size=5000)  # 5000 rows per chunk

print(f"\nðŸ“Š Streaming Details:")
print(f"  Format: {loader.metadata['format']}")
print(f"  Chunk size: {loader.metadata.get('chunk_size', 'N/A')} rows")
print(f"  Total rows: {loader.metadata['n_rows']}")
print(f"  Chunks processed: ~{loader.metadata['n_rows'] // loader.metadata.get('chunk_size', 1)}")

## Part 3: Quality Screening

### 3.1 Basic Quality Screening

In [None]:
# Load signal for screening
signal, metadata = load_oucru_csv(small_file)
fs = metadata['sampling_rate']

# Create screener with default config
screener = SignalQualityScreener()

# Screen signal
print("Screening signal quality...")
start_time = time.time()

result = screener.sampling_rate = fs
screener.signal_type = 'ecg'
results = screener.screen_signal(signal)
passed = sum(1 for r in results if r.passed_screening)
pass_rate = passed / len(results) if results else 0.0

screening_time = time.time() - start_time

print(f"\nðŸ“Š Quality Screening Results:")
print(f"  Time: {screening_time:.2f}s")
print(f"  Total segments: {len(results)}")
print(f"  Passed: {sum(1 for s in results if s.passed_screening)}")
print(f"  Failed: {sum(1 for s in results if not s.passed_screening)}")
print(f"  Pass rate: {pass_rate:.1%}")

# Visualize quality distribution
quality_scores = [s.quality_metrics.overall_quality for s in results]
snr_values = [s.quality_metrics.snr_db for s in results]

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 4))

# Quality scores
ax1.hist(quality_scores, bins=20, edgecolor='black', alpha=0.7)
ax1.axvline(0.7, color='r', linestyle='--', label='Threshold (0.7)')
ax1.set_xlabel('Quality Score')
ax1.set_ylabel('Count')
ax1.set_title('Distribution of Quality Scores')
ax1.legend()
ax1.grid(True, alpha=0.3)

# SNR values
ax2.hist(snr_values, bins=20, edgecolor='black', alpha=0.7, color='green')
ax2.axvline(10.0, color='r', linestyle='--', label='Threshold (10 dB)')
ax2.set_xlabel('SNR (dB)')
ax2.set_ylabel('Count')
ax2.set_title('Distribution of SNR Values')
ax2.legend()
ax2.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

### 3.2 Parallel Quality Screening

In [None]:
# Compare sequential vs parallel screening

# Sequential
config_seq = QualityScreeningConfig(max_workers=1)
screener_seq = SignalQualityScreener(config_seq)

print("Sequential screening...")
start = time.time()
result_seq = screener_seq.sampling_rate = fs
screener_seq.signal_type = 'ecg'
results = screener_seq.screen_signal(signal)
passed = sum(1 for r in results if r.passed_screening)
pass_rate = passed / len(results) if results else 0.0
time_seq = time.time() - start

# Parallel (4 workers)
config_par = QualityScreeningConfig(max_workers=4)
screener_par = SignalQualityScreener(config_par)

print("Parallel screening (4 workers)...")
start = time.time()
result_par = screener_par.sampling_rate = fs
screener_par.signal_type = 'ecg'
results = screener_par.screen_signal(signal)
passed = sum(1 for r in results if r.passed_screening)
pass_rate = passed / len(results) if results else 0.0
time_par = time.time() - start

# Compare
print(f"\nðŸ“Š Performance Comparison:")
print(f"  Sequential: {time_seq:.2f}s")
print(f"  Parallel (4 workers): {time_par:.2f}s")
print(f"  Speedup: {time_seq/time_par:.2f}x")
# print(f"  Pass rates match: {pass_rate_seq == pass_rate_par}")

### 3.3 Custom Quality Configuration

In [None]:
# Create custom configurationcustom_config = QualityScreeningConfig(    segment_duration=15.0,  # Longer segments    overlap=0.25,  # Less overlap    snr_threshold=8.0,  # Less strict SNR    min_quality_score=0.6,  # Less strict quality    max_workers=4,    conservative=False)screener_custom = SignalQualityScreener(custom_config)result_custom = screener_custom.sampling_rate = fsscreener_custom.signal_type = 'ecg'results = screener_custom.screen_signal(signal)passed = sum(1 for r in results if r.passed_screening)pass_rate = passed / len(results) if results else 0.0# Compare with defaultprint("Configuration Comparison:")print(f"\nDefault Config:")print(f"  Segments: {len(results)}")print(f"  Pass rate: {pass_rate:.1%}")print(f"\nCustom Config (less strict):")print(f"  Segments: {len(result_custom.segment_results)}")print(f"  Pass rate: {pass_rate_custom:.1%}")print(f"\n  â†’ {len(result_custom.segment_results) - len(results)} fewer segments (longer duration)")print(f"  â†’ {pass_rate_custom - pass_rate:.1%} higher pass rate (less strict)")

## Part 4: Signal Processing Pipeline

### 4.1 Basic Processing

In [None]:
# Create pipeline
pipeline = StandardProcessingPipeline()

# Process signal
print("Processing signal through 8-stage pipeline...")
start_time = time.time()

processing_result = pipeline.process(signal, fs, 'ecg')

processing_time = time.time() - start_time

print(f"\nðŸ“Š Processing Results:")
print(f"  Time: {processing_time:.2f}s")
print(f"  Success: {processing_result.success}")

if processing_result.success:
    print(f"  Selected path: {processing_result.metadata.get('selected_path')}")
    print(f"  Processing quality: {processing_result.metadata.get('processing_quality')}")
    print(f"  Stages completed: {len(processing_result.metadata.get('stages_completed', []))}")
    
    # Compare input vs output
    processed_signal = processing_result.signal
    
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 8))
    
    # Original
    ax1.plot(signal[:5000], linewidth=0.5, alpha=0.7)
    ax1.set_title('Original Signal (first 5000 samples)')
    ax1.set_ylabel('Amplitude')
    ax1.grid(True, alpha=0.3)
    
    # Processed
    ax2.plot(processed_signal[:5000], linewidth=0.5, alpha=0.7, color='orange')
    ax2.set_title('Processed Signal (first 5000 samples)')
    ax2.set_xlabel('Sample')
    ax2.set_ylabel('Amplitude')
    ax2.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
else:
    print(f"  Error: {processing_result.error_message}")

### 4.2 Optimized Pipeline (for Large Files)

In [None]:
# Compare Standard vs Optimized pipeline

# Standard
pipeline_std = StandardProcessingPipeline()
start = time.time()
result_std = pipeline_std.process(signal, fs, 'ecg')
time_std = time.time() - start

# Optimized
pipeline_opt = OptimizedStandardProcessingPipeline()
start = time.time()
result_opt = pipeline_opt.process(signal, fs, 'ecg')
time_opt = time.time() - start

# Compare
print("Pipeline Comparison:")
print(f"\nStandard Pipeline:")
print(f"  Time: {time_std:.2f}s")
print(f"  Success: {result_std.success}")

print(f"\nOptimized Pipeline:")
print(f"  Time: {time_opt:.2f}s")
print(f"  Success: {result_opt.success}")
print(f"  Speedup: {time_std/time_opt:.2f}x faster")

print(f"\nðŸ’¡ Recommendation: Use Optimized Pipeline for files >5 minutes")

## Part 5: Complete End-to-End Workflow

In [None]:
def complete_workflow(filepath, signal_type='ecg'):
    """
    Complete end-to-end workflow:
    1. Load data
    2. Screen quality
    3. Process signal
    4. Return results
    """
    print(f"\n{'='*60}")
    print(f"Processing: {filepath.name}")
    print(f"{'='*60}")
    
    # 1. Load
    print("\n[1/3] Loading data...")
    start = time.time()
    
    loader = DataLoader(filepath, format=DataFormat.OUCRU_CSV)
    data = loader.load()
    signal = data['signal'].values
    fs = loader.sampling_rate
    
    load_time = time.time() - start
    print(f"  âœ… Loaded {len(signal):,} samples @ {fs} Hz in {load_time:.2f}s")
    print(f"  Format: {loader.metadata['format']}")
    
    # 2. Screen Quality
    print("\n[2/3] Screening quality...")
    start = time.time()
    
    config = QualityScreeningConfig(max_workers=4)
    screener = SignalQualityScreener(config)
    quality_result = screener.sampling_rate = fs
screener.signal_type = signal_type
results = screener.screen_signal(signal)
passed = sum(1 for r in results if r.passed_screening)
pass_rate = passed / len(results) if results else 0.0
    
    screen_time = time.time() - start
    print(f"  âœ… Screened {len(quality_results)} segments in {screen_time:.2f}s")
    print(f"  Pass rate: {quality_pass_rate:.1%}")
    
    # 3. Process
    print("\n[3/3] Processing signal...")
    start = time.time()
    
    pipeline = OptimizedStandardProcessingPipeline()
    processing_result = pipeline.process(signal, fs, signal_type)
    
    process_time = time.time() - start
    print(f"  âœ… Processed in {process_time:.2f}s")
    print(f"  Success: {processing_result.success}")
    
    if processing_result.success:
        print(f"  Path: {processing_result.metadata.get('selected_path')}")
        print(f"  Quality: {processing_result.metadata.get('processing_quality')}")
    
    # Summary
    total_time = load_time + screen_time + process_time
    print(f"\n{'â”€'*60}")
    print(f"Total time: {total_time:.2f}s")
    print(f"  Load:    {load_time:.2f}s ({load_time/total_time*100:.1f}%)")
    print(f"  Screen:  {screen_time:.2f}s ({screen_time/total_time*100:.1f}%)")
    print(f"  Process: {process_time:.2f}s ({process_time/total_time*100:.1f}%)")
    print(f"{'â”€'*60}\n")
    
    return {
        'signal': processing_result.signal if processing_result.success else signal,
        'quality_result': quality_result,
        'processing_result': processing_result,
        'timing': {
            'load': load_time,
            'screen': screen_time,
            'process': process_time,
            'total': total_time
        }
    }

# Run workflow on medium file
result = complete_workflow(medium_file, signal_type='ecg')

## Part 6: Performance Monitoring

In [None]:
import psutil
import os

class PerformanceMonitor:
    """Simple performance monitor."""
    
    def __init__(self):
        self.process = psutil.Process(os.getpid())
        self.checkpoints = []
    
    def checkpoint(self, label):
        mem_mb = self.process.memory_info().rss / (1024**2)
        self.checkpoints.append((label, mem_mb))
        print(f"[{label}] Memory: {mem_mb:.1f} MB")
    
    def plot(self):
        labels = [c[0] for c in self.checkpoints]
        memory = [c[1] for c in self.checkpoints]
        
        plt.figure(figsize=(12, 5))
        plt.plot(memory, marker='o', markersize=8, linewidth=2)
        plt.xticks(range(len(labels)), labels, rotation=45, ha='right')
        plt.ylabel('Memory (MB)')
        plt.title('Memory Usage Throughout Workflow')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()

# Monitor memory during workflow
monitor = PerformanceMonitor()

monitor.checkpoint('Start')

# Load
loader = DataLoader(medium_file, format=DataFormat.OUCRU_CSV)
data = loader.load()
monitor.checkpoint('After Load')

signal = data['signal'].values
fs = loader.sampling_rate
monitor.checkpoint('After Extract')

# Screen
config = QualityScreeningConfig(max_workers=4)
screener = SignalQualityScreener(config)
quality_result = screener.sampling_rate = fs
screener.signal_type = 'ecg'
results = screener.screen_signal(signal)
monitor.checkpoint('After Screen')

# Process
pipeline = OptimizedStandardProcessingPipeline()
result = pipeline.process(signal, fs, 'ecg')
monitor.checkpoint('After Process')

# Cleanup
del data, signal
import gc
gc.collect()
monitor.checkpoint('After Cleanup')

# Plot
monitor.plot()

## Summary

This notebook demonstrated:

1. âœ… **Data Loading**: Standard and streaming approaches
2. âœ… **Quality Screening**: Sequential and parallel processing
3. âœ… **Signal Processing**: 8-stage pipeline with multi-path processing
4. âœ… **Performance**: Monitoring and optimization
5. âœ… **Complete Workflow**: End-to-end processing pipeline

### Key Takeaways

- **Automatic optimization**: vitalDSP selects best loading strategy automatically
- **Parallel processing**: Use `max_workers=4` for 3-4x speedup
- **Memory efficiency**: Streaming reduces memory by 90% for large files
- **Flexible configuration**: Customize quality thresholds and processing parameters

### Next Steps

- See `LARGE_FILE_PROCESSING_GUIDE.md` for detailed documentation
- Check `PHASE1_3_API_REFERENCE.md` for complete API reference
- Review `PERFORMANCE_TUNING_GUIDE.md` for optimization strategies

---

*Tutorial completed successfully!*