# Phase 3: Dataset & DataLoader Testing

Tests the complete data pipeline with functional scope:
- Data preprocessing (windowing, padding, normalization)
- PyTorch Dataset implementation
- DataModule and DataLoader functionality
- Integration with model requirements

**Environment:** Google Colab with A100 GPU
**Expected execution time:** ~5-10 minutes

In [1]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Setup paths and imports
import sys
from pathlib import Path

BASE_DIR = Path("/content/drive/MyDrive/Colab Notebooks/Transformers/FP")
DATA_DIR = BASE_DIR / "data" / "processed"
SRC_DIR = BASE_DIR / "src"

sys.path.insert(0, str(SRC_DIR))

print(f"Base: {BASE_DIR}")
print(f"Data: {DATA_DIR}")
print(f"Src: {SRC_DIR}")

Base: /content/drive/MyDrive/Colab Notebooks/Transformers/FP
Data: /content/drive/MyDrive/Colab Notebooks/Transformers/FP/data/processed
Src: /content/drive/MyDrive/Colab Notebooks/Transformers/FP/src


In [3]:
# Core imports
import pandas as pd
import numpy as np
import torch
from torch.utils.data import DataLoader
from typing import Dict, Tuple

from data.preprocessing import DataPreprocessor
from data.dataset import NQFuturesDataset, NQDataModule, collate_fn

print(f"PyTorch: {torch.__version__}")
print(f"CUDA: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")

PyTorch: 2.9.0+cu126
CUDA: True
GPU: NVIDIA A100-SXM4-80GB


## 1. Load and Validate Data

In [4]:
def load_and_validate_data(data_path: Path) -> Tuple[pd.DataFrame, list, list]:
    """
    Load feature data and identify columns.

    Returns:
        df, feature_cols, target_cols
    """
    df = pd.read_parquet(data_path)

    target_cols = ['target_15m', 'target_30m', 'target_60m', 'target_2h', 'target_4h']
    feature_cols = [col for col in df.columns if col not in target_cols]

    print(f"Data shape: {df.shape}")
    print(f"Date range: {df.index.min().date()} to {df.index.max().date()}")
    print(f"Features: {len(feature_cols)}")
    print(f"Targets: {len(target_cols)}")

    assert len(feature_cols) == 24, f"Expected 24 features, got {len(feature_cols)}"
    assert len(target_cols) == 5, f"Expected 5 targets, got {len(target_cols)}"
    print("\n[PASS] Data loaded and validated")

    return df, feature_cols, target_cols

# Execute
data_path = DATA_DIR / "nq_features_full.parquet"
df, feature_cols, target_cols = load_and_validate_data(data_path)

Data shape: (1086808, 29)
Date range: 2010-06-07 to 2025-12-03
Features: 24
Targets: 5

[PASS] Data loaded and validated


## 2. Test DataPreprocessor

In [5]:
def test_window_creation(
    df: pd.DataFrame,
    feature_cols: list,
    target_cols: list
) -> DataPreprocessor:
    """
    Test window creation with 24h lookback.

    Returns:
        Initialized preprocessor
    """
    preprocessor = DataPreprocessor(
        max_seq_len=288,
        feature_cols=feature_cols,
        target_cols=target_cols
    )

    # Test with timestamp well into dataset
    test_timestamp = df.index[df.index >= df.index.min() + pd.Timedelta(hours=48)][100]

    features, attention_mask, metadata = preprocessor.create_window(df, test_timestamp)

    print(f"Window created for {test_timestamp}")
    print(f"  Features: {features.shape}")
    print(f"  Attention mask: {attention_mask.shape}")
    print(f"  Actual length: {metadata['actual_length']}")
    print(f"  Pad length: {metadata['pad_length']}")

    # Validate
    assert features.shape == (288, 24), f"Expected (288, 24), got {features.shape}"
    assert attention_mask.shape == (288,), f"Expected (288,), got {attention_mask.shape}"
    assert metadata['actual_length'] == attention_mask.sum()
    assert 273 <= metadata['actual_length'] <= 288

    print("\n[PASS] Window creation validated")
    return preprocessor

# Execute
preprocessor = test_window_creation(df, feature_cols, target_cols)

