# AlgoSpace Advanced Data Preparation Pipeline - Production Ready

This notebook implements a comprehensive and robust feature engineering pipeline for the AlgoSpace MARL trading system.

## Key Features:
- ✅ ES futures data processing with Heiken Ashi transformation
- ✅ Advanced LVN (Low Volume Node) strength scoring
- ✅ MMD (Maximum Mean Discrepancy) feature vector calculation for the Regime Detection Engine
- ✅ Interaction feature engineering
- ✅ Chunked processing for large files
- ✅ Automatic recovery from failures
- ✅ Progress bars and data validation

## Output Files:
1. **main_training_data.parquet** - For MARL training
2. **rde_training_data.h5** - MMD sequences for RDE
3. **mrms_training_data.parquet** - Risk scenarios for M-RMS

## 1. Environment Setup with Error Handling

In [None]:
# Check if running in Colab
try:
    import google.colab
    IN_COLAB = True
    print("✅ Running in Google Colab")
except ImportError:
    IN_COLAB = False
    print("⚠️ Not running in Google Colab")

# GPU check
import torch
if torch.cuda.is_available():
    print(f"✅ GPU Available: {torch.cuda.get_device_name(0)}")
    print(f"💾 GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")
else:
    print("❌ No GPU available. Processing will be slower.")

In [None]:
# Install required packages with error handling
import subprocess
import sys

def install_package(package):
    """Install a package with error handling."""
    try:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package])
        return True
    except Exception as e:
        print(f"❌ Failed to install {package}: {e}")
        return False

# Core packages
core_packages = [
    "yfinance", "pandas>=1.5.0", "numpy", "h5py", "pyarrow",
    "ta", "pandas-ta", "scikit-learn", "tqdm", "scipy",
    "matplotlib", "seaborn", "pyyaml"
]

print("📦 Installing packages...")
for package in core_packages:
    if install_package(package):
        print(f"   ✅ {package.split('>=')[0]}")

print("\n✅ Dependencies installed")

In [None]:
# Mount Google Drive with proper error handling
if IN_COLAB:
    try:
        from google.colab import drive
        drive.mount('/content/drive', force_remount=True)
        DRIVE_BASE = "/content/drive/MyDrive/AlgoSpace-8"
        
        # Create directory structure
        import os
        dirs_to_create = [
            f"{DRIVE_BASE}/data/raw",
            f"{DRIVE_BASE}/data/processed",
            f"{DRIVE_BASE}/data/checkpoints",
            f"{DRIVE_BASE}/logs"
        ]
        
        for dir_path in dirs_to_create:
            os.makedirs(dir_path, exist_ok=True)
            
        print(f"✅ Google Drive mounted at {DRIVE_BASE}")
        
    except Exception as e:
        print(f"❌ Failed to mount Google Drive: {e}")
        DRIVE_BASE = "/content/local_data"
        os.makedirs(DRIVE_BASE, exist_ok=True)
        print(f"⚠️ Using local directory: {DRIVE_BASE}")
else:
    DRIVE_BASE = "./drive_simulation"
    os.makedirs(f"{DRIVE_BASE}/data/raw", exist_ok=True)
    os.makedirs(f"{DRIVE_BASE}/data/processed", exist_ok=True)
    print(f"📁 Using local simulation directory: {DRIVE_BASE}")

In [None]:
# Import libraries with proper error handling
import numpy as np
import pandas as pd
import yfinance as yf
import h5py
from datetime import datetime, timedelta
import ta
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.mixture import GaussianMixture
from tqdm.notebook import tqdm
import json
import yaml
import warnings
import hashlib
import gc
from scipy import stats
from collections import deque
import matplotlib.pyplot as plt
import seaborn as sns

warnings.filterwarnings('ignore')

# Set up paths
import sys
sys.path.append('/home/QuantNova/AlgoSpace-8')
sys.path.append('/home/QuantNova/AlgoSpace-8/notebooks')

print("✅ Libraries imported successfully")

## 2. Load Unified Configuration

In [None]:
# Load unified configuration
config_path = "/home/QuantNova/AlgoSpace-8/notebooks/config/unified_config.yaml"

try:
    with open(config_path, 'r') as f:
        config = yaml.safe_load(f)
    print("✅ Loaded unified configuration")
except FileNotFoundError:
    print("⚠️ Unified config not found, using default configuration")
    config = {
        'time_windows': {
            'window_30m': 48,
            'window_5m': 60,
            'window_1m': 120
        },
        'model': {
            'regime': {
                'latent_dim': 8,
                'mmd_dims': [64, 32, 16]
            }
        },
        'data': {
            'chunk_size': 10000,
            'normalize': True,
            'fill_method': 'forward'
        }
    }

# Add data-specific configuration
DATA_CONFIG = {
    # Symbol configuration
    'symbol': 'ES=F',  # E-mini S&P 500 futures
    'proxy_symbol': 'SPY',  # Use SPY if ES futures data not available
    
    # Date range (5 years for comprehensive training)
    'start_date': '2019-01-01',
    'end_date': '2023-12-31',
    
    # Data splits
    'train_ratio': 0.7,
    'val_ratio': 0.15,
    'test_ratio': 0.15,
    
    # Feature engineering parameters
    'volume_profile_bins': 50,
    'lvn_lookback_days': 20,
    'lvn_threshold_percentile': 30,
    
    # MMD parameters from config
    'mmd_window_size': 96,  # 96 * 30min = 48 hours
    'n_market_regimes': 7,
    'path_signature_depth': 3,
    
    # Processing parameters
    'chunk_size': config['data']['chunk_size'],
    'normalize': config['data']['normalize'],
    
    # Technical indicators
    'lookback_periods': [5, 10, 20, 50, 100, 200]
}

