# Data Pipeline (Phases 1-5)

**Purpose:** Run the complete data pipeline from raw OHLCV to model-ready datasets.

**Phases:**
1. Ingestion: Load raw 1-min OHLCV
2. MTF Upscaling: Resample to 8 intraday timeframes
3. Features: 180+ indicators (momentum, wavelets, microstructure)
4. Labeling: Triple-barrier with Optuna optimization
5. Adapters: Model-family data preparation (2D, 3D)

**Outputs:** Processed datasets saved to Google Drive

**Expected Runtime:** 30-60 minutes (depending on symbol and data size)

## Setup Environment

In [None]:
# Setup Colab environment
import shutil
import sys

# Make the project checkout importable before pulling in its helpers.
sys.path.insert(0, '/content/research')

from notebooks.colab_setup import setup_colab_environment, is_colab

# Request a Drive mount and GPU use; the helper returns a dict that is read
# below with .get() so missing keys fall back to False.
env_info = setup_colab_environment(
    mount_drive=True,
    use_gpu=True,
)

print("\n📊 Environment Info:")
print(f"  Running in Colab: {env_info.get('is_colab', False)}")
print(f"  GPU available: {env_info.get('gpu_available', False)}")
print(f"  Drive mounted: {env_info.get('drive_mounted', False)}")

# Check disk space on the local Colab volume (values are floored to whole GiB).
BYTES_PER_GB = 1024**3
total, used, free = shutil.disk_usage("/content")
print("\n💾 Disk Space:")
print(f"  Total: {total // BYTES_PER_GB} GB")
print(f"  Used: {used // BYTES_PER_GB} GB")
print(f"  Free: {free // BYTES_PER_GB} GB")

## Initialize Checkpoint Directory and Metrics Logger

In [None]:
# Checkpoint directory setup. The pipeline is described as having its own
# built-in checkpointing; this cell only makes sure the folder below exists.
from pathlib import Path

# Checkpoint directory under the Google Drive path, so it persists across sessions.
# NOTE(review): if Drive is not mounted, mkdir(parents=True) will create this
# tree on the local Colab disk instead — confirm the mount succeeded first.
CHECKPOINT_DIR = Path("/content/drive/MyDrive/ml_factory/checkpoints")
# parents=True creates missing ancestors; exist_ok=True keeps re-runs of this cell safe.
CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)

# Simple metrics logger
class SimpleLogger:
    """Minimal metrics logger that writes to stdout instead of a tracking service."""

    def log_metrics(self, metrics: dict) -> None:
        """Print ``metrics`` on a single line prefixed with a chart emoji.

        Args:
            metrics: Mapping of metric names to values; it is formatted with
                its default string representation.
        """
        print(f"📊 Metrics: {metrics}")

    def finish_wandb_run(self) -> None:
        """Do nothing; this logger holds no run that needs closing."""
        pass

# The "checkpoint manager" used by later cells is just the print-only logger.
ckpt_mgr = SimpleLogger()
print(f"✅ Checkpoint directory: {CHECKPOINT_DIR}")

## Load Raw Data from Drive

In [None]:
import shutil
from pathlib import Path

import pandas as pd

# Define paths
SYMBOL = "MES"
DRIVE_DATA_PATH = Path("/content/drive/MyDrive/ml_factory/data/raw")
LOCAL_DATA_PATH = Path("/content/data/raw")
LOCAL_DATA_PATH.mkdir(parents=True, exist_ok=True)

# Copy data from Drive to local disk (faster I/O)
raw_data_file = DRIVE_DATA_PATH / f"{SYMBOL}_1m.parquet"
local_data_file = LOCAL_DATA_PATH / f"{SYMBOL}_1m.parquet"

if not local_data_file.exists():
    # Fail early with an actionable message instead of a bare copy error.
    if not raw_data_file.is_file():
        raise FileNotFoundError(
            f"Raw data not found at {raw_data_file}. "
            "Check that Google Drive is mounted and the file has been uploaded."
        )
    print("Copying data from Drive to local disk...")
    # Copy to a temporary name and rename afterwards, so an interrupted copy
    # never leaves a truncated file that would pass the exists() check above
    # on the next run.
    partial_file = local_data_file.with_name(local_data_file.name + ".partial")
    shutil.copy(raw_data_file, partial_file)
    partial_file.replace(local_data_file)
    print(f"✅ Data copied to {local_data_file}")
else:
    print(f"✅ Data already exists at {local_data_file}")

# Load data
df_raw = pd.read_parquet(local_data_file)
print(f"\nRaw data shape: {df_raw.shape}")
print(df_raw.head())