Window created for 2010-06-09 14:20:00+00:00
  Features: (288, 24)
  Attention mask: (288,)
  Actual length: 279
  Pad length: 9

[PASS] Window creation validated


In [6]:
def test_normalization(
    df: pd.DataFrame,
    preprocessor: DataPreprocessor
) -> None:
    """
    Test RevIN normalization with low-variance handling.
    """
    # Get test window
    test_timestamp = df.index[1000]
    features, attention_mask, _ = preprocessor.create_window(df, test_timestamp)

    # Normalize
    normalized, norm_stats = preprocessor.normalize_window(
        features, attention_mask, store_stats=True
    )

    # Extract valid positions only
    valid_normalized = normalized[attention_mask]
    padding_features = normalized[~attention_mask]

    # Compute statistics
    normalized_means = valid_normalized.mean(axis=0)
    normalized_stds = valid_normalized.std(axis=0)

    print("Normalization statistics:")
    print(f"  Mean range: [{normalized_means.min():.2e}, {normalized_means.max():.2e}]")
    print(f"  Std range: [{normalized_stds.min():.4f}, {normalized_stds.max():.4f}]")

    # Validate padding remains zero
    assert np.allclose(padding_features, 0), "Padding should remain zeros"

    # Validate centering (all features)
    assert np.allclose(normalized_means, 0, atol=1e-3), \
        f"Means not near zero: range [{normalized_means.min():.2e}, {normalized_means.max():.2e}]"

    # Separate by variance
    eps = norm_stats['eps']
    original_stds = norm_stats['std']
    normal_mask = original_stds > eps
    low_var_mask = original_stds <= eps

    print(f"\nFeature variance distribution:")
    print(f"  Normal variance (std > {eps}): {normal_mask.sum()} features")
    print(f"  Low variance (std <= {eps}): {low_var_mask.sum()} features")

    # Validate normal features
    if normal_mask.sum() > 0:
        normal_stds = normalized_stds[normal_mask]
        assert np.allclose(normal_stds, 1, atol=0.1), \
            f"Normal features std not ~1: range [{normal_stds.min():.3f}, {normal_stds.max():.3f}]"
        print(f"[PASS] Normal features normalized to std ~ 1")
        print(f"  Range: [{normal_stds.min():.4f}, {normal_stds.max():.4f}]")

    # Validate low-variance features
    if low_var_mask.sum() > 0:
        low_var_stds = normalized_stds[low_var_mask]
        assert np.all(low_var_stds < 0.1), \
            f"Low-var features amplified: max std {low_var_stds.max():.2e}"
        print(f"[PASS] Low-variance features remain small")
        print(f"  Max std: {low_var_stds.max():.2e}")

    print("\n[PASS] Normalization validated")

# Execute
test_normalization(df, preprocessor)

Normalization statistics:
  Mean range: [-5.05e-07, 7.12e-05]
  Std range: [0.0000, 1.0000]

Feature variance distribution:
  Normal variance (std > 1e-05): 22 features
  Low variance (std <= 1e-05): 2 features
[PASS] Normal features normalized to std ~ 1
  Range: [1.0000, 1.0000]
[PASS] Low-variance features remain small
  Max std: 1.25e-06

[PASS] Normalization validated


In [7]:
def test_temporal_info(
    df: pd.DataFrame,
    preprocessor: DataPreprocessor
) -> None:
    """
    Test temporal information extraction.
    """
    test_timestamp = df.index[1000]
    _, _, metadata = preprocessor.create_window(df, test_timestamp)

    temporal_info = preprocessor.extract_temporal_info(
        metadata['timestamps'], test_timestamp
    )

    print("Temporal information:")
    print(f"  bar_in_day shape: {temporal_info['bar_in_day'].shape}")
    print(f"  bar_in_day range: [{temporal_info['bar_in_day'].min()}, {temporal_info['bar_in_day'].max()}]")
    print(f"  day_of_week: {temporal_info['day_of_week']}")
    print(f"  day_of_month: {temporal_info['day_of_month']}")
    print(f"  day_of_year: {temporal_info['day_of_year']}")

    # Validate ranges
    assert 0 <= temporal_info['bar_in_day'].min() <= temporal_info['bar_in_day'].max() < 288
    assert 0 <= temporal_info['day_of_week'] <= 4
    assert 1 <= temporal_info['day_of_month'] <= 31
    assert 1 <= temporal_info['day_of_year'] <= 366

    print("\n[PASS] Temporal info validated")