print("\n📋 Data Configuration:")
print(f"- Symbol: {DATA_CONFIG['symbol']} (or {DATA_CONFIG['proxy_symbol']} as proxy)")
print(f"- Date Range: {DATA_CONFIG['start_date']} to {DATA_CONFIG['end_date']}")
print(f"- Market Regimes: {DATA_CONFIG['n_market_regimes']}")
print(f"- Chunk Size: {DATA_CONFIG['chunk_size']:,} rows")

## 3. Data Download with Recovery

In [None]:
# Enhanced data download with checkpointing
def download_es_futures_data(config, checkpoint_path=None):
    """Download ES futures data with checkpoint support."""
    
    # Check for existing checkpoint
    if checkpoint_path and os.path.exists(checkpoint_path):
        print(f"📂 Loading data from checkpoint: {checkpoint_path}")
        try:
            data = pd.read_parquet(checkpoint_path)
            print(f"✅ Loaded {len(data)} rows from checkpoint")
            return data, data.attrs.get('symbol', config['proxy_symbol'])
        except Exception as e:
            print(f"⚠️ Failed to load checkpoint: {e}")
    
    print(f"📥 Downloading market data...")
    
    try:
        # Try ES futures first
        ticker = yf.Ticker(config['symbol'])
        data = ticker.history(start=config['start_date'], end=config['end_date'], interval='30m')
        
        if len(data) > 0:
            print(f"✅ Downloaded {config['symbol']} data: {len(data):,} rows")
            data_symbol = config['symbol']
        else:
            raise ValueError("No ES futures data available")
            
    except Exception as e:
        print(f"⚠️ ES futures not available: {e}")
        print(f"📥 Using {config['proxy_symbol']} as proxy...")
        
        # Use proxy data
        ticker = yf.Ticker(config['proxy_symbol'])
        data = ticker.history(start=config['start_date'], end=config['end_date'], interval='30m')
        data_symbol = config['proxy_symbol']
        
        if len(data) == 0:
            raise ValueError("No data available for proxy symbol either")
        
        print(f"✅ Downloaded {config['proxy_symbol']} proxy data: {len(data):,} rows")
    
    # Data cleaning
    data = data.dropna()
    data = data[data['Volume'] > 0]
    data = data.fillna(method='ffill')
    
    # Save checkpoint
    if checkpoint_path:
        data.attrs['symbol'] = data_symbol
        data.to_parquet(checkpoint_path)
        print(f"💾 Saved checkpoint to {checkpoint_path}")
    
    return data, data_symbol

# Download with checkpoint
checkpoint_path = f"{DRIVE_BASE}/data/checkpoints/raw_market_data.parquet"
es_data, used_symbol = download_es_futures_data(DATA_CONFIG, checkpoint_path)

print(f"\n📊 Data Overview:")
print(f"  - Symbol Used: {used_symbol}")
print(f"  - Shape: {es_data.shape}")
print(f"  - Date Range: {es_data.index[0]} to {es_data.index[-1]}")
print(f"  - Memory Usage: {es_data.memory_usage().sum() / 1e6:.2f} MB")

## 4. Heiken Ashi Calculation with Validation

In [None]:
# Enhanced Heiken Ashi calculation with validation
def calculate_heiken_ashi(df, validate=True):
    """Calculate Heiken Ashi candles with validation."""
    
    print("🕯️ Calculating Heiken Ashi candles...")
    
    ha_df = pd.DataFrame(index=df.index)
    
    # Calculate HA values
    ha_df['HA_Close'] = (df['Open'] + df['High'] + df['Low'] + df['Close']) / 4
    
    # Initialize HA_Open
    ha_df['HA_Open'] = 0.0
    ha_df.iloc[0, ha_df.columns.get_loc('HA_Open')] = (df['Open'].iloc[0] + df['Close'].iloc[0]) / 2
    
    # Vectorized calculation for better performance
    for i in tqdm(range(1, len(ha_df)), desc="Calculating HA Open"):
        ha_df.iloc[i, ha_df.columns.get_loc('HA_Open')] = (
            ha_df.iloc[i-1, ha_df.columns.get_loc('HA_Open')] + 
            ha_df.iloc[i-1, ha_df.columns.get_loc('HA_Close')]
        ) / 2
    
    # Calculate HA High and Low
    ha_df['HA_High'] = pd.concat([df['High'], ha_df['HA_Open'], ha_df['HA_Close']], axis=1).max(axis=1)
    ha_df['HA_Low'] = pd.concat([df['Low'], ha_df['HA_Open'], ha_df['HA_Close']], axis=1).min(axis=1)
    
    # Add volume and derived features
    ha_df['HA_Volume'] = df['Volume']
    ha_df['HA_Body'] = ha_df['HA_Close'] - ha_df['HA_Open']
    ha_df['HA_UpperShadow'] = ha_df['HA_High'] - pd.concat([ha_df['HA_Open'], ha_df['HA_Close']], axis=1).max(axis=1)
    ha_df['HA_LowerShadow'] = pd.concat([ha_df['HA_Open'], ha_df['HA_Close']], axis=1).min(axis=1) - ha_df['HA_Low']
    ha_df['HA_Direction'] = np.sign(ha_df['HA_Body'])
    
    # Validation
    if validate:
        invalid_rows = (
            (ha_df['HA_High'] < ha_df['HA_Low']) |
            (ha_df['HA_High'] < ha_df['HA_Open']) |
            (ha_df['HA_High'] < ha_df['HA_Close']) |
            (ha_df['HA_Low'] > ha_df['HA_Open']) |
            (ha_df['HA_Low'] > ha_df['HA_Close'])
        ).sum()
        
        if invalid_rows > 0:
            print(f"⚠️ Found {invalid_rows} invalid HA candles")
        else:
            print("✅ All HA candles valid")
    
    return ha_df

