# SlipstreamLoader Benchmarks

This notebook benchmarks the high-performance loading pipeline over **full epochs**:

1. **Raw I/O**: Memory-mapped batch loading with `OptimizedCache`
2. **Comparison**: OptimizedCache vs StreamingDataLoader (the key speedup!)
3. **CPU Decode**: Parallel JPEG decode with TurboJPEG
4. **Full Pipeline**: `SlipstreamLoader` with decode + transforms
5. **Summary**: All results in one table

Each stage is benchmarked for multiple complete epochs (after warmup) to ensure consistent measurements.
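
Every timed epoch covers the same number of samples. That gives two ways to summarize the timed epochs:

- Take the arithmetic mean of the per-epoch rates, which is what the cells below do.
- Divide total samples by total elapsed time. This equals the harmonic mean of the rates and weights a lucky fast epoch less.

The gap is small when epochs are stable. A minimal sketch with made-up timings (`aggregate_throughput` is an illustrative helper, not part of slipstream):

```python
def aggregate_throughput(epochs):
    """Summarize (num_samples, elapsed_sec) pairs from timed epochs."""
    rates = [n / t for n, t in epochs]
    mean_of_rates = sum(rates) / len(rates)                          # what the cells below report
    pooled = sum(n for n, _ in epochs) / sum(t for _, t in epochs)   # total samples / total time
    return mean_of_rates, pooled

# Hypothetical timings: three epochs of 50,000 samples each
epochs = [(50_000, 0.10), (50_000, 0.125), (50_000, 0.20)]
mean_of_rates, pooled = aggregate_throughput(epochs)
print(f"mean of rates: {mean_of_rates:,.0f}/s, pooled: {pooled:,.0f}/s")
```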

## Performance Targets (from CLAUDE.md)

| Metric | FFCV | SlipstreamLoader | Notes |
|--------|------|------------------|-------|
| Raw I/O | ~350k img/s | **480k+ img/s** | +37% faster |
| GPU Decode + RRC | ~10k img/s | **10.1k img/s** | Equivalent |
| CPU Decode + RRC | N/A | ~5.7k img/s | TurboJPEG |
| Cold Start | baseline | **20% faster** | Parallel chunk downloads |

In [None]:
# Test dataset path (ImageNet validation, streaming format)
LITDATA_VAL_PATH = "s3://visionlab-datasets/imagenet1k/pre-processed/s256-l512-jpgbytes-q100-streaming/val/"

# Benchmark parameters
BATCH_SIZE = 256
NUM_EPOCHS = 3      # Number of full epochs to benchmark (results averaged)
NUM_WARMUP = 1      # Warmup epochs (not timed, important for page cache)

In [None]:
import time
import numpy as np
import torch
from tqdm import tqdm
from slipstream import SlipstreamDataset, list_collate_fn  

# First, ensure the dataset is cached locally
print("Loading dataset to ensure cache is populated...")
dataset = SlipstreamDataset(
    remote_dir=LITDATA_VAL_PATH,
    decode_images=False,
)
dataset

## 1. Benchmark Raw I/O (OptimizedCache)

This measures pure I/O throughput without decoding. The OptimizedCache uses:
- Memory-mapped contiguous file format for all fields
- Numba JIT-compiled batch loading with `nogil=True`
- Pre-allocated buffers to avoid per-batch allocation
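
The three ideas above can be sketched with plain NumPy:

- All samples of a variable-size field are concatenated into one flat file.
- An offsets array records where each sample starts.
- Each batch is copied into a buffer that is allocated once and reused.

This is an illustrative assumption, not OptimizedCache's actual on-disk format. `write_field`, `load_batch_into`, and the file layout are hypothetical. The real implementation is described as a Numba `nogil` kernel, whereas this sketch uses a plain Python loop.

```python
import os
import tempfile
import numpy as np

def write_field(path, samples):
    """Concatenate variable-length byte samples into one flat file; return offsets."""
    sizes = np.array([len(s) for s in samples], dtype=np.int64)
    offsets = np.zeros(len(samples) + 1, dtype=np.int64)
    np.cumsum(sizes, out=offsets[1:])  # offsets[i] = start of sample i
    with open(path, "wb") as f:
        for s in samples:
            f.write(s)
    return offsets

def load_batch_into(flat, offsets, indices, out):
    """Copy each requested sample into row `row` of a pre-allocated buffer."""
    sizes = offsets[indices + 1] - offsets[indices]
    for row, (idx, size) in enumerate(zip(indices, sizes)):
        start = offsets[idx]                          # O(1) lookup via the offsets array
        out[row, :size] = flat[start:start + size]
    return sizes

samples = [b"abc", b"defgh", b"ij", b"klmnop"]
path = os.path.join(tempfile.mkdtemp(), "image.bin")
offsets = write_field(path, samples)
flat = np.memmap(path, dtype=np.uint8, mode="r")  # pages are read lazily by the OS

max_size = int(np.diff(offsets).max())
buf = np.zeros((2, max_size), dtype=np.uint8)     # allocated once, reused for every batch
sizes = load_batch_into(flat, offsets, np.array([3, 1]), buf)
print([bytes(buf[i, :n]) for i, n in enumerate(sizes)])
```

Random indices such as `[3, 1]` cost the same as sequential ones, because each lookup is a single offsets read plus a slice of the memory map.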

In [None]:
from slipstream.cache import OptimizedCache

# Build/load optimized cache
print("Building/loading optimized cache...")
if OptimizedCache.exists(dataset.cache_path):
    cache = OptimizedCache.load(dataset.cache_path)
else:
    cache = OptimizedCache.build(dataset)
print(f"Cache ready: {len(cache):,} samples")
print(f"Fields: {list(cache.fields.keys())}")
cache

In [None]:
def benchmark_raw_io_epoch(cache, batch_size, field='image'):
    """Benchmark raw I/O throughput for one full epoch."""
    num_samples = len(cache)
    num_batches = (num_samples + batch_size - 1) // batch_size
    indices = np.arange(num_samples, dtype=np.int64)
    
    total_samples = 0
    start = time.perf_counter()
    
    for i in tqdm(range(num_batches), leave=False):
        batch_start = i * batch_size
        batch_end = min(batch_start + batch_size, num_samples)
        batch_indices = indices[batch_start:batch_end]
        
        # Load only the requested field for this batch
        batch_data = cache.load_batch(batch_indices, fields=[field])
        total_samples += len(batch_indices)
    
    elapsed = time.perf_counter() - start
    samples_per_sec = total_samples / elapsed
    
    return {
        'samples_per_sec': samples_per_sec,
        'elapsed_sec': elapsed,
        'total_samples': total_samples,
    }

def benchmark_raw_io(cache, batch_size, num_epochs, num_warmup, field='image'):
    """Benchmark raw I/O over multiple epochs."""
    # Warmup epochs (show results)
    print(f"  Warmup ({num_warmup} epoch(s)):")
    for i in range(num_warmup):
        result = benchmark_raw_io_epoch(cache, batch_size, field)
        print(f"    Warmup {i + 1}: {result['samples_per_sec']:,.0f} samples/sec ({result['elapsed_sec']:.2f}s)")
    
    # Timed epochs
    results = []
    for epoch in range(num_epochs):
        result = benchmark_raw_io_epoch(cache, batch_size, field)
        results.append(result)
        print(f"  Epoch {epoch + 1}: {result['samples_per_sec']:,.0f} samples/sec ({result['elapsed_sec']:.2f}s)")
    
    # Average results
    avg_samples_per_sec = np.mean([r['samples_per_sec'] for r in results])
    total_samples = results[0]['total_samples']
    
    return {
        'samples_per_sec': avg_samples_per_sec,
        'total_samples': total_samples,
        'num_epochs': num_epochs,
        'per_epoch_results': results,
    }

print("Benchmarking raw I/O (full epochs)...\n")

# Image field (variable-size, main benchmark)
print("Image field (cache.load_batch):")
result_io = benchmark_raw_io(cache, BATCH_SIZE, NUM_EPOCHS, NUM_WARMUP, field='image')
print(f"  Average: {result_io['samples_per_sec']:,.0f} samples/sec\n")

# Label field (fixed-size, should be even faster)
print("Label field (cache.load_batch):")
result_labels = benchmark_raw_io(cache, BATCH_SIZE, NUM_EPOCHS, NUM_WARMUP, field='label')
print(f"  Average: {result_labels['samples_per_sec']:,.0f} samples/sec")

## 2. Compare: OptimizedCache vs StreamingDataLoader (Raw I/O)

This is the key comparison showing why OptimizedCache matters:
- **StreamingDataLoader**: Standard LitData iteration (sequential chunk reads)
- **OptimizedCache**: Memory-mapped batch loading (O(1) random access)

After warmup (epoch 1), the OptimizedCache benefits from the OS page cache, making subsequent epochs dramatically faster.

In [None]:
from litdata import StreamingDataLoader
from slipstream import SlipstreamLoader

def benchmark_loader_epoch_raw(loader):
    """Benchmark loader for one full epoch (raw iteration)."""
    total_samples = 0
    start = time.perf_counter()
    
    for batch in tqdm(loader, leave=False):
        # Count samples only; batches contain raw, undecoded data
        if isinstance(batch, dict):
            img_data = batch.get('image')
            if isinstance(img_data, dict):
                # Raw dict with 'data', 'sizes', etc.
                total_samples += len(img_data['data'])
            elif hasattr(img_data, '__len__'):
                total_samples += len(img_data)
            else:
                total_samples += batch.get('label', torch.tensor([0])).shape[0]
        else:
            total_samples += len(batch[0]) if isinstance(batch, (list, tuple)) else 1
    
    elapsed = time.perf_counter() - start
    samples_per_sec = total_samples / elapsed if elapsed > 0 else 0
    
    return {
        'samples_per_sec': samples_per_sec,
        'elapsed_sec': elapsed,
        'total_samples': total_samples,
    }

def benchmark_dataloader(loader, num_epochs, num_warmup, name="Loader"):
    """Benchmark any dataloader over multiple epochs."""
    # Warmup epochs (show results)
    print(f"  Warmup ({num_warmup} epoch(s)):")
    for i in range(num_warmup):
        result = benchmark_loader_epoch_raw(loader)
        print(f"    Warmup {i + 1}: {result['samples_per_sec']:,.0f} samples/sec ({result['elapsed_sec']:.2f}s)")
    
    # Timed epochs
    results = []
    for epoch in range(num_epochs):
        result = benchmark_loader_epoch_raw(loader)
        results.append(result)
        print(f"  Epoch {epoch + 1}: {result['samples_per_sec']:,.0f} samples/sec ({result['elapsed_sec']:.2f}s)")
    
    avg_samples_per_sec = np.mean([r['samples_per_sec'] for r in results])
    
    return {
        'samples_per_sec': avg_samples_per_sec,
        'total_samples': results[0]['total_samples'],
        'per_epoch_results': results,
    }

print("Benchmarking DataLoaders HEAD-TO-HEAD (raw I/O, no decode)...\n")

# 1. StreamingDataLoader (LitData)
streaming_loader = StreamingDataLoader(
    dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=8,
    drop_last=False,
    collate_fn=list_collate_fn,  # Handle variable-sized image bytes
)

print("StreamingDataLoader (8 workers):")
result_streaming = benchmark_dataloader(streaming_loader, NUM_EPOCHS, NUM_WARMUP)
print(f"  Average: {result_streaming['samples_per_sec']:,.0f} samples/sec\n")

# 2. SlipstreamLoader with NO pipelines (raw bytes)
raw_loader = SlipstreamLoader(
    dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    drop_last=False,
    # No pipelines = raw data!
)

print("SlipstreamLoader (no pipelines, raw I/O):")
result_slipstream_raw = benchmark_dataloader(raw_loader, NUM_EPOCHS, NUM_WARMUP)
print(f"  Average: {result_slipstream_raw['samples_per_sec']:,.0f} samples/sec\n")

# Calculate speedup
speedup = result_slipstream_raw['samples_per_sec'] / result_streaming['samples_per_sec'] if result_streaming['samples_per_sec'] > 0 else float('inf')
print("--- Result ---")
print(f"SlipstreamLoader is {speedup:.1f}x faster than StreamingDataLoader for raw I/O")
print("The speedup comes from SlipstreamLoader's memory-mapped OptimizedCache, served from the OS page cache.")

## 3. Benchmark CPU Decode (TurboJPEG)

This measures decode throughput using TurboJPEG with:
- Thread-local decoder instances (no lock contention)
- DCT-space cropping (lossless, ~2x faster than post-decode crop)
- ThreadPoolExecutor parallelism
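
The thread-local decoder pattern can be sketched with the standard library alone. `zlib` stands in for TurboJPEG so the example runs without native JPEG libraries, and `StandInDecoder` is a hypothetical class.

Each worker thread of a persistent pool lazily creates its own decoder through `threading.local()`. Decode calls therefore share no decoder state and need no lock. The number of decoders is bounded by the number of worker threads, not the number of images.

```python
import threading
import zlib
from concurrent.futures import ThreadPoolExecutor

class StandInDecoder:
    """Hypothetical stand-in for a per-thread TurboJPEG handle (zlib instead of JPEG)."""
    instances = 0
    _count_lock = threading.Lock()

    def __init__(self):
        with StandInDecoder._count_lock:  # guards only the instance counter
            StandInDecoder.instances += 1

    def decode(self, blob):
        return zlib.decompress(blob)

_local = threading.local()

def get_decoder():
    """Return the calling thread's decoder, creating it on first use."""
    decoder = getattr(_local, "decoder", None)
    if decoder is None:
        decoder = _local.decoder = StandInDecoder()
    return decoder

def decode_batch(pool, blobs):
    """Decode blobs in parallel; each worker thread reuses its own decoder."""
    return list(pool.map(lambda blob: get_decoder().decode(blob), blobs))

pool = ThreadPoolExecutor(max_workers=4)  # persistent pool, reused across batches
for _ in range(3):
    blobs = [zlib.compress(f"sample-{i}".encode()) for i in range(64)]
    images = decode_batch(pool, blobs)
pool.shutdown()
print(images[0], images[-1], StandInDecoder.instances)
```

Keeping the pool alive across batches matters here. A fresh `ThreadPoolExecutor` per batch would spawn new threads, and therefore new thread-local decoders, every time.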

In [None]:
from slipstream.decoders import CPUDecoder

# Initialize decoder
cpu_decoder = CPUDecoder(num_workers=8)
print(f"CPU decoder: {cpu_decoder}")

In [None]:
def benchmark_cpu_decode_epoch(cache, decoder, batch_size, with_crop=False):
    """Benchmark CPU decode throughput for one full epoch."""
    num_samples = len(cache)
    num_batches = (num_samples + batch_size - 1) // batch_size
    indices = np.arange(num_samples, dtype=np.int64)
    
    total_samples = 0
    start = time.perf_counter()
    
    for i in tqdm(range(num_batches), leave=False):
        batch_start = i * batch_size
        batch_end = min(batch_start + batch_size, num_samples)
        batch_indices = indices[batch_start:batch_end]
        
        # Load raw data
        batch_data = cache.load_batch(batch_indices, fields=['image'])
        data = batch_data['image']['data']
        sizes = batch_data['image']['sizes']
        heights = batch_data['image']['heights']
        widths = batch_data['image']['widths']
        
        # Decode
        if with_crop:
            images = decoder.decode_batch_random_crop(
                data, sizes, heights, widths,
                target_size=224,
                scale=(0.08, 1.0),
            )
        else:
            images = decoder.decode_batch(data, sizes)
        
        total_samples += len(batch_indices)
    
    elapsed = time.perf_counter() - start
    samples_per_sec = total_samples / elapsed
    
    return {
        'samples_per_sec': samples_per_sec,
        'elapsed_sec': elapsed,
        'total_samples': total_samples,
    }

def benchmark_cpu_decode(cache, decoder, batch_size, num_epochs, num_warmup, with_crop=False):
    """Benchmark CPU decode over multiple epochs."""
    crop_str = "+ RRC" if with_crop else "(no crop)"
    
    # Warmup (show results)
    print(f"  Warmup ({num_warmup} epoch(s)):")
    for i in range(num_warmup):
        result = benchmark_cpu_decode_epoch(cache, decoder, batch_size, with_crop)
        print(f"    Warmup {i + 1}: {result['samples_per_sec']:,.0f} samples/sec ({result['elapsed_sec']:.2f}s)")
    
    # Timed epochs
    results = []
    for epoch in range(num_epochs):
        result = benchmark_cpu_decode_epoch(cache, decoder, batch_size, with_crop)
        results.append(result)
        print(f"  Epoch {epoch + 1}: {result['samples_per_sec']:,.0f} samples/sec ({result['elapsed_sec']:.2f}s)")
    
    avg_samples_per_sec = np.mean([r['samples_per_sec'] for r in results])
    
    return {
        'samples_per_sec': avg_samples_per_sec,
        'total_samples': results[0]['total_samples'],
        'with_crop': with_crop,
    }

print("Benchmarking CPU decode (full epochs)...\n")

# Simple decode (no crop)
print("Simple decode (no crop):")
result_decode = benchmark_cpu_decode(cache, cpu_decoder, BATCH_SIZE, NUM_EPOCHS, NUM_WARMUP, with_crop=False)
print(f"  Average: {result_decode['samples_per_sec']:,.0f} samples/sec\n")

# Decode with RandomResizedCrop
print("Decode + RandomResizedCrop:")
result_rrc = benchmark_cpu_decode(cache, cpu_decoder, BATCH_SIZE, NUM_EPOCHS, NUM_WARMUP, with_crop=True)
print(f"  Average: {result_rrc['samples_per_sec']:,.0f} samples/sec")

## 4. Benchmark SlipstreamLoader (Full Pipeline)

This benchmarks the complete training pipeline:
- Async I/O with prefetching
- CPU decode with RandomResizedCrop
- ImageNet normalization
- Output as batched tensors (decode and crop run on the CPU in this benchmark)
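
RandomResizedCrop first picks a crop box, then resizes it to the target size (224 here). Below is a sketch of the widely used torchvision-style sampling rule:

- Make up to 10 attempts, each drawing an area fraction from `scale` and a log-uniform aspect ratio from `ratio`.
- Accept the first box that fits inside the image.
- Otherwise, fall back to a center crop clamped to the allowed aspect range.

Whether SlipstreamLoader uses exactly this rule, including the `ratio=(3/4, 4/3)` default and the 10-attempt limit, is an assumption. `sample_rrc_box` is a hypothetical name.

```python
import math
import random

def sample_rrc_box(height, width, scale=(0.08, 1.0), ratio=(3 / 4, 4 / 3), rng=random):
    """Return (top, left, h, w) of a random crop box, torchvision-style."""
    area = height * width
    log_lo, log_hi = math.log(ratio[0]), math.log(ratio[1])
    for _ in range(10):
        target_area = area * rng.uniform(scale[0], scale[1])
        aspect = math.exp(rng.uniform(log_lo, log_hi))  # log-uniform aspect ratio
        w = int(round(math.sqrt(target_area * aspect)))
        h = int(round(math.sqrt(target_area / aspect)))
        if 0 < w <= width and 0 < h <= height:
            top = rng.randint(0, height - h)   # randint is inclusive on both ends
            left = rng.randint(0, width - w)
            return top, left, h, w
    # Fallback: center crop, clamped to the allowed aspect-ratio range
    in_ratio = width / height
    if in_ratio < min(ratio):
        w, h = width, int(round(width / min(ratio)))
    elif in_ratio > max(ratio):
        h, w = height, int(round(height * max(ratio)))
    else:
        h, w = height, width
    return (height - h) // 2, (width - w) // 2, h, w

rng = random.Random(0)
print(sample_rrc_box(256, 341, rng=rng))  # a box inside a 256x341 image
```

Sampling the aspect ratio in log space makes 3:4 and 4:3 equally likely, rather than biasing toward wide crops.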

In [None]:
from slipstream import SlipstreamLoader, RandomResizedCrop, Normalize

# Create loader with pipelines - this auto-uses the optimized cache we already built
loader = SlipstreamLoader(
    dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,  # Disable shuffle for reproducible benchmarks
    pipelines={
        'image': [
            RandomResizedCrop(224, scale=(0.08, 1.0), device='cpu', num_workers=8),
            Normalize(),
        ],
    },
    exclude_fields=['path'],  # Skip loading file paths (not needed for training)
)
print(loader)

In [None]:
def benchmark_loader_epoch(loader):
    """Benchmark full pipeline throughput for one epoch."""
    total_samples = 0
    
    start = time.perf_counter()
    
    for batch in tqdm(loader, leave=False):
        total_samples += batch['image'].shape[0]
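        # Simulate minimal work (forces the batch to be materialized)
        _ = batch['image'].sum()
    
    # CUDA kernels run asynchronously; wait for them before stopping the clock
    if torch.cuda.is_available() and torch.cuda.is_initialized():
        torch.cuda.synchronize()
    elapsed = time.perf_counter() - start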
    samples_per_sec = total_samples / elapsed
    
    return {
        'samples_per_sec': samples_per_sec,
        'elapsed_sec': elapsed,
        'total_samples': total_samples,
    }

def benchmark_loader_epochs(loader, num_epochs, num_warmup, name="Loader"):
    """Benchmark loader over multiple epochs."""
    # Warmup (show results)
    print(f"  Warmup ({num_warmup} epoch(s)):")
    for i in range(num_warmup):
        result = benchmark_loader_epoch(loader)
        print(f"    Warmup {i + 1}: {result['samples_per_sec']:,.0f} samples/sec ({result['elapsed_sec']:.2f}s)")
    
    # Timed epochs
    results = []
    for epoch in range(num_epochs):
        result = benchmark_loader_epoch(loader)
        results.append(result)
        print(f"  Epoch {epoch + 1}: {result['samples_per_sec']:,.0f} samples/sec ({result['elapsed_sec']:.2f}s)")
    
    avg_samples_per_sec = np.mean([r['samples_per_sec'] for r in results])
    
    return {
        'samples_per_sec': avg_samples_per_sec,
        'total_samples': results[0]['total_samples'],
    }

print("Benchmarking SlipstreamLoader (full epochs)...\n")

print("SlipstreamLoader (train mode, RandomResizedCrop):")
result_loader = benchmark_loader_epochs(loader, NUM_EPOCHS, NUM_WARMUP)
print(f"  Average: {result_loader['samples_per_sec']:,.0f} samples/sec")

In [None]:
from slipstream import CenterCrop

# Also test validation mode with CenterCrop pipeline
val_loader = SlipstreamLoader(
    dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    pipelines={
        'image': [
            CenterCrop(224, device='cpu', num_workers=8),
            Normalize(),
        ],
    },
    exclude_fields=['path'],
)

print("\nSlipstreamLoader (val mode, CenterCrop):")
result_val = benchmark_loader_epochs(val_loader, NUM_EPOCHS, NUM_WARMUP)
print(f"  Average: {result_val['samples_per_sec']:,.0f} samples/sec")

## 5. Summary

Results from this benchmark run (full epoch iterations):

In [None]:
import pandas as pd

# Compile results
results = [
    {'Stage': 'StreamingDataLoader (raw)', 'Samples/sec': result_streaming['samples_per_sec']},
    {'Stage': 'SlipstreamLoader (raw)', 'Samples/sec': result_slipstream_raw['samples_per_sec']},
    {'Stage': 'OptimizedCache direct', 'Samples/sec': result_io['samples_per_sec']},
    {'Stage': 'CPU Decode (no crop)', 'Samples/sec': result_decode['samples_per_sec']},
    {'Stage': 'CPU Decode + RRC', 'Samples/sec': result_rrc['samples_per_sec']},
    {'Stage': 'Full Pipeline (train)', 'Samples/sec': result_loader['samples_per_sec']},
    {'Stage': 'Full Pipeline (val)', 'Samples/sec': result_val['samples_per_sec']},
]

df = pd.DataFrame(results)

# Epoch time for one full pass over the dataset (50,000 samples for ImageNet val)
num_samples = result_io['total_samples']
df['Epoch Time (s)'] = num_samples / df['Samples/sec']
df['Samples/sec'] = df['Samples/sec'].apply(lambda x: f"{x:,.0f}")
df['Epoch Time (s)'] = df['Epoch Time (s)'].apply(lambda x: f"{x:.2f}")

print(f"Results (averaged over {NUM_EPOCHS} epochs, {num_samples:,} samples each):\n")
print(df.to_string(index=False))

# Highlight the key comparison
print("\n--- Key Finding ---")
print(f"SlipstreamLoader is {speedup:.1f}x faster than StreamingDataLoader for raw I/O")
print("This speedup comes from memory-mapped access served from the OS page cache after warmup.")
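
The epoch-time column in the summary table is sample count divided by throughput. Using targets from the table at the top of this notebook:

- At ~5.7k samples/sec (CPU decode + RRC), one pass over the 50,000-image ImageNet validation set takes roughly 8.8 s.
- At 480k samples/sec (raw I/O), it takes about 0.1 s.

`epoch_time_sec` is an illustrative helper:

```python
def epoch_time_sec(num_samples, samples_per_sec):
    """Seconds needed for one full pass over num_samples at a given throughput."""
    return num_samples / samples_per_sec

print(f"{epoch_time_sec(50_000, 5_700):.1f} s")    # 8.8 s
print(f"{epoch_time_sec(50_000, 480_000):.3f} s")  # 0.104 s
```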

In [None]:
# Cleanup
loader.shutdown()
val_loader.shutdown()
raw_loader.shutdown()
cpu_decoder.shutdown()
print("Done!")

## Notes

**New Pipeline-Based API:**
```python
from slipstream import SlipstreamLoader, RandomResizedCrop, CenterCrop, Normalize

# Training with pipelines
loader = SlipstreamLoader(
    dataset,
    batch_size=256,
    pipelines={
        'image': [
            RandomResizedCrop(224, device='cuda'),
            Normalize(),
        ],
    },
)

# Raw I/O (no pipelines, for benchmarking)
raw_loader = SlipstreamLoader(dataset, batch_size=256)
```

**Performance Tips:**
- First epoch is slower due to cold cache (chunks downloaded from S3)
- Subsequent epochs are served from the OS page cache (RAM), so raw I/O is no longer the bottleneck
- Use `device='cuda'` in pipelines for GPU acceleration
- Set `num_workers` in RandomResizedCrop/CenterCrop for CPU parallelism
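
To re-measure a cold first epoch without rebooting, one option on Linux is to ask the kernel to drop a file's cached pages with `os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_DONTNEED)`. Some caveats:

- The call is advisory: dirty pages are not dropped, and the kernel may ignore the hint.
- `os.posix_fadvise` is not available on macOS or Windows.
- This covers only the local page cache. Which files to evict depends on where SlipstreamLoader stores its cache.

The helpers below are hypothetical:

```python
import os
import tempfile

def evict_from_page_cache(path):
    """Advise the kernel to drop cached pages for `path`. Returns False if unsupported."""
    if not hasattr(os, "posix_fadvise"):
        return False  # e.g. macOS: the os module has no posix_fadvise
    fd = os.open(path, os.O_RDONLY)
    try:
        os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_DONTNEED)  # len 0 means "to end of file"
    finally:
        os.close(fd)
    return True

def evict_tree(root):
    """Apply evict_from_page_cache to every regular file under root; return the count."""
    count = 0
    for dirpath, _, filenames in os.walk(root):
        for name in filenames:
            if evict_from_page_cache(os.path.join(dirpath, name)):
                count += 1
    return count

# Demo on a throwaway directory
demo_dir = tempfile.mkdtemp()
with open(os.path.join(demo_dir, "chunk-0.bin"), "wb") as f:
    f.write(b"\0" * 4096)
evicted = evict_tree(demo_dir)
print(f"advised {evicted} file(s)")
```

In the notebook, a call such as `evict_tree(dataset.cache_path)` before the warmup epoch would approximate a cold start. This assumes `cache_path` is a local directory holding the cache files.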

**Expected Bottlenecks:**
- macOS: CPU decode is the bottleneck (~5-7k samples/sec)
- Linux + GPU: GPU decode is the bottleneck (~10k samples/sec with nvImageCodec), still far below raw I/O

**Comparison to FFCV:**
- Raw I/O: SlipstreamLoader is 37% faster (480k vs 350k samples/sec)
- GPU Decode + RRC: Equivalent (~10k samples/sec)
- Cold start: 20% faster due to parallel chunk downloads
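
The cold-start gain is attributed to downloading chunks in parallel rather than one at a time. Below is a generic thread-pool sketch of that idea:

- `fetch_chunk` is a hypothetical placeholder for the real S3 read; it sleeps to simulate latency.
- `pool.map` returns results in chunk order, regardless of which download finishes first.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fetch_chunk(name):
    """Hypothetical stand-in for an S3 GET; sleeps to simulate network latency."""
    time.sleep(0.05)
    return name, b"payload-" + name.encode()

def download_chunks(names, max_workers=8):
    """Fetch all chunks concurrently; pool.map yields results in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return dict(pool.map(fetch_chunk, names))

names = [f"chunk-{i}.bin" for i in range(16)]
start = time.perf_counter()
chunks = download_chunks(names)
elapsed = time.perf_counter() - start
# Roughly 0.1 s with 8 workers, versus about 0.8 s if fetched one by one
print(f"{len(chunks)} chunks in {elapsed:.2f}s")
```

Threads suit this workload because the time goes to waiting on the network, during which Python releases the GIL.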

**Key Benefits:**
- Composable pipelines: Mix and match decode, crop, normalize
- No pipelines = raw bytes: Perfect for benchmarking I/O
- Same field names as the original dataset
- All fields accessible: image, label, index, path
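
The "no pipelines = raw bytes" behavior can be pictured as follows:

- Each field with a pipeline runs through its ordered list of callables.
- Every other field passes through untouched.

A minimal sketch; `apply_pipelines` and the toy stages are hypothetical and do not reflect SlipstreamLoader internals:

```python
from typing import Any, Callable, Dict, List

Stage = Callable[[Any], Any]

def apply_pipelines(batch: Dict[str, Any], pipelines: Dict[str, List[Stage]]) -> Dict[str, Any]:
    """Run each field through its stages in order; fields without a pipeline stay raw."""
    out = dict(batch)
    for field, stages in pipelines.items():
        value = out[field]
        for stage in stages:
            value = stage(value)
        out[field] = value
    return out

batch = {"image": [b"\x01\x02", b"\x03"], "label": [7, 3]}
pipelines = {
    "image": [
        lambda xs: [list(x) for x in xs],                 # toy "decode": bytes -> ints
        lambda xs: [[v * 10 for v in x] for x in xs],     # toy "normalize"
    ],
}
print(apply_pipelines(batch, pipelines))  # {'image': [[10, 20], [30]], 'label': [7, 3]}
print(apply_pipelines(batch, {}))         # no pipelines: raw bytes come back unchanged
```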