# Execute
test_temporal_info(df, preprocessor)

Temporal information:
  bar_in_day shape: (278,)
  bar_in_day range: [0, 287]
  day_of_week: 3
  day_of_month: 10
  day_of_year: 161

[PASS] Temporal info validated


In [8]:
def test_splits_and_leakage(
    df: pd.DataFrame,
    preprocessor: DataPreprocessor
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Test train/val/test splits with leakage validation.

    Returns:
        train_df, val_df, test_df
    """
    # Create splits with purge gaps
    train_df, val_df, test_df = preprocessor.create_splits(
        df, train_end='2021-12-31', val_end='2023-12-31'
    )

    # Validate no leakage
    leakage_ok = preprocessor.validate_no_leakage(
        train_df, val_df, test_df, tolerance_hours=24
    )

    print(f"\n[PASS] Splits created with proper temporal separation")
    return train_df, val_df, test_df

# Execute
train_df, val_df, test_df = test_splits_and_leakage(df, preprocessor)

Split statistics:
  Train: 808,996 samples (2010-06-07 to 2021-12-31)
  Val:   141,516 samples (2022-01-02 to 2023-12-29)
  Test:  136,284 samples (2024-01-02 to 2025-12-03)

Temporal gaps:
  Train-Val gap: 49.1 hours
  Val-Test gap: 74.1 hours
  Purged samples: ~576 total (~288 per gap)
[PASS] No data leakage detected:
  Train-Val gap: 49.1 hours
  Val-Test gap: 74.1 hours

[PASS] Splits created with proper temporal separation


## 3. Test PyTorch Dataset

In [9]:
def test_pytorch_dataset(
    train_df: pd.DataFrame,
    feature_cols: list,
    target_cols: list,
    preprocessor: DataPreprocessor
) -> NQFuturesDataset:
    """
    Test NQFuturesDataset implementation.

    Returns:
        Dataset instance
    """
    dataset = NQFuturesDataset(
        train_df, feature_cols, target_cols, preprocessor
    )

    print(f"Dataset length: {len(dataset):,}")

    # Test sample access
    sample = dataset[0]

    print("\nSample structure:")
    print(f"  features: {sample['features'].shape}")
    print(f"  attention_mask: {sample['attention_mask'].shape}")
    print(f"  targets: {sample['targets'].shape}")
    print(f"  bar_in_day: {sample['bar_in_day'].shape}")
    print(f"  day_of_week: {sample['day_of_week']}")

    # Validate shapes
    assert sample['features'].shape == (288, 24)
    assert sample['attention_mask'].shape == (288,)
    assert sample['targets'].shape == (5,)
    assert len(sample['bar_in_day'].shape) == 1

    print("\n[PASS] Dataset implementation validated")
    return dataset

# Execute
train_dataset = test_pytorch_dataset(train_df, feature_cols, target_cols, preprocessor)

Dataset length: 808,708

Sample structure:
  features: torch.Size([288, 24])
  attention_mask: torch.Size([288])
  targets: torch.Size([5])
  bar_in_day: torch.Size([279])
  day_of_week: 1

[PASS] Dataset implementation validated


## 4. Test DataModule & DataLoaders

In [10]:
def test_data_module(
    data_path: Path
) -> NQDataModule:
    """
    Test NQDataModule with proper setup.

    Returns:
        Initialized data module
    """
    data_module = NQDataModule(
        data_path=data_path,
        batch_size=32,
        num_workers=2,
        pin_memory=True,
        max_seq_len=288,
        train_end='2021-12-31',
        val_end='2023-12-31'
    )

    # Setup (loads data, creates splits)
    data_module.setup()

    print(f"\nDataset sizes:")
    print(f"  Train: {len(data_module.train_dataset):,}")
    print(f"  Val: {len(data_module.val_dataset):,}")
    print(f"  Test: {len(data_module.test_dataset):,}")

    print("\n[PASS] DataModule setup complete")
    return data_module

# Execute
data_module = test_data_module(data_path)

Loading data from /content/drive/MyDrive/Colab Notebooks/Transformers/FP/data/processed/nq_features_full.parquet
Features: 24
Targets: 5
Split statistics:
  Train: 808,996 samples (2010-06-07 to 2021-12-31)
  Val:   141,516 samples (2022-01-02 to 2023-12-29)
  Test:  136,284 samples (2024-01-02 to 2025-12-03)

Temporal gaps:
  Train-Val gap: 49.1 hours
  Val-Test gap: 74.1 hours
  Purged samples: ~576 total (~288 per gap)
[PASS] No data leakage detected:
  Train-Val gap: 49.1 hours
  Val-Test gap: 74.1 hours

Dataset sizes:
  Train: 808,708
  Val:   141,228
  Test:  135,996

Dataset sizes:
  Train: 808,708
  Val: 141,228
  Test: 135,996

[PASS] DataModule setup complete


In [11]:
def test_dataloaders(
    data_module: NQDataModule
) -> Dict:
    """
    Test DataLoader functionality and batch loading.

    Returns:
        Sample batch from train loader
    """
    train_loader = data_module.train_dataloader()
    val_loader = data_module.val_dataloader()

    print(f"Train loader: {len(train_loader)} batches")
    print(f"Val loader: {len(val_loader)} batches")

    # Test batch loading
    batch = next(iter(train_loader))

    print("\nBatch structure:")
    print(f"  features: {batch['features'].shape}")
    print(f"  attention_mask: {batch['attention_mask'].shape}")
    print(f"  targets: {batch['targets'].shape}")
    print(f"  bar_in_day: {batch['bar_in_day'].shape}")

    # Validate batch shapes
    B = batch['features'].shape[0]
    assert batch['features'].shape == (B, 288, 24)
    assert batch['attention_mask'].shape == (B, 288)
    assert batch['targets'].shape == (B, 5)

    print("\n[PASS] DataLoaders validated")
    return batch

# Execute
sample_batch = test_dataloaders(data_module)

Train loader: 25272 batches
Val loader: 4414 batches

Batch structure:
  features: torch.Size([32, 288, 24])
  attention_mask: torch.Size([32, 288])
  targets: torch.Size([32, 5])
  bar_in_day: torch.Size([32, 288])

[PASS] DataLoaders validated


## 5. Integration Tests

In [12]:
def test_gpu_transfer(batch: Dict) -> None:
    """
    Test GPU transfer if available.
    """
    if not torch.cuda.is_available():
        print("No GPU available, skipping GPU tests")
        return

    device = torch.device('cuda')

    # Transfer batch
    features_gpu = batch['features'].to(device)
    mask_gpu = batch['attention_mask'].to(device)
    targets_gpu = batch['targets'].to(device)

    print(f"GPU transfer successful:")
    print(f"  Features device: {features_gpu.device}")
    print(f"  Mask device: {mask_gpu.device}")
    print(f"  Targets device: {targets_gpu.device}")

    print("\n[PASS] GPU transfer validated")

# Execute
test_gpu_transfer(sample_batch)

GPU transfer successful:
  Features device: cuda:0
  Mask device: cuda:0
  Targets device: cuda:0

[PASS] GPU transfer validated


In [13]:
def test_memory_footprint(data_path: Path) -> None:
    """
    Test memory usage with production batch size.
    """
    import gc

    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    gc.collect()

    # Create module with production batch size
    data_module_large = NQDataModule(
        data_path=data_path,
        batch_size=128,
        num_workers=0,
        pin_memory=False
    )
    data_module_large.setup()

    loader = data_module_large.train_dataloader()
    batch_large = next(iter(loader))

    memory_mb = batch_large['features'].element_size() * batch_large['features'].nelement() / 1024**2

    print(f"Large batch test (batch_size=128):")
    print(f"  Features: {batch_large['features'].shape}")
    print(f"  Memory: {memory_mb:.2f} MB")

    if torch.cuda.is_available():
        device = torch.device('cuda')
        batch_large_gpu = {k: v.to(device) if isinstance(v, torch.Tensor) else v
                          for k, v in batch_large.items()}
        print(f"  GPU allocated: {torch.cuda.memory_allocated() / 1024**2:.2f} MB")
        print(f"  GPU reserved: {torch.cuda.memory_reserved() / 1024**2:.2f} MB")

    print("\n[PASS] Memory footprint acceptable for A100")

# Execute
test_memory_footprint(data_path)

Loading data from /content/drive/MyDrive/Colab Notebooks/Transformers/FP/data/processed/nq_features_full.parquet
Features: 24
Targets: 5
Split statistics:
  Train: 808,996 samples (2010-06-07 to 2021-12-31)
  Val:   141,516 samples (2022-01-02 to 2023-12-29)
  Test:  136,284 samples (2024-01-02 to 2025-12-03)

Temporal gaps:
  Train-Val gap: 49.1 hours
  Val-Test gap: 74.1 hours
  Purged samples: ~576 total (~288 per gap)
[PASS] No data leakage detected:
  Train-Val gap: 49.1 hours
  Val-Test gap: 74.1 hours

Dataset sizes:
  Train: 808,708
  Val:   141,228
  Test:  135,996
Large batch test (batch_size=128):
  Features: torch.Size([128, 288, 24])
  Memory: 3.38 MB
  GPU allocated: 3.55 MB
  GPU reserved: 22.00 MB

[PASS] Memory footprint acceptable for A100


## 6. Data Quality Checks

In [14]:
def check_target_statistics(
    data_module: NQDataModule,
    target_cols: list
) -> None:
    """
    Check target statistics across splits.
    """
    def get_stats(dataset, name, n_samples=1000):
        targets_list = []
        for i in range(min(n_samples, len(dataset))):
            targets_list.append(dataset[i]['targets'].numpy())

        targets = np.array(targets_list)

        print(f"\n{name} target statistics:")
        for i, col in enumerate(target_cols):
            print(f"  {col}:")
            print(f"    Mean: {targets[:, i].mean():.6f}")
            print(f"    Std: {targets[:, i].std():.6f}")

    get_stats(data_module.train_dataset, "Train")
    get_stats(data_module.val_dataset, "Validation")
    get_stats(data_module.test_dataset, "Test")

    print("\n[PASS] Target statistics computed")

# Execute
check_target_statistics(data_module, target_cols)


Train target statistics:
  target_15m:
    Mean: 0.000063
    Std: 0.001949
  target_30m:
    Mean: 0.000128
    Std: 0.002769
  target_60m:
    Mean: 0.000264
    Std: 0.004069
  target_2h:
    Mean: 0.000552
    Std: 0.005530
  target_4h:
    Mean: 0.001264
    Std: 0.006850

Validation target statistics:
  target_15m:
    Mean: -0.000140
    Std: 0.001496
  target_30m:
    Mean: -0.000272
    Std: 0.002136
  target_60m:
    Mean: -0.000551
    Std: 0.002969
  target_2h:
    Mean: -0.001227
    Std: 0.004456
  target_4h:
    Mean: -0.002528
    Std: 0.006672

Test target statistics:
  target_15m:
    Mean: -0.000013
    Std: 0.000881
  target_30m:
    Mean: -0.000023
    Std: 0.001281
  target_60m:
    Mean: -0.000043
    Std: 0.001887
  target_2h:
    Mean: -0.000069
    Std: 0.002742
  target_4h:
    Mean: -0.000046
    Std: 0.004001

[PASS] Target statistics computed


In [15]:
def check_normalization_consistency(
    data_module: NQDataModule
) -> None:
    """
    Check instance normalization consistency.
    """
    train_loader = data_module.train_dataloader()

    batch1 = next(iter(train_loader))
    batch2 = next(iter(train_loader))

    # Extract valid positions
    valid1 = batch1['features'][batch1['attention_mask']]
    valid2 = batch2['features'][batch2['attention_mask']]

    mean1 = valid1.mean(dim=0).numpy()
    mean2 = valid2.mean(dim=0).numpy()

    print("Feature means for two different batches:")
    print(f"  Batch 1: {mean1[:5]}...")
    print(f"  Batch 2: {mean2[:5]}...")
    print(f"\nBoth should be close to zero (instance normalization per sample)")

    assert np.allclose(mean1, 0, atol=0.1), "Batch 1 not normalized"
    assert np.allclose(mean2, 0, atol=0.1), "Batch 2 not normalized"

    print("\n[PASS] Instance normalization working correctly")

# Execute
check_normalization_consistency(data_module)

Feature means for two different batches:
  Batch 1: [2.8082022e-05 7.2043722e-06 4.0269093e-05 3.7512207e-05 1.0184747e-09]...
  Batch 2: [-5.3882718e-06 -2.9709909e-05 -1.2825661e-05 -3.7584370e-05
 -1.0823839e-08]...

Both should be close to zero (instance normalization per sample)

[PASS] Instance normalization working correctly


## 7. Save Splits (Optional)

In [16]:
def save_splits(
    data_module: NQDataModule,
    output_dir: Path
) -> None:
    """
    Save train/val/test splits to parquet.
    """
    data_module.save_splits(output_dir)

    print(f"\nSplit files saved to: {output_dir}")
    print("  - train_samples.parquet")
    print("  - val_samples.parquet")
    print("  - test_samples.parquet")

# Execute (optional)
save_splits(data_module, DATA_DIR)

Splits saved to /content/drive/MyDrive/Colab Notebooks/Transformers/FP/data/processed

Split files saved to: /content/drive/MyDrive/Colab Notebooks/Transformers/FP/data/processed
  - train_samples.parquet
  - val_samples.parquet
  - test_samples.parquet


## 8. Summary & Validation Report

In [17]:
def print_test_summary() -> None:
    """
    Print comprehensive test summary.
    """
    print("="*80)
    print("PHASE 3 TESTING SUMMARY")
    print("="*80)

    print("\n[PASS] DataPreprocessor:")
    print("  - Window creation with 24h lookback")
    print("  - Padding to 288 max length")
    print("  - Attention mask generation")
    print("  - RevIN instance normalization")
    print("  - Temporal info extraction")
    print("  - Train/val/test splits with purge gaps")
    print("  - Data leakage validation")

    print("\n[PASS] NQFuturesDataset:")
    print("  - PyTorch Dataset implementation")
    print("  - Sample access (__getitem__)")
    print("  - Tensor conversion")
    print("  - Shape validation")

    print("\n[PASS] NQDataModule:")
    print("  - Data loading from parquet")
    print("  - Split creation")
    print("  - DataLoader initialization")
    print("  - Batch loading")
    print("  - GPU transfer")

    print("\n[PASS] Integration:")
    print("  - Full pipeline functionality")
    print("  - Memory footprint acceptable")
    print("  - GPU compatibility")
    print("  - Data quality validated")

    print("\n" + "="*80)
    print("ALL TESTS PASSED - PHASE 3 COMPLETE")
    print("="*80)

    print("\nNext steps:")
    print("  Phase 4: Model Architecture Implementation")
    print("  - Variable embedding module")
    print("  - Positional encoding modules")
    print("  - Temporal & variable attention")
    print("  - Gated instance normalization")
    print("  - Quantile output heads")

# Execute
print_test_summary()

PHASE 3 TESTING SUMMARY

[PASS] DataPreprocessor:
  - Window creation with 24h lookback
  - Padding to 288 max length
  - Attention mask generation
  - RevIN instance normalization
  - Temporal info extraction
  - Train/val/test splits with purge gaps
  - Data leakage validation

[PASS] NQFuturesDataset:
  - PyTorch Dataset implementation
  - Sample access (__getitem__)
  - Tensor conversion
  - Shape validation

[PASS] NQDataModule:
  - Data loading from parquet
  - Split creation
  - DataLoader initialization
  - Batch loading
  - GPU transfer

[PASS] Integration:
  - Full pipeline functionality
  - Memory footprint acceptable
  - GPU compatibility
  - Data quality validated

ALL TESTS PASSED - PHASE 3 COMPLETE

Next steps:
  Phase 4: Model Architecture Implementation
  - Variable embedding module
  - Positional encoding modules
  - Temporal & variable attention
  - Gated instance normalization
  - Quantile output heads