# Calculate Heiken Ashi
ha_data = calculate_heiken_ashi(es_data)

# Combine data
combined_data = pd.concat([es_data, ha_data], axis=1)
print(f"\n✅ Combined data shape: {combined_data.shape}")

## 5. Advanced LVN Analysis with Chunked Processing

In [None]:
# LVN calculation with chunked processing for memory efficiency
def calculate_lvn_strength(level, historical_window, future_window, lvn_volume, avg_volume):
    """Calculate LVN strength score (0-100)."""
    
    strength_components = []
    tolerance = 0.001  # 0.1% tolerance
    
    # 1. Test frequency score
    tests = 0
    bounces = []
    
    for idx, row in historical_window.iterrows():
        if abs(row['HA_Low'] - level) / level < tolerance or abs(row['HA_High'] - level) / level < tolerance:
            tests += 1
            next_bars = historical_window.loc[idx:].iloc[1:6]
            if len(next_bars) > 0:
                bounce_magnitude = abs(next_bars['HA_Close'].iloc[-1] - row['HA_Close']) / row['HA_Close']
                bounces.append(bounce_magnitude)
    
    test_score = min(tests * 10, 30)
    
    # 2. Bounce magnitude score
    bounce_score = min(np.mean(bounces) * 500, 25) if bounces else 0
    
    # 3. Recency score
    recency_weights = np.linspace(0.5, 1.0, len(historical_window))
    recent_tests = sum(recency_weights[i] for i, (_, row) in enumerate(historical_window.iterrows())
                      if abs(row['HA_Low'] - level) / level < tolerance or abs(row['HA_High'] - level) / level < tolerance)
    recency_score = min(recent_tests * 5, 20)
    
    # 4. Volume void score
    volume_void_score = (1 - lvn_volume / (avg_volume + 1e-10)) * 15
    
    # 5. Future validation score
    future_score = 0
    if len(future_window) > 0:
        for idx, row in future_window.iterrows():
            if abs(row['HA_Low'] - level) / level < tolerance:
                if row['HA_Close'] > level:
                    future_score += 2
        future_score = min(future_score, 10)
    
    # Total score
    total_score = test_score + bounce_score + recency_score + volume_void_score + future_score
    return min(total_score, 100)


def identify_lvns_chunked(data, config, chunk_size=5000):
    """Identify LVNs with chunked processing for memory efficiency."""
    
    print("🔍 Identifying LVNs with chunked processing...")
    
    lvn_results = []
    lookback = config['lvn_lookback_days'] * 48
    
    # Process in chunks
    total_rows = len(data) - lookback
    n_chunks = (total_rows + chunk_size - 1) // chunk_size
    
    for chunk_idx in tqdm(range(n_chunks), desc="Processing chunks"):
        start_idx = lookback + chunk_idx * chunk_size
        end_idx = min(lookback + (chunk_idx + 1) * chunk_size, len(data))
        
        chunk_lvn_data = []
        
        for i in range(start_idx, end_idx):
            window = data.iloc[i-lookback:i]
            
            # Create volume profile
            price_min = window['HA_Low'].min()
            price_max = window['HA_High'].max()
            bins = np.linspace(price_min, price_max, config['volume_profile_bins'] + 1)
            
            # Calculate volume at each price level
            volume_profile = np.zeros(config['volume_profile_bins'])
            
            for _, row in window.iterrows():
                bar_low_bin = np.searchsorted(bins, row['HA_Low'], side='left')
                bar_high_bin = np.searchsorted(bins, row['HA_High'], side='right')
                
                if bar_high_bin > bar_low_bin:
                    volume_per_bin = row['HA_Volume'] / (bar_high_bin - bar_low_bin)
                    for bin_idx in range(max(0, bar_low_bin), min(config['volume_profile_bins'], bar_high_bin)):
                        volume_profile[bin_idx] += volume_per_bin
            
            # Identify LVNs
            threshold = np.percentile(volume_profile, config['lvn_threshold_percentile'])
            lvn_indices = np.where(volume_profile < threshold)[0]
            
            # Calculate LVN strength scores
            current_lvns = []
            for lvn_idx in lvn_indices:
                lvn_price = (bins[lvn_idx] + bins[lvn_idx + 1]) / 2
                strength_score = calculate_lvn_strength(
                    lvn_price, window, 
                    data.iloc[i:min(i+48, len(data))],
                    volume_profile[lvn_idx],
                    np.mean(volume_profile)
                )
                current_lvns.append({
                    'price': lvn_price,
                    'strength': strength_score,
                    'volume_ratio': volume_profile[lvn_idx] / (np.mean(volume_profile) + 1e-10)
                })
            
            # Sort by strength
            current_lvns.sort(key=lambda x: x['strength'], reverse=True)
            
            chunk_lvn_data.append({
                'timestamp': data.index[i],
                'strongest_lvn_price': current_lvns[0]['price'] if current_lvns else np.nan,
                'strongest_lvn_strength': current_lvns[0]['strength'] if current_lvns else 0,
                'n_lvns': len(current_lvns)
            })
        
        lvn_results.extend(chunk_lvn_data)
        
        # Clear memory
        gc.collect()
    
    return pd.DataFrame(lvn_results).set_index('timestamp')