## Check for Existing Checkpoint (Resume if Available)

In [None]:
# Note: PipelineRunner has built-in checkpointing
# It automatically saves state after each stage to config.checkpoint_dir
# When you create PipelineRunner with resume=True, it loads the last checkpoint

# For manual checkpoint inspection:
import json
from pathlib import Path

# NOTE(review): this path is hard-coded; confirm it matches where the runner
# actually writes its state for the configured output directory.
checkpoint_file = Path("/content/output/pipeline_state.json")
if checkpoint_file.exists():
    # JSON is UTF-8 by specification; do not rely on the platform default.
    with open(checkpoint_file, encoding="utf-8") as f:
        checkpoint = json.load(f)
    # Treat a missing or null entry as "no stages completed".
    completed_stages = checkpoint.get('completed_stages') or []
    print(f"\n✅ Found checkpoint from: {checkpoint.get('timestamp', 'unknown')}")
    print(f"Completed stages: {completed_stages}")
    last_completed_phase = len(completed_stages)
else:
    print("\n🆕 No checkpoint found - starting from scratch")
    last_completed_phase = 0

## Run Pipeline (with Checkpointing)

In [None]:
# Import pipeline runner
from pathlib import Path

from src.pipeline.runner import PipelineRunner
from src.pipeline.config import PipelineConfig

# Configure pipeline
# NOTE(review): the Drive checkpoint directory created earlier in the notebook
# is not passed here; confirm where the runner persists its state.
config = PipelineConfig(
    symbols=[SYMBOL],
    data_dir=Path("/content/data"),
    output_dir=Path("/content/output"),
)

# Initialize pipeline runner (resume=True to continue from last checkpoint)
runner = PipelineRunner(config, resume=True)

# Run pipeline
# Note: Pipeline has built-in checkpointing via _save_state()/_load_state()
# It automatically saves after each stage and can resume from last completed stage

# from_stage parameter accepts stage name (string), not phase number
# If resuming, leave from_stage=None to auto-resume from last completed stage
from_stage = None  # Auto-resume from checkpoint

# Keep the try body to the single call that can fail, so an error while
# reporting success is not mislabelled as a pipeline failure.
try:
    success = runner.run(from_stage=from_stage)
except Exception as e:
    # Report progress for debugging, then re-raise so the cell still fails.
    print(f"\n❌ Pipeline failed: {e}")
    print(f"Completed stages before failure: {runner.get_completed_stages()}")
    raise
else:
    if success:
        print("\n✅ Pipeline completed successfully!")
        print(f"Completed stages: {runner.get_completed_stages()}")
    else:
        print("\n⚠️ Pipeline completed with some issues")

## Copy Results to Google Drive (Permanent Storage)

In [None]:
import shutil
from pathlib import Path

# Define paths
# NOTE(review): the pipeline is configured with output_dir=/content/output, but
# the datasets are copied from under /content/data — confirm this is where the
# scaled splits are written.
LOCAL_OUTPUT = Path("/content/data/splits/scaled")
DRIVE_OUTPUT = Path("/content/drive/MyDrive/ml_factory/data/processed")

# Fail with a clear message rather than a bare copytree error when the
# pipeline has not produced its outputs yet.
if not LOCAL_OUTPUT.is_dir():
    raise FileNotFoundError(
        f"No processed datasets found at {LOCAL_OUTPUT}; "
        "run the pipeline cell successfully before copying."
    )

DRIVE_OUTPUT.mkdir(parents=True, exist_ok=True)
drive_symbol_dir = DRIVE_OUTPUT / SYMBOL

# Copy processed datasets to Drive; dirs_exist_ok=True lets a re-run overwrite
# files from a previous copy instead of failing on the existing folder.
print("Copying processed datasets to Google Drive...")
shutil.copytree(LOCAL_OUTPUT, drive_symbol_dir, dirs_exist_ok=True)

print(f"\n✅ Processed datasets saved to: {drive_symbol_dir}")

## Log Final Metrics and Finish Run

In [None]:
# Log final metrics
# NOTE(review): num_samples is the row count of the raw 1-minute frame, not of
# the processed datasets — confirm that is the intended figure.
ckpt_mgr.log_metrics({
    "pipeline_status": "completed",
    "num_samples": len(df_raw),
    "symbol": SYMBOL,
})

# Close the run on the logger; a no-op for the print-only logger used here.
ckpt_mgr.finish_wandb_run()

print("\n✅ Data pipeline complete!")
print("📁 Processed datasets saved to Google Drive")
print("\n🚀 Next: Run 02_train_tabular.ipynb to train models")