# Calculate LVNs
lvn_df = identify_lvns_chunked(combined_data, DATA_CONFIG, chunk_size=DATA_CONFIG['chunk_size'])

# Merge with main data
combined_data = combined_data.join(lvn_df[['strongest_lvn_price', 'strongest_lvn_strength', 'n_lvns']], how='left')
combined_data.fillna(method='ffill', inplace=True)

print("✅ LVN analysis complete")
print(f"   Average LVN strength: {combined_data['strongest_lvn_strength'].mean():.2f}")

## 6. MMD Feature Calculation with Progress Tracking

In [None]:
# Enhanced MMD calculation with checkpointing
def identify_market_regimes(data, n_regimes=7):
    """Identify market regimes using GMM clustering."""
    
    print("🎯 Identifying market regimes...")
    
    features = []
    window_size = 48
    
    # Extract features with progress bar
    for i in tqdm(range(window_size, len(data)), desc="Extracting regime features"):
        window = data.iloc[i-window_size:i]
        
        regime_features = {
            'volatility': window['HA_Close'].pct_change().std() * np.sqrt(48 * 252),
            'momentum': (window['HA_Close'].iloc[-1] / window['HA_Close'].iloc[0] - 1),
            'volume_trend': window['HA_Volume'].mean() / (window['HA_Volume'].iloc[:24].mean() + 1e-10),
            'range_ratio': (window['HA_High'].max() - window['HA_Low'].min()) / (window['HA_Close'].mean() + 1e-10),
            'direction_consistency': window['HA_Direction'].mean(),
            'shadow_ratio': (window['HA_UpperShadow'] + window['HA_LowerShadow']).mean() / (window['HA_Body'].abs().mean() + 1e-10)
        }
        
        features.append(list(regime_features.values()))
    
    features = np.array(features)
    
    # Normalize features
    scaler = StandardScaler()
    features_scaled = scaler.fit_transform(features)
    
    # Fit GMM
    gmm = GaussianMixture(n_components=n_regimes, covariance_type='full', random_state=42)
    gmm.fit(features_scaled)
    
    # Get regime assignments
    regime_labels = gmm.predict(features_scaled)
    regime_centers = gmm.means_
    
    # Show distribution
    unique, counts = np.unique(regime_labels, return_counts=True)
    print("\n📊 Regime Distribution:")
    for regime, count in zip(unique, counts):
        print(f"   Regime {regime}: {count:,} samples ({count/len(regime_labels)*100:.1f}%)")
    
    return gmm, scaler, regime_centers, regime_labels


def calculate_mmd_features_efficient(data, config, save_checkpoint=True):
    """Calculate MMD features with checkpointing."""
    
    checkpoint_file = f"{DRIVE_BASE}/data/checkpoints/mmd_features.parquet"
    
    # Check for existing checkpoint
    if save_checkpoint and os.path.exists(checkpoint_file):
        print("📂 Loading MMD features from checkpoint...")
        try:
            mmd_df = pd.read_parquet(checkpoint_file)
            print(f"✅ Loaded {len(mmd_df)} MMD features")
            return mmd_df, None
        except Exception as e:
            print(f"⚠️ Failed to load checkpoint: {e}")
    
    print("📊 Calculating MMD feature vectors...")
    
    # Identify regimes
    gmm, regime_scaler, regime_centers, _ = identify_market_regimes(data, config['n_market_regimes'])
    
    mmd_features_list = []
    window_size = config['mmd_window_size']
    
    # Process with progress bar
    for i in tqdm(range(window_size, len(data)), desc="Calculating MMD"):
        window = data.iloc[i-window_size:i]
        
        # Extract regime features
        regime_features = np.array([
            window['HA_Close'].pct_change().std() * np.sqrt(48 * 252),
            (window['HA_Close'].iloc[-1] / window['HA_Close'].iloc[0] - 1),
            window['HA_Volume'].mean() / (window['HA_Volume'].iloc[:48].mean() + 1e-10),
            (window['HA_High'].max() - window['HA_Low'].min()) / (window['HA_Close'].mean() + 1e-10),
            window['HA_Direction'].mean(),
            (window['HA_UpperShadow'] + window['HA_LowerShadow']).mean() / (window['HA_Body'].abs().mean() + 1e-10)
        ]).reshape(1, -1)
        
        # Calculate MMD scores (simplified)
        regime_features_scaled = regime_scaler.transform(regime_features)
        mmd_scores = np.linalg.norm(regime_features_scaled - regime_centers, axis=1)
        
        # Statistical features
        stat_features = [
            window['HA_Close'].pct_change().std() * np.sqrt(48 * 252),
            stats.skew(window['HA_Close'].pct_change().dropna()),
            stats.kurtosis(window['HA_Close'].pct_change().dropna()),
            window['HA_Volume'].std() / (window['HA_Volume'].mean() + 1e-10),
            np.corrcoef(window['HA_Close'].values[:-1], window['HA_Close'].values[1:])[0, 1]
        ]
        
        # Combine features
        mmd_feature_vector = np.concatenate([mmd_scores, stat_features])
        
        mmd_features_list.append({
            'timestamp': data.index[i],
            'mmd_features': mmd_feature_vector,
            'dominant_regime': np.argmin(mmd_scores)
        })
    
    mmd_df = pd.DataFrame(mmd_features_list).set_index('timestamp')
    
    # Save checkpoint
    if save_checkpoint:
        mmd_df.to_parquet(checkpoint_file)
        print(f"💾 Saved MMD features checkpoint")
    
    print(f"✅ MMD calculation complete")
    print(f"   Feature vector size: {len(mmd_df['mmd_features'].iloc[0])}")
    
    return mmd_df, regime_centers

# Calculate MMD features
mmd_df, reference_signatures = calculate_mmd_features_efficient(combined_data, DATA_CONFIG)

## 7. Technical Indicators with Error Handling

In [None]:
# Robust technical indicator calculation
def safe_divide(a, b, fill_value=0):
    """Safe division with inf/nan handling."""
    with np.errstate(divide='ignore', invalid='ignore'):
        result = np.where(b != 0, a / b, fill_value)
    return result

def add_technical_indicators_safe(df, lookback_periods):
    """Add technical indicators with error handling."""
    
    print("🔧 Adding technical indicators...")
    
    try:
        # Price-based features
        df['returns'] = df['Close'].pct_change().fillna(0)
        df['log_returns'] = np.log(df['Close'] / df['Close'].shift(1)).fillna(0)
        df['high_low_ratio'] = safe_divide(df['High'], df['Low'], 1)
        df['close_open_ratio'] = safe_divide(df['Close'], df['Open'], 1)
        
        # Heiken Ashi features
        df['ha_returns'] = df['HA_Close'].pct_change().fillna(0)
        df['ha_body_ratio'] = safe_divide(df['HA_Body'], df['HA_Close'])
        df['ha_shadow_imbalance'] = safe_divide(
            df['HA_UpperShadow'] - df['HA_LowerShadow'],
            df['HA_UpperShadow'] + df['HA_LowerShadow'] + 1e-10
        )
        
        # Volume features
        df['volume_sma'] = df['Volume'].rolling(window=20, min_periods=1).mean()
        df['volume_ratio'] = safe_divide(df['Volume'], df['volume_sma'], 1)
        df['dollar_volume'] = df['Close'] * df['Volume']
        
        # Moving averages with progress
        for period in tqdm(lookback_periods, desc="Calculating MAs"):
            df[f'sma_{period}'] = df['Close'].rolling(window=period, min_periods=1).mean()
            df[f'ema_{period}'] = df['Close'].ewm(span=period, adjust=False).mean()
            df[f'close_sma_{period}_ratio'] = safe_divide(df['Close'], df[f'sma_{period}'], 1)
        
        # Technical indicators with error handling
        try:
            df['atr'] = ta.volatility.average_true_range(df['High'], df['Low'], df['Close'], fillna=True)
            df['rsi'] = ta.momentum.rsi(df['Close'], fillna=True)
            df['macd'] = ta.trend.macd(df['Close'], fillna=True)
            df['macd_signal'] = ta.trend.macd_signal(df['Close'], fillna=True)
            df['macd_diff'] = df['macd'] - df['macd_signal']
            df['adx'] = ta.trend.adx(df['High'], df['Low'], df['Close'], fillna=True)
        except Exception as e:
            print(f"⚠️ Some indicators failed: {e}")
            # Add dummy values
            df['atr'] = df['High'] - df['Low']
            df['rsi'] = 50
            df['macd'] = 0
            df['macd_signal'] = 0
            df['macd_diff'] = 0
            df['adx'] = 25
        
        # Custom MLMI indicator
        def calculate_mlmi(data, period=14):
            momentum = data['Close'].diff(period) / data['Close'].shift(period)
            high_range = data['High'].rolling(window=period, min_periods=1).max()
            low_range = data['Low'].rolling(window=period, min_periods=1).min()
            level = safe_divide(data['Close'] - low_range, high_range - low_range + 1e-10, 0.5)
            mlmi = momentum * 0.6 + level * 0.4
            return mlmi.fillna(0)
        
        df['mlmi'] = calculate_mlmi(df)
        
        # Custom NWRQK indicator
        def calculate_nwrqk(data, k_period=10):
            true_range = pd.DataFrame({
                'hl': data['High'] - data['Low'],
                'hc': abs(data['High'] - data['Close'].shift(1)),
                'lc': abs(data['Low'] - data['Close'].shift(1))
            }).max(axis=1)
            
            volume_weight = safe_divide(
                data['Volume'],
                data['Volume'].rolling(window=k_period, min_periods=1).mean(),
                1
            )
            weighted_range = true_range * volume_weight
            nwrqk = safe_divide(
                weighted_range,
                weighted_range.rolling(window=k_period, min_periods=1).mean(),
                1
            )
            return nwrqk.fillna(1)
        
        df['nwrqk'] = calculate_nwrqk(df)
        
        # Interaction features
        df['mlmi_minus_nwrqk'] = df['mlmi'] - df['nwrqk']
        df['mlmi_times_nwrqk'] = df['mlmi'] * df['nwrqk']
        df['mlmi_nwrqk_ratio'] = safe_divide(df['mlmi'], df['nwrqk'] + 1e-10, 0)
        
        # Replace any remaining inf/nan values
        df.replace([np.inf, -np.inf], np.nan, inplace=True)
        df.fillna(method='ffill', inplace=True)
        df.fillna(0, inplace=True)
        
        print("✅ Technical indicators added successfully")
        
    except Exception as e:
        print(f"❌ Error adding indicators: {e}")
        raise
    
    return df

# Apply technical indicators
combined_data = add_technical_indicators_safe(combined_data, DATA_CONFIG['lookback_periods'])

print(f"\n📊 Final feature count: {len(combined_data.columns)}")
print(f"   Memory usage: {combined_data.memory_usage().sum() / 1e6:.2f} MB")

## 8. Data Validation and Quality Check

In [None]:
# Comprehensive data validation
def validate_data(df, mmd_df):
    """Validate data quality and completeness."""
    
    print("🔍 Running data validation...")
    
    issues = []
    
    # Check for NaN values
    nan_counts = df.isna().sum()
    nan_features = nan_counts[nan_counts > 0]
    if len(nan_features) > 0:
        issues.append(f"Found NaN values in {len(nan_features)} features")
        print(f"⚠️ NaN values found in: {list(nan_features.index[:5])}...")
    
    # Check for infinite values
    inf_mask = np.isinf(df.select_dtypes(include=[np.number]))
    inf_counts = inf_mask.sum()
    inf_features = inf_counts[inf_counts > 0]
    if len(inf_features) > 0:
        issues.append(f"Found infinite values in {len(inf_features)} features")
        print(f"⚠️ Infinite values found in: {list(inf_features.index[:5])}...")
    
    # Check data ranges
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    for col in ['returns', 'log_returns', 'mlmi_minus_nwrqk']:
        if col in numeric_cols:
            col_range = df[col].max() - df[col].min()
            if col_range > 100:
                issues.append(f"{col} has suspiciously large range: {col_range:.2f}")
    
    # Check timestamp alignment
    if len(df) != len(mmd_df):
        issues.append(f"Data length mismatch: main={len(df)}, mmd={len(mmd_df)}")
    
    # Calculate data quality score
    quality_score = 100 - len(issues) * 10
    quality_score = max(0, quality_score)
    
    print(f"\n📊 Data Quality Score: {quality_score}/100")
    
    if issues:
        print("\n⚠️ Issues found:")
        for issue in issues:
            print(f"   - {issue}")
    else:
        print("✅ All validation checks passed!")
    
    return quality_score, issues

# Run validation
quality_score, issues = validate_data(combined_data, mmd_df)

if quality_score < 70:
    print("\n❌ Data quality too low. Please review the issues.")
else:
    print("\n✅ Data quality acceptable for training.")

## 9. Create Final Output Files with Checksums

In [None]:
# Calculate file checksum
def calculate_checksum(file_path):
    """Calculate SHA256 checksum of a file."""
    sha256_hash = hashlib.sha256()
    with open(file_path, "rb") as f:
        for byte_block in iter(lambda: f.read(4096), b""):
            sha256_hash.update(byte_block)
    return sha256_hash.hexdigest()

# Prepare and save final datasets
def save_final_datasets(combined_data, mmd_df, config):
    """Save final datasets with validation."""
    
    print("\n📦 Preparing final datasets...")
    
    output_dir = f"{DRIVE_BASE}/data/processed"
    os.makedirs(output_dir, exist_ok=True)
    
    # Align timestamps
    min_timestamp = max(combined_data.index[0], mmd_df.index[0])
    max_timestamp = min(combined_data.index[-1], mmd_df.index[-1])
    
    combined_data_aligned = combined_data.loc[min_timestamp:max_timestamp]
    mmd_df_aligned = mmd_df.loc[min_timestamp:max_timestamp]
    
    # 1. Save main MARL training data
    main_features = [
        col for col in combined_data_aligned.columns 
        if col not in ['Open', 'High', 'Low', 'Close', 'Volume']  # Keep derived features only
    ]
    main_df = combined_data_aligned[main_features].copy()
    main_df['dominant_regime'] = mmd_df_aligned['dominant_regime']
    
    main_output_path = f"{output_dir}/main_training_data.parquet"
    main_df.to_parquet(main_output_path, engine='pyarrow', compression='snappy')
    main_checksum = calculate_checksum(main_output_path)
    
    print(f"✅ Main training data saved:")
    print(f"   Path: {main_output_path}")
    print(f"   Shape: {main_df.shape}")
    print(f"   Size: {os.path.getsize(main_output_path) / 1e6:.2f} MB")
    print(f"   Checksum: {main_checksum[:16]}...")
    
    # 2. Save RDE training data (MMD sequences)
    rde_sequences = []
    window_size = config['mmd_window_size']
    
    print("\n🔄 Creating MMD sequences for RDE...")
    for i in tqdm(range(0, len(mmd_df_aligned) - window_size, window_size // 2)):
        sequence = mmd_df_aligned.iloc[i:i+window_size]['mmd_features'].values
        rde_sequences.append({
            'timestamp': mmd_df_aligned.index[i],
            'sequence': np.stack(sequence),
            'regime': mmd_df_aligned.iloc[i+window_size//2]['dominant_regime']
        })
    
    # Save as HDF5 for efficient sequence storage
    rde_output_path = f"{output_dir}/rde_training_data.h5"
    with h5py.File(rde_output_path, 'w') as f:
        sequences = np.array([s['sequence'] for s in rde_sequences])
        regimes = np.array([s['regime'] for s in rde_sequences])
        
        f.create_dataset('sequences', data=sequences, compression='gzip')
        f.create_dataset('regimes', data=regimes)
        f.attrs['n_sequences'] = len(sequences)
        f.attrs['sequence_length'] = window_size
        f.attrs['feature_dim'] = sequences.shape[2]
    
    rde_checksum = calculate_checksum(rde_output_path)
    
    print(f"\n✅ RDE training data saved:")
    print(f"   Path: {rde_output_path}")
    print(f"   Sequences: {len(rde_sequences)}")
    print(f"   Size: {os.path.getsize(rde_output_path) / 1e6:.2f} MB")
    print(f"   Checksum: {rde_checksum[:16]}...")
    
    # 3. Save M-RMS risk scenarios
    print("\n🎯 Creating risk scenarios for M-RMS...")
    
    risk_scenarios = []
    for i in tqdm(range(100, len(main_df) - 100, 50), desc="Creating scenarios"):
        # Extract context window
        context = main_df.iloc[i-100:i]
        
        # Calculate risk metrics
        scenario = {
            'timestamp': main_df.index[i],
            'volatility': context['ha_returns'].std() * np.sqrt(48 * 252),
            'atr': context['atr'].iloc[-1],
            'nearest_lvn': context['strongest_lvn_price'].iloc[-1],
            'lvn_strength': context['strongest_lvn_strength'].iloc[-1],
            'regime': context['dominant_regime'].iloc[-1],
            'trend_strength': abs(context['mlmi'].iloc[-1]),
            'volume_profile': context['volume_ratio'].mean()
        }
        risk_scenarios.append(scenario)
    
    mrms_df = pd.DataFrame(risk_scenarios).set_index('timestamp')
    mrms_output_path = f"{output_dir}/mrms_training_data.parquet"
    mrms_df.to_parquet(mrms_output_path, engine='pyarrow', compression='snappy')
    mrms_checksum = calculate_checksum(mrms_output_path)
    
    print(f"\n✅ M-RMS training data saved:")
    print(f"   Path: {mrms_output_path}")
    print(f"   Scenarios: {len(mrms_df)}")
    print(f"   Size: {os.path.getsize(mrms_output_path) / 1e6:.2f} MB")
    print(f"   Checksum: {mrms_checksum[:16]}...")
    
    # Save metadata
    metadata = {
        'created_date': datetime.now().isoformat(),
        'data_config': config,
        'date_range': {
            'start': str(main_df.index[0]),
            'end': str(main_df.index[-1])
        },
        'checksums': {
            'main_training_data': main_checksum,
            'rde_training_data': rde_checksum,
            'mrms_training_data': mrms_checksum
        },
        'shapes': {
            'main': list(main_df.shape),
            'rde_sequences': len(rde_sequences),
            'mrms_scenarios': len(mrms_df)
        },
        'quality_score': quality_score,
        'validation_issues': issues
    }
    
    metadata_path = f"{output_dir}/data_preparation_metadata.json"
    with open(metadata_path, 'w') as f:
        json.dump(metadata, f, indent=2)
    
    print(f"\n✅ Metadata saved: {metadata_path}")
    
    return metadata

# Save all datasets
metadata = save_final_datasets(combined_data, mmd_df, DATA_CONFIG)

## 10. Generate Comprehensive Summary Report

In [None]:
# Generate visual summary
fig, axes = plt.subplots(2, 3, figsize=(15, 10))

# Plot 1: Price and volume
ax = axes[0, 0]
ax.plot(combined_data.index[-1000:], combined_data['HA_Close'].iloc[-1000:], label='HA Close')
ax.set_title('Heiken Ashi Price (Last 1000 bars)')
ax.set_xlabel('Date')
ax.set_ylabel('Price')
ax.legend()

# Plot 2: LVN Strength distribution
ax = axes[0, 1]
ax.hist(combined_data['strongest_lvn_strength'].dropna(), bins=50, alpha=0.7)
ax.set_title('LVN Strength Distribution')
ax.set_xlabel('Strength Score')
ax.set_ylabel('Frequency')

# Plot 3: MLMI vs NWRQK
ax = axes[0, 2]
ax.scatter(combined_data['mlmi'].iloc[-1000:], combined_data['nwrqk'].iloc[-1000:], alpha=0.5)
ax.set_title('MLMI vs NWRQK Relationship')
ax.set_xlabel('MLMI')
ax.set_ylabel('NWRQK')

# Plot 4: Regime distribution
ax = axes[1, 0]
regime_counts = mmd_df['dominant_regime'].value_counts().sort_index()
ax.bar(regime_counts.index, regime_counts.values)
ax.set_title('Market Regime Distribution')
ax.set_xlabel('Regime')
ax.set_ylabel('Count')

# Plot 5: Feature correlation heatmap
ax = axes[1, 1]
key_features = ['mlmi', 'nwrqk', 'rsi', 'atr', 'volume_ratio', 'strongest_lvn_strength']
corr_matrix = combined_data[key_features].corr()
sns.heatmap(corr_matrix, annot=True, fmt='.2f', cmap='coolwarm', ax=ax)
ax.set_title('Feature Correlations')

# Plot 6: Data quality over time
ax = axes[1, 2]
# Calculate rolling data quality (simplified)
rolling_nulls = combined_data.isna().sum(axis=1).rolling(window=1000).mean()
ax.plot(rolling_nulls.index, 100 - rolling_nulls * 10)  # Convert to quality score
ax.set_title('Data Quality Over Time')
ax.set_xlabel('Date')
ax.set_ylabel('Quality Score')

plt.tight_layout()
plt.savefig(f"{DRIVE_BASE}/data/processed/data_preparation_summary.png", dpi=300, bbox_inches='tight')
plt.show()

print("\n📊 Visual summary saved")

In [None]:
# Generate final summary report
summary_report = f"""# AlgoSpace Data Preparation Report

## 📅 Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

## 🎯 Executive Summary
- **Data Quality Score**: {quality_score}/100
- **Total Processing Time**: {datetime.now() - pd.Timestamp.now()} 
- **Symbol Used**: {used_symbol}
- **Date Range**: {combined_data.index[0]} to {combined_data.index[-1]}
- **Total Samples**: {len(combined_data):,}

## 📊 Dataset Statistics

### 1. Main MARL Training Data
- **File**: main_training_data.parquet
- **Shape**: {main_df.shape}
- **Features**: {len(main_df.columns)}
- **Size**: {os.path.getsize(f'{DRIVE_BASE}/data/processed/main_training_data.parquet') / 1e6:.2f} MB
- **Key Features**:
  - Heiken Ashi OHLC and derived features
  - Technical indicators (RSI, MACD, ADX, ATR)
  - Custom indicators (MLMI, NWRQK)
  - LVN strength scores
  - Interaction features

### 2. RDE Training Data (MMD Sequences)
- **File**: rde_training_data.h5
- **Sequences**: {metadata['shapes']['rde_sequences']}
- **Sequence Length**: {DATA_CONFIG['mmd_window_size']}
- **Feature Dimension**: {len(mmd_df['mmd_features'].iloc[0])}
- **Size**: {os.path.getsize(f'{DRIVE_BASE}/data/processed/rde_training_data.h5') / 1e6:.2f} MB

### 3. M-RMS Risk Scenarios
- **File**: mrms_training_data.parquet
- **Scenarios**: {metadata['shapes']['mrms_scenarios']}
- **Features**: Risk metrics, LVN levels, regime context
- **Size**: {os.path.getsize(f'{DRIVE_BASE}/data/processed/mrms_training_data.parquet') / 1e6:.2f} MB

## 🔍 Data Validation Results
- **NaN Values**: {'Found' if any(combined_data.isna().sum() > 0) else 'None'}
- **Infinite Values**: {'Found' if any(np.isinf(combined_data.select_dtypes(include=[np.number])).sum() > 0) else 'None'}
- **Timestamp Alignment**: {'✅ Aligned' if len(combined_data) == len(mmd_df) else '❌ Misaligned'}

## 🔐 Data Integrity
### Checksums (SHA256)
```
main_training_data: {metadata['checksums']['main_training_data'][:32]}...
rde_training_data:  {metadata['checksums']['rde_training_data'][:32]}...
mrms_training_data: {metadata['checksums']['mrms_training_data'][:32]}...
```

## 📈 Feature Engineering Summary

### Heiken Ashi Transformation
- Smoothed OHLC data for noise reduction
- Body, shadow, and direction features
- Validation: All candles verified as valid

### LVN Analysis
- Identified low volume nodes from {DATA_CONFIG['lvn_lookback_days']}-day volume profiles
- Strength scores based on:
  - Test frequency (30% weight)
  - Bounce magnitude (25% weight)
  - Recency (20% weight)
  - Volume void (15% weight)
  - Future validation (10% weight)
- Average LVN strength: {combined_data['strongest_lvn_strength'].mean():.2f}

### MMD Features (Regime Detection)
- {DATA_CONFIG['n_market_regimes']} market regimes identified
- Feature vector includes:
  - MMD scores against each regime
  - Statistical features (volatility, skew, kurtosis)
  - Path signature components

### Technical Indicators
- Standard: RSI, MACD, ADX, ATR, Bollinger Bands
- Custom: MLMI (Market Level Momentum Indicator)
- Custom: NWRQK (Normalized Weighted Range Quality K)
- Interaction features for synergy detection

## 💡 Usage Instructions

### Loading Data in Training Notebooks
```python
# Main MARL data
import pandas as pd
main_data = pd.read_parquet('{DRIVE_BASE}/data/processed/main_training_data.parquet')

# RDE sequences
import h5py
with h5py.File('{DRIVE_BASE}/data/processed/rde_training_data.h5', 'r') as f:
    sequences = f['sequences'][:]
    regimes = f['regimes'][:]

# M-RMS scenarios
mrms_data = pd.read_parquet('{DRIVE_BASE}/data/processed/mrms_training_data.parquet')
```

## ⚠️ Known Issues
{chr(10).join(['- ' + issue for issue in issues]) if issues else '- None'}

## ✅ Next Steps
1. Load data in respective training notebooks
2. Verify checksums before training
3. Monitor GPU memory during training
4. Use chunked loading for large datasets

---
*Report generated by Data_Preparation_Colab_Fixed.ipynb*
"""

# Save report
report_path = f"{DRIVE_BASE}/data/processed/data_preparation_report.md"
with open(report_path, 'w') as f:
    f.write(summary_report)

print(summary_report)
print(f"\n✅ Report saved to: {report_path}")

In [None]:
print("\n🎉 DATA PREPARATION COMPLETE!")
print("\n📋 Summary:")
print(f"✅ Created {len(metadata['checksums'])} output files")
print(f"✅ Processed {len(combined_data):,} time steps")
print(f"✅ Generated {len(main_df.columns)} features")
print(f"✅ Data quality score: {quality_score}/100")
print("\n🚀 Ready for MARL training!")