# Cryptocurrency Volatility Forecasting Toolkit
## Advanced Machine Learning Pipeline with TSFresh Feature Engineering

This notebook walks through a cryptocurrency volatility forecasting pipeline that combines:

- **Multi-Source Data Collection**: CoinGecko, Binance, Dune Analytics, FRED, Deribit
- **Advanced Feature Engineering**: TSFresh time series feature extraction with Dask distributed computing  
- **Professional ML Pipeline**: XGBoost with Optuna hyperparameter optimization
- **Comprehensive Analysis**: Technical indicators, on-chain metrics, and macroeconomic data

### Key Components

1. **Data Collection & Integration**: Unified cryptocurrency, on-chain, and macro data
2. **Feature Engineering**: TSFresh rolling time series feature extraction (600+ features)
3. **Feature Selection**: Statistical significance testing with FDR control
4. **Distributed Computing**: Dask cluster for scalable computation
5. **Model Training**: XGBoost with Optuna hyperparameter optimization
6. **Evaluation & Visualization**: Comprehensive performance metrics and plots
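
The FDR control mentioned in step 3 can be illustrated with a short, plain-Python sketch of the Benjamini-Yekutieli step-up procedure, a standard variant that stays valid when the tested features are correlated. This is not tsfresh's implementation; the function name and the p-values are made up for the example.

```python
def benjamini_yekutieli(p_values, fdr_level=0.05):
    """Return one boolean per hypothesis: True where it is rejected (feature kept)."""
    m = len(p_values)
    if m == 0:
        return []
    # Harmonic correction c(m) = 1 + 1/2 + ... + 1/m handles dependent tests
    c_m = sum(1.0 / i for i in range(1, m + 1))
    order = sorted(range(m), key=lambda i: p_values[i])
    # Find the largest rank k whose p-value satisfies p_(k) <= k * q / (m * c(m))
    k_max = 0
    for rank, idx in enumerate(order, start=1):
        if p_values[idx] <= rank * fdr_level / (m * c_m):
            k_max = rank
    rejected = [False] * m
    for idx in order[:k_max]:
        rejected[idx] = True
    return rejected


# Illustrative p-values for five candidate features (not real results)
p_values = [0.001, 0.8, 0.004, 0.03, 0.5]
keep = benjamini_yekutieli(p_values, fdr_level=0.05)
```

With these inputs only the two smallest p-values pass, which shows how the correction is stricter than a plain 0.05 cutoff (0.03 would pass without it).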

### Target: Realized Volatility Forecasting
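The target is the next-period realized volatility of the target coin (Ethereum in this configuration), proxied by the absolute daily log return and predicted from time series features built on price, on-chain, derivatives, and macroeconomic data.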

## 1. Environment Setup & Configuration

Importing the required libraries and configuring warnings, plotting, and the project import path. The Dask cluster itself is started in Section 3.

## Basic Setup

In [11]:
# Core Libraries & Environment Setup
import os
import sys
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import time
import logging

# Suppress common warnings for cleaner output
warnings.filterwarnings('ignore')
warnings.filterwarnings('ignore', category=UserWarning, module='tsfresh')
logging.getLogger('tsfresh').setLevel(logging.ERROR)

# Distributed Computing
import dask
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster, progress
from dask import delayed

# TSFresh Feature Engineering
from tsfresh import extract_features, select_features, extract_relevant_features
from tsfresh.feature_extraction import ComprehensiveFCParameters, EfficientFCParameters, MinimalFCParameters
from tsfresh.utilities.dataframe_functions import roll_time_series, impute
from tsfresh.convenience.bindings import dask_feature_extraction_on_chunk

# Machine Learning & Optimization
import xgboost as xgb
from xgboost import dask as dxgb
import optuna
from optuna.integration.dask import DaskStorage
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error
from sklearn.model_selection import TimeSeriesSplit

# Technical Analysis
try:
    import talib
    TALIB_AVAILABLE = True
except ImportError:
    TALIB_AVAILABLE = False
    print("⚠️  TA-Lib not available - technical indicators will be skipped")

# Configuration
plt.style.use('seaborn-v0_8')
plt.rcParams['figure.figsize'] = (15, 8)

# Add src directory to path for imports
notebook_dir = os.getcwd()
repo_root = os.path.dirname(notebook_dir)
src_path = os.path.join(repo_root, 'src')
if src_path not in sys.path:
    sys.path.insert(0, src_path)

print("🔧 Environment setup completed")
print(f"📁 Working directory: {notebook_dir}")
print(f"🔗 Repository root: {repo_root}")

# Import project modules
from data.collectors import CryptoDataCollector
from config import Config

print("✅ All imports successful - ready for pipeline execution!")

🔧 Environment setup completed
📁 Working directory: c:\CryptoMarketForecasting-new\v2-volatility-forecasting\notebooks
🔗 Repository root: c:\CryptoMarketForecasting-new\v2-volatility-forecasting
✅ All imports successful - ready for pipeline execution!


## 2. Configuration & Constants

Setting up key pipeline parameters and configurations for data collection, feature engineering, and model training.

In [12]:
# =============================================================================
# PIPELINE CONFIGURATION & CONSTANTS
# =============================================================================

# Data Collection Parameters
TARGET_COIN = "ethereum"           # Primary target for volatility forecasting
BASE_FIAT = "usd"                 # Base currency for all prices
TOP_N = 10                        # Number of top cryptocurrencies by market cap
LOOKBACK_DAYS = 365               # Historical data window
FREQUENCY = "1D"                  # Data frequency (1D = daily)
TIMEZONE = "Europe/Madrid"        # Timezone for data alignment
SLEEP_TIME = 6                    # API rate limiting delay (seconds)

# Feature Engineering Parameters  
TIME_WINDOW = 14                  # Rolling window for TSFresh feature extraction
EXTRACTION_SETTINGS = EfficientFCParameters()  # TSFresh feature extraction parameters
DEFAULT_FDR_LEVEL = 0.05          # False Discovery Rate for feature selection

# Model Training Parameters
RANDOM_SEED = 42                  # For reproducibility
SPLITS = 10                       # Time series cross-validation splits
DEFAULT_N_TRIALS = 100            # Optuna hyperparameter optimization trials
DEFAULT_N_ROUNDS = 200            # XGBoost training rounds
DEFAULT_XGB_METRIC = 'mae'        # XGBoost evaluation metric
DEFAULT_TREE_METHOD = 'hist'      # XGBoost tree construction method
DEFAULT_EARLY_STOPPING = 25       # Early stopping patience

# Date Calculations
START_DATE = (datetime.now() - timedelta(days=LOOKBACK_DAYS)).strftime("%Y-%m-%d")
TODAY = datetime.now().strftime('%Y-%m-%d')

print("📊 PIPELINE CONFIGURATION")
print("=" * 50)
print(f"🎯 Target Cryptocurrency: {TARGET_COIN.upper()}")
print(f"📅 Data Range: {START_DATE} to {TODAY} ({LOOKBACK_DAYS} days)")
print(f"🔄 Frequency: {FREQUENCY}")
print(f"🏆 Top Cryptocurrencies: {TOP_N}")
print(f"🪟 Rolling Window: {TIME_WINDOW} periods")
print(f"🧪 Optuna Trials: {DEFAULT_N_TRIALS}")
print(f"🌳 XGBoost Rounds: {DEFAULT_N_ROUNDS}")
print(f"📈 Evaluation Metric: {DEFAULT_XGB_METRIC.upper()}")

# API Keys verification
api_keys = ['COINGECKO_API_KEY', 'DUNE_API_KEY', 'FRED_API_KEY']
available_keys = [k for k in api_keys if os.getenv(k)]
print(f"\n✅ Configuration loaded successfully")
print(f"🔑 API Keys configured: {len(available_keys)}/{len(api_keys)} ({', '.join(available_keys) or 'none'})")

# Set numpy random seed for reproducibility
np.random.seed(RANDOM_SEED)
print(f"🎲 Random seed set to {RANDOM_SEED} for reproducibility")

📊 PIPELINE CONFIGURATION
🎯 Target Cryptocurrency: ETHEREUM
📅 Data Range: 2024-10-02 to 2025-10-02 (365 days)
🔄 Frequency: 1D
🏆 Top Cryptocurrencies: 10
🪟 Rolling Window: 14 periods
🧪 Optuna Trials: 100
🌳 XGBoost Rounds: 200
📈 Evaluation Metric: MAE

✅ Configuration loaded successfully
🔑 API Keys configured: 3/3 (COINGECKO_API_KEY, DUNE_API_KEY, FRED_API_KEY)
🎲 Random seed set to 42 for reproducibility


## 3. Dask Distributed Computing Setup

Initializing a local Dask cluster for distributed computation. This enables parallel TSFresh feature extraction and XGBoost training across multiple CPU cores.
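
The cell below hard-codes 4 workers with 4 threads each and 6GB per worker. On a different machine those numbers may oversubscribe the CPU or exceed available RAM. The sketch below shows one way to derive them from the machine size; `plan_local_cluster` and its `memory_headroom` parameter are hypothetical helpers for illustration, not part of Dask.

```python
def plan_local_cluster(total_cores, total_memory_gb, threads_per_worker=4, memory_headroom=0.25):
    """Hypothetical helper: derive LocalCluster-style settings from machine size."""
    # Never ask for more threads per worker than there are cores
    threads_per_worker = max(1, min(threads_per_worker, total_cores))
    # One worker process per group of threads, so workers * threads <= cores
    n_workers = max(1, total_cores // threads_per_worker)
    # Leave some memory for the notebook kernel and the operating system
    usable_gb = total_memory_gb * (1.0 - memory_headroom)
    per_worker_gb = usable_gb / n_workers
    return {
        "n_workers": n_workers,
        "threads_per_worker": threads_per_worker,
        "memory_limit": f"{per_worker_gb:.1f}GB",
    }


# A 16-core, 32 GB machine reproduces the settings used in this notebook
plan = plan_local_cluster(total_cores=16, total_memory_gb=32)
```

The resulting dictionary could be passed as keyword arguments to `LocalCluster(**plan, processes=True)`; `os.cpu_count()` is a reasonable source for `total_cores`.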

In [19]:
# =============================================================================
# DASK DISTRIBUTED COMPUTING CLUSTER SETUP
# =============================================================================

import logging
import warnings

# Suppress Dask logging to reduce duplicated output
logging.getLogger('distributed').setLevel(logging.ERROR)
logging.getLogger('distributed.scheduler').setLevel(logging.ERROR)
logging.getLogger('distributed.nanny').setLevel(logging.ERROR)
logging.getLogger('distributed.core').setLevel(logging.ERROR)
logging.getLogger('distributed.diskutils').setLevel(logging.ERROR)
warnings.filterwarnings('ignore', category=UserWarning, module='distributed')

# Clean up any existing clusters
try:
    if 'client' in globals() and client:
        print("🧹 Closing existing Dask client...")
        client.close()
    if 'cluster' in globals() and cluster:
        print("🧹 Closing existing Dask cluster...")
        cluster.close()
except Exception:  # a bare except would also swallow KeyboardInterrupt/SystemExit
    pass

# Initialize optimized local cluster for cryptocurrency analysis
print("🚀 Initializing Dask LocalCluster...")

cluster = LocalCluster(
    n_workers=4,                    # Number of worker processes
    threads_per_worker=4,           # Threads per worker (adjust based on CPU cores)
    processes=True,                 # Use processes for better parallelization
    memory_limit='6GB',             # Memory limit per worker
    dashboard_address=':8787',      # Dashboard port
    silence_logs=True,             # Keep logs quiet
    protocol='tcp'                  # Communication protocol
)

client = Client(cluster)

print("✅ Dask cluster initialized successfully!")
print(f"📊 Dashboard: http://localhost:8787")
print(f"👥 Workers: {len(client.scheduler_info()['workers'])}")
print(f"🧵 Total threads: {sum(w['nthreads'] for w in client.scheduler_info()['workers'].values())}")
print(f"💾 Total memory: {sum(w['memory_limit'] for w in client.scheduler_info()['workers'].values()) / 1e9:.1f} GB")

# Display cluster information
client

🧹 Closing existing Dask client...
🧹 Closing existing Dask cluster...
🚀 Initializing Dask LocalCluster...
✅ Dask cluster initialized successfully!
📊 Dashboard: http://localhost:8787
👥 Workers: 4
🧵 Total threads: 16
💾 Total memory: 24.0 GB


Connection method: Cluster object | Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

Dashboard: http://127.0.0.1:8787/status | Workers: 4
Total threads: 16 | Total memory: 22.35 GiB
Status: running | Using processes: True

Comm: tcp://127.0.0.1:54983 | Workers: 0
Dashboard: http://127.0.0.1:8787/status | Total threads: 0
Started: Just now | Total memory: 0 B

Comm: tcp://127.0.0.1:55003 | Total threads: 4
Dashboard: http://127.0.0.1:55008/status | Memory: 5.59 GiB
Nanny: tcp://127.0.0.1:54986
Local directory: C:\Users\amali\AppData\Local\Temp\dask-scratch-space\worker-r_9j_jvc

Comm: tcp://127.0.0.1:55002 | Total threads: 4
Dashboard: http://127.0.0.1:55006/status | Memory: 5.59 GiB
Nanny: tcp://127.0.0.1:54988
Local directory: C:\Users\amali\AppData\Local\Temp\dask-scratch-space\worker-sogtm105

Comm: tcp://127.0.0.1:55005 | Total threads: 4
Dashboard: http://127.0.0.1:55010/status | Memory: 5.59 GiB
Nanny: tcp://127.0.0.1:54990
Local directory: C:\Users\amali\AppData\Local\Temp\dask-scratch-space\worker-vvohzo_j

Comm: tcp://127.0.0.1:55004 | Total threads: 4
Dashboard: http://127.0.0.1:55011/status | Memory: 5.59 GiB
Nanny: tcp://127.0.0.1:54992
Local directory: C:\Users\amali\AppData\Local\Temp\dask-scratch-space\worker-zl0o98id


## 4. Multi-Source Data Collection

Collecting comprehensive datasets from multiple sources:

- **Cryptocurrency Data**: Price, volume, market cap (CoinGecko, Binance)
- **On-Chain Analytics**: DeFi metrics, network activity (Dune Analytics) 
- **Derivatives Data**: Implied volatility indices (Deribit DVOL)
- **Macroeconomic Data**: Interest rates, volatility indices (FRED)

Each source provides unique insights into cryptocurrency market dynamics.
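
Rate-limited public APIs such as the ones listed above can fail transiently. A minimal sketch of retrying a request with a fixed pause between attempts is shown here. `fetch_with_retry` and the stand-in `flaky_fetch` are hypothetical and are not part of `CryptoDataCollector`; the returned price is a dummy value.

```python
import time


def fetch_with_retry(fetch, retries=3, delay=6.0, sleep=time.sleep):
    """Call fetch() up to `retries` times, pausing `delay` seconds between attempts."""
    last_error = None
    for attempt in range(1, retries + 1):
        try:
            return fetch()
        except Exception as exc:  # a real collector would catch narrower error types
            last_error = exc
            if attempt < retries:
                sleep(delay)
    raise RuntimeError(f"all {retries} attempts failed") from last_error


# Stand-in fetcher that fails twice before succeeding
calls = {"n": 0}


def flaky_fetch():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("simulated rate limit")
    return {"ethereum": 2500.0}  # dummy payload


# Record the pauses instead of actually sleeping so the demo runs instantly
pauses = []
result = fetch_with_retry(flaky_fetch, retries=3, delay=6.0, sleep=pauses.append)
```

Injecting `sleep` keeps the helper testable; in the notebook the delay would naturally be `SLEEP_TIME`.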

In [14]:
# =============================================================================
# MULTI-SOURCE DATA COLLECTION PIPELINE (CLEAN & ALIGNED)
# =============================================================================

print("🌐 STARTING COMPREHENSIVE DATA COLLECTION")
print("=" * 60)

# Initialize data collector with safe settings
collector = CryptoDataCollector(
    timezone=TIMEZONE,
    top_n=TOP_N,
    lookback_days=LOOKBACK_DAYS,
    frequency=FREQUENCY,
    dune_strategy="cached_only",  # Safe: only use cached results
    allow_dune_execution=False    # Protection: never consume credits
)

print(f"📡 Data Collector initialized:")
print(f"   • Frequency: {collector.FREQUENCY}")
print(f"   • Lookback: {LOOKBACK_DAYS} days")
print(f"   • Top cryptocurrencies: {TOP_N}")
print(f"   • Timezone: {TIMEZONE}")
print(f"   • Credit Protection: ENABLED (cached_only)")

# 1. CRYPTOCURRENCY UNIVERSE & PRICE DATA
print(f"\n💰 1. COLLECTING CRYPTOCURRENCY DATA")
print("-" * 40)

# Get top cryptocurrency universe  
universe_data = collector.coingecko_get_universe(n=TOP_N, output_format="both")
if isinstance(universe_data, dict):
    top_coins = universe_data['ids']
    print(f"✅ Top {TOP_N} cryptocurrencies by market cap")
    print(f"   • {', '.join(top_coins[:5])}{'...' if TOP_N > 5 else ''}")
else:
    print("❌ Failed to get universe data")
    top_coins = ['bitcoin', 'ethereum']  # Fallback

# Collect comprehensive price data
print(f"\n📈 Collecting price data...")
price_data = collector.coingecko_get_price_action(top_coins, sleep_time=SLEEP_TIME)

if not price_data.empty:
    print(f"✅ Price data collected: {price_data.shape}")
    print(f"   • Date range: {price_data.index.min().date()} to {price_data.index.max().date()}")
    print(f"   • Features: {price_data.shape[1]} across {len(top_coins)} assets")
else:
    print("❌ No price data collected")

# 2. ON-CHAIN ANALYTICS DATA (SAFE APPROACH)
print(f"\n⛓️  2. COLLECTING ON-CHAIN ANALYTICS DATA")
print("-" * 50)

# Use the direct approach with clean output
try:
    print("🔒 Accessing cached Dune results (NO CREDITS)")
    
    # Suppress verbose logging
    import logging
    dune_logger = logging.getLogger('dune_client')
    original_level = dune_logger.level
    dune_logger.setLevel(logging.ERROR)
    
    dune_data = collector.get_dune_data_direct()
    
    # Restore logging
    dune_logger.setLevel(original_level)
    
    if not dune_data.empty:
        print(f"✅ On-chain data: {dune_data.shape}")
        
        # Show which datasets were successfully retrieved
        dataset_names = []
        for col in dune_data.columns:
            if '_' in col:
                dataset_name = col.split('_')[0]
                if dataset_name not in dataset_names:
                    dataset_names.append(dataset_name)
        
        print(f"   • Successfully retrieved: {', '.join(dataset_names[:3])}{'...' if len(dataset_names) > 3 else ''}")
        print(f"   • Total datasets: {len(dataset_names)} from cached queries")
        print(f"💰 Credits used: 0 (cached results only)")
    else:
        print("ℹ️  No cached on-chain data available")
        print("   • This is expected - queries may not have recent cached results")
        dune_data = pd.DataFrame()
        
except Exception as e:
    print(f"⚠️  On-chain data collection completed with limitations: {e}")
    dune_data = pd.DataFrame()

# 3. DERIVATIVES DATA
print(f"\n📊 3. COLLECTING DERIVATIVES DATA")
print("-" * 40)

try:
    dvol_data = collector.deribit_get_dvol(currencies=['BTC', 'ETH'])
    if not dvol_data.empty:
        print(f"✅ DVOL data: {dvol_data.shape}")
        print(f"   • Implied volatility indices for BTC and ETH")
    else:
        print("⚠️  No DVOL data available")
        dvol_data = pd.DataFrame()
except Exception as e:
    print(f"⚠️  DVOL collection completed with limitations: {e}")
    dvol_data = pd.DataFrame()

# 4. MACROECONOMIC DATA
print(f"\n🏛️  4. COLLECTING MACROECONOMIC DATA")
print("-" * 40)

try:
    fred_data = collector.fred_get_series()
    if not fred_data.empty:
        print(f"✅ FRED data: {fred_data.shape}")
        print(f"   • Macroeconomic indicators from Federal Reserve")
    else:
        print("⚠️  No FRED data available")
        fred_data = pd.DataFrame()
except Exception as e:
    print(f"⚠️  FRED collection completed with limitations: {e}")
    fred_data = pd.DataFrame()

# Final Summary
print(f"\n🎉 DATA COLLECTION COMPLETED!")
print("=" * 40)

# Calculate total features available
total_features = 0
datasets_available = 0
if not price_data.empty:
    total_features += price_data.shape[1]
    datasets_available += 1
if not dune_data.empty:
    total_features += dune_data.shape[1]
    datasets_available += 1
if not dvol_data.empty:
    total_features += dvol_data.shape[1]
    datasets_available += 1
if not fred_data.empty:
    total_features += fred_data.shape[1]
    datasets_available += 1

print(f"📊 Final Dataset Summary:")
print(f"   • Price data: {price_data.shape if not price_data.empty else 'None'}")  
print(f"   • On-chain data: {dune_data.shape if not dune_data.empty else 'None'}")
print(f"   • DVOL data: {dvol_data.shape if not dvol_data.empty else 'None'}")
print(f"   • FRED data: {fred_data.shape if not fred_data.empty else 'None'}")
print(f"   • Total features: {total_features} across {datasets_available} data sources")
print(f"💰 Total Dune credits used: 0 (cached results only)")
print("=" * 60)

🌐 STARTING COMPREHENSIVE DATA COLLECTION
📡 Data Collector initialized:
   • Frequency: 1D
   • Lookback: 365 days
   • Top cryptocurrencies: 10
   • Timezone: Europe/Madrid
   • Credit Protection: ENABLED (cached_only)

💰 1. COLLECTING CRYPTOCURRENCY DATA
----------------------------------------


✅ Top 10 cryptocurrencies by market cap
   • bitcoin, ethereum, ripple, tether, binancecoin...

📈 Collecting price data...
⚠️  CoinGecko: 7 failures
✅ Price data collected: (366, 9)
   • Date range: 2024-10-02 to 2025-10-02
   • Features: 9 across 10 assets

⛓️  2. COLLECTING ON-CHAIN ANALYTICS DATA
--------------------------------------------------
🔒 Accessing cached Dune results (NO CREDITS)
🔍 Checking 25 queries for cached results...
   ✅ Query 5893929: 2102 rows (cached)
   ✅ Query 5893461: 1827 rows (cached)
   ✅ Query 5893952: 180 rows (cached)
   ⚠️  Qu

In [21]:
dune_data.columns

Index(['query_5893929_btc_block_time', 'query_5893929_date',
       'query_5893461_close_price_usd', 'query_5893461_day',
       'query_5893461_fees_usd', 'query_5893461_total_fees_btc',
       'query_5893461_txn_count', 'query_5893461_txn_volume_btc',
       'query_5893952_base_fee_usd', 'query_5893952_blob_sumbmission_fee_usd',
       'query_5893952_date', 'query_5893952_mev_tips_usd',
       'query_5893952_priority_fee_usd', 'query_5894076_daily_funding_fr',
       'query_5894076_date', 'query_5894076_yearly_fr_pct',
       'query_5894076_yearly_funding_fr', 'query_5893557_daily_burn',
       'query_5893557_time', 'query_5893307_daily_active_addresses_L2s',
       'query_5893307_date', 'query_5894092_date',
       'query_5894092_total_margin_usd', 'query_5894092_total_revenue_usd',
       'query_5894035_date', 'query_5894035_gas_fees_usd',
       'query_5893555_date', 'query_5893555_sum_txns', 'query_5893552_day',
       'query_5893552_users', 'query_5893566_daily_burn', 'query_5893

In [22]:
# Re-collect Dune data with the updated method (no prefixes)
print("🔄 Re-collecting Dune data with updated method...")

# Create a new collector instance to ensure fresh collection
fresh_collector = CryptoDataCollector(
    timezone=TIMEZONE,
    top_n=TOP_N,
    lookback_days=LOOKBACK_DAYS,
    frequency=FREQUENCY,
    dune_strategy="cached_only",
    allow_dune_execution=False
)

# Suppress logging for clean output
import logging
dune_logger = logging.getLogger('dune_client')
original_level = dune_logger.level
dune_logger.setLevel(logging.ERROR)

# Get fresh data without prefixes
dune_data_fresh = fresh_collector.get_dune_data_direct()

# Restore logging
dune_logger.setLevel(original_level)

if not dune_data_fresh.empty:
    print(f"✅ Fresh Dune data collected: {dune_data_fresh.shape}")
    print(f"📊 Sample columns (no prefixes): {list(dune_data_fresh.columns[:5])}")
    # Replace the old data
    dune_data = dune_data_fresh
else:
    print("⚠️  No fresh data available")

print(f"\n🔍 Updated column names:")
print(dune_data.columns.tolist()[:10])  # Show first 10 columns

🔄 Re-collecting Dune data with updated method...
🔍 Checking 25 queries for cached results...
   ✅ Query 5893929: 2102 rows (cached)
   ✅ Query 5893461: 1827 rows (cached)
   ✅ Query 5893952: 180 rows (cached)
   ⚠️  Query 5893947: No cached result
   ✅ Query 5894076: 181 rows (cached)
   ✅ Query 5893557: 1827 rows (cached)
   ✅ Query 5893307: 1586 rows (cached)
   ✅ Query 5894092: 1695 rows (cached)
   ✅ Query 5894035: 1827 rows (cached)
   ✅ Query 5893555: 1005 rows (cached)
   ✅ Query 5893552: 2830 rows (cached)
   ✅ Query 5893566: 1005 rows (cached)
   ❌ Query 589

## 🛡️ Enhanced Credit Protection & Data Collection Status

This section summarizes the lessons learned about safe Dune API usage:

### 🎯 **Key Improvements:**
1. **Credit Protection**: Multiple safeguards prevent accidental Dune credit consumption
2. **Direct API Access**: Clean, simple approach using `dune.get_latest_result()`
3. **Cached Results**: FREE access to previously executed queries
4. **Fallback Strategies**: Graceful degradation when data is unavailable

### 💰 **Cost Structure:**
- **Cached Results**: FREE (using `get_latest_result()` on previously executed queries)
- **Fresh Execution**: ~6 credits per query (only when explicitly enabled)
- **Other APIs**: CoinGecko, FRED, and Deribit are unaffected by Dune credits

### ⚠️ **Expected API Behavior:**
- **402 Payment Required**: Normal response when no cached results exist and execution is disabled
- **Connection Errors**: Temporary network issues, system will continue with available data
- **Partial Data**: System designed to work with whatever data sources are available

### 🔧 **Configuration:**
- `dune_strategy="cached_only"`: Safe default, only accesses cached results
- `allow_dune_execution=False`: Critical protection flag
- Direct client approach: Simplest and most reliable method

The system is working as designed: it protects your credits while still using any cached data that is available.
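
The interaction of the two flags can be sketched as a small guard function. This is an illustration under stated assumptions, not the collector's actual code: `get_query_result`, the `cache` dictionary, and the `execute` callable are hypothetical stand-ins and do not use the Dune client API.

```python
def get_query_result(query_id, cache, execute, strategy="cached_only", allow_execution=False):
    """Return (rows, status). Only call `execute` when both settings permit it."""
    if query_id in cache:
        return cache[query_id], "cached"
    if strategy == "cached_only" or not allow_execution:
        # Safe default: a cache miss yields no data instead of a paid run
        return None, "skipped"
    rows = execute(query_id)
    cache[query_id] = rows
    return rows, "executed"


# Stand-in cache and executor; the executor must never run with the safe defaults
cache = {5893929: ["row"] * 3}


def paid_execute(query_id):
    raise AssertionError("should not be reached with the safe defaults")


hit = get_query_result(5893929, cache, paid_execute)
miss = get_query_result(5893947, cache, paid_execute)
```

Requiring both settings to agree before executing means a single misconfigured flag cannot spend credits on its own.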

## 5. Data Integration & Unified Dataset Construction

Combining all data sources into a unified dataset with proper temporal alignment and handling of missing values.
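
Temporal alignment matters because a timestamp late in the UTC day can fall on the next calendar date in `Europe/Madrid`. The sketch below converts two toy frames to local dates and outer-joins them. `to_local_dates` is a hypothetical helper, the `vix` column name is an assumption, and all values are made up.

```python
import pandas as pd

TIMEZONE = "Europe/Madrid"


def to_local_dates(df, tz=TIMEZONE):
    """Return a copy indexed by local calendar date."""
    out = df.copy()
    idx = pd.DatetimeIndex(out.index)
    if idx.tz is None:
        idx = idx.tz_localize(tz)   # naive timestamps are assumed to be local already
    else:
        idx = idx.tz_convert(tz)    # aware timestamps are shifted into the local zone
    out.index = pd.Index(idx.date, name="date")
    return out


# Prices stamped at 23:30 UTC land on the following day in Madrid
prices = pd.DataFrame(
    {"prices_ethereum": [2500.0, 2550.0]},
    index=pd.to_datetime(["2025-01-01 23:30", "2025-01-02 23:30"], utc=True),
)
# A macro series with naive daily timestamps
macro = pd.DataFrame({"vix": [17.0]}, index=pd.to_datetime(["2025-01-02"]))

unified = to_local_dates(prices).join(to_local_dates(macro), how="outer")
```

The outer join keeps every date seen in either frame and leaves `NaN` where a source has no observation, which is why the missing-data analysis in the next cell is needed.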

In [None]:
# =============================================================================
# UNIFIED DATASET CONSTRUCTION & DATA INTEGRATION
# =============================================================================

print("🔗 INTEGRATING MULTI-SOURCE DATASETS")
print("=" * 50)

# Collect all available datasets
datasets = {
    'price_data': price_data,
    'dune_data': dune_data, 
    'dvol_data': dvol_data,
    'fred_data': fred_data
}

# Filter non-empty datasets
available_datasets = {name: df for name, df in datasets.items() if len(df) > 0}
print(f"📊 Available datasets: {list(available_datasets.keys())}")

# Temporal alignment and integration
unified_data = None
integration_stats = {}

for name, df in available_datasets.items():
    try:
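        # Ensure a timezone-aware DatetimeIndex (naive timestamps are assumed to be in TIMEZONE)
        df_aligned = df.copy()
        if not isinstance(df_aligned.index, pd.DatetimeIndex):
            raise TypeError(f"expected a DatetimeIndex, got {type(df_aligned.index).__name__}")
        if df_aligned.index.tz is None:
            df_aligned.index = df_aligned.index.tz_localize(TIMEZONE)
        
        # Convert to local calendar dates for daily alignment and name the index 'date'
        df_aligned.index = pd.Index(df_aligned.index.tz_convert(TIMEZONE).date, name='date')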
        
        # Integrate with main dataset
        if unified_data is None:
            unified_data = df_aligned
            integration_stats[name] = {'shape': df_aligned.shape, 'status': 'primary'}
        else:
            before_shape = unified_data.shape
            unified_data = unified_data.join(df_aligned, how='outer')
            after_shape = unified_data.shape
            integration_stats[name] = {
                'shape': df_aligned.shape, 
                'added_cols': after_shape[1] - before_shape[1],
                'status': 'integrated'
            }
        
        print(f"✅ {name}: {df_aligned.shape} -> Added {integration_stats[name].get('added_cols', df_aligned.shape[1])} columns")
        
    except Exception as e:
        print(f"⚠️  Failed to integrate {name}: {e}")
        integration_stats[name] = {'status': 'failed', 'error': str(e)}

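# Dataset quality assessment
if unified_data is None:
    raise RuntimeError("No dataset could be integrated; check the data collection step")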
print(f"\n📈 UNIFIED DATASET SUMMARY")
print("-" * 30)
print(f"🔢 Final shape: {unified_data.shape}")
print(f"📅 Date range: {unified_data.index.min()} to {unified_data.index.max()}")
print(f"⏱️  Total days: {len(unified_data)} days")

# Missing data analysis
missing_analysis = unified_data.isnull().sum().sort_values(ascending=False)
high_missing = missing_analysis[missing_analysis > len(unified_data) * 0.5]

print(f"\n🔍 DATA QUALITY ANALYSIS:")
print(f"   • Complete columns: {len(missing_analysis[missing_analysis == 0])}")
print(f"   • Partial missing: {len(missing_analysis[(missing_analysis > 0) & (missing_analysis <= len(unified_data) * 0.5)])}")
print(f"   • High missing (>50%): {len(high_missing)}")

if len(high_missing) > 0:
    print(f"   • High missing columns: {list(high_missing.index[:5])}{'...' if len(high_missing) > 5 else ''}")

# Display sample of unified dataset
print(f"\n📊 UNIFIED DATASET SAMPLE:")
print(unified_data.head())

print(f"\n✅ Data integration completed - ready for feature engineering!")

## 6. Target Variable Construction & Feature Container Preparation

Creating the target variable (realized volatility) and preparing the feature matrix for advanced time series feature extraction.
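
As a minimal sketch of the idea on made-up prices: the daily log return is $r_t = \ln(P_t / P_{t-1})$, the volatility proxy is $|r_t|$, and the value to predict from day $t$ is $|r_{t+1}|$. The price values and the `frame` layout are illustrative only.

```python
import numpy as np
import pandas as pd

# Made-up daily closing prices
prices = pd.Series(
    [100.0, 110.0, 99.0, 99.0, 108.9],
    index=pd.date_range("2025-01-01", periods=5, freq="D"),
    name="prices_ethereum",
)

log_returns = np.log(prices).diff()                # r_t = ln(P_t) - ln(P_{t-1})
realized_vol = log_returns.abs()                   # daily volatility proxy |r_t|
target = realized_vol.shift(-1).rename("target")   # next-day value |r_{t+1}|

# Drop the first day (no return yet) and the last day (no next-day target)
frame = pd.concat([realized_vol.rename("realized_vol"), target], axis=1).dropna()
```

Shifting by -1 aligns each row's features with the following day's volatility, so no information from day $t+1$ leaks into the features for day $t$.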

In [None]:
# =============================================================================
# TARGET VARIABLE CONSTRUCTION & FEATURE PREPARATION
# =============================================================================

print("🎯 CONSTRUCTING TARGET VARIABLE & FEATURE CONTAINER")
print("=" * 55)

# Focus on recent data with sufficient history for feature extraction
recent_lookback = min(365, len(unified_data))  # Use up to 365 days or available data
X = unified_data.iloc[-recent_lookback:].copy()

print(f"📊 Working dataset: {X.shape} ({recent_lookback} days)")

# Data cleaning and preprocessing
print(f"\n🧹 DATA PREPROCESSING:")

# 1. Remove columns with excessive missing data (>90% missing)
missing_threshold = 0.9
cols_before = len(X.columns)
sufficient_data_cols = X.isnull().sum() / len(X) < missing_threshold
X = X.loc[:, sufficient_data_cols]
cols_removed = cols_before - len(X.columns)
print(f"   • Removed {cols_removed} columns with >={missing_threshold:.0%} missing data")

# 2. Forward fill missing values (max 3 periods)
X = X.ffill(limit=3)  # fillna(method='ffill') is deprecated in recent pandas versions
print(f"   • Applied forward fill (max 3 periods)")

# 3. Target variable construction - Ethereum realized volatility
target_price_col = f'prices_{TARGET_COIN}'
if target_price_col in X.columns:
    print(f"\n🎯 TARGET VARIABLE CONSTRUCTION:")
    
    # Calculate log returns
    X[f'log_returns_{TARGET_COIN}'] = np.log(X[target_price_col]) - np.log(X[target_price_col].shift(1))
    
    # Realized volatility proxy: absolute daily log return
    X[f'realized_vol_{TARGET_COIN}'] = X[f'log_returns_{TARGET_COIN}'].abs()
    
    print(f"   • Log returns calculated for {TARGET_COIN}")
    print(f"   • Realized volatility computed")
    
    # Log returns statistics
    log_rets = X[f'log_returns_{TARGET_COIN}'].dropna()
    print(f"   • Mean log return: {log_rets.mean():.6f}")
    print(f"   • Volatility (std): {log_rets.std():.6f}")
    print(f"   • Skewness: {log_rets.skew():.3f}")
    print(f"   • Kurtosis: {log_rets.kurtosis():.3f}")
    
else:
    raise ValueError(f"Target price column '{target_price_col}' not found!")

# 4. Differencing for stationarity
print(f"\n📈 STATIONARITY TRANSFORMATION:")
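# Note: dropna() discards every row that has a missing value in any column, so sparsely
# populated columns that survived the missing-data threshold can shrink the sample sharply
X_stationary = X.diff().dropna()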
print(f"   • Applied first differencing")
print(f"   • Shape after differencing: {X_stationary.shape}")

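# 5. Target variable alignment
# Take the target from the undifferenced series so that it is the next-period
# realized volatility itself rather than the change in realized volatility
y = X[f'realized_vol_{TARGET_COIN}'].shift(-1).dropna().rename("target")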
print(f"   • Target variable (next-period volatility): {len(y)} observations")

# 6. Feature-target alignment
common_idx = X_stationary.index.intersection(y.index)
X_final = X_stationary.loc[common_idx]
y_final = y.loc[common_idx]

print(f"\n✅ FINAL FEATURE CONTAINER:")
print(f"   • Features shape: {X_final.shape}")
print(f"   • Target shape: {len(y_final)}")
print(f"   • Date range: {X_final.index.min()} to {X_final.index.max()}")

# Target variable statistics
print(f"\n📊 TARGET VARIABLE STATISTICS:")
print(f"   • Mean: {y_final.mean():.6f}")
print(f"   • Std: {y_final.std():.6f}")
print(f"   • Min: {y_final.min():.6f}")
print(f"   • Max: {y_final.max():.6f}")
print(f"   • 95th percentile: {y_final.quantile(0.95):.6f}")

# Visualization of target variable
plt.figure(figsize=(15, 10))

# Time series plot
plt.subplot(2, 2, 1)
y_final.plot(alpha=0.7, color='darkblue')
plt.title(f'{TARGET_COIN.title()} Realized Volatility (Target Variable)', fontsize=12, fontweight='bold')
plt.ylabel('Realized Volatility')
plt.grid(True, alpha=0.3)

# Distribution plot
plt.subplot(2, 2, 2)
y_final.hist(bins=50, alpha=0.7, color='darkgreen', edgecolor='black')
plt.title('Target Variable Distribution', fontsize=12, fontweight='bold')
plt.xlabel('Realized Volatility')
plt.ylabel('Frequency')
plt.grid(True, alpha=0.3)

# Log returns plot (undifferenced series, restricted to the modelling window)
plt.subplot(2, 2, 3)
X.loc[X_final.index, f'log_returns_{TARGET_COIN}'].plot(alpha=0.7, color='darkred')
plt.title(f'{TARGET_COIN.title()} Log Returns', fontsize=12, fontweight='bold')
plt.ylabel('Log Returns')
plt.grid(True, alpha=0.3)

# Autocorrelation of target
plt.subplot(2, 2, 4)
from pandas.plotting import autocorrelation_plot
autocorrelation_plot(y_final.dropna())
plt.title('Target Variable Autocorrelation', fontsize=12, fontweight='bold')
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"\n🚀 Ready for TSFresh feature engineering!")

## 7. Advanced Feature Engineering with TSFresh

This section implements the core of our advanced feature engineering pipeline:

1. **Time Series Rolling**: Converting wide-format data to rolled time series format
2. **TSFresh Feature Extraction**: Generating 600+ statistical time series features  
3. **Distributed Computing**: Using Dask for parallel feature extraction
4. **Feature Selection**: Statistical significance testing to select relevant features

This approach mirrors the methodology from `development-workspace\LatestNotebook.ipynb`.
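
To make the rolling step concrete, here is a small pandas-only sketch of the idea: for each date, cut out the trailing window of observations and compute a few features on it. It does not call tsfresh; `rolled_windows`, `window_features`, and their `window` and `min_periods` parameters are hypothetical, and the exact window-length convention of `roll_time_series` (`max_timeshift`, `min_timeshift`) should be checked against the tsfresh documentation.

```python
import pandas as pd


def rolled_windows(series, window, min_periods=2):
    """Yield (end_timestamp, trailing_window) pairs using only past and current values."""
    for end in range(len(series)):
        start = max(0, end - window + 1)
        chunk = series.iloc[start:end + 1]
        if len(chunk) >= min_periods:
            yield series.index[end], chunk


def window_features(series, window, min_periods=2):
    """Compute a few simple features per window, one row per window end date."""
    rows = {
        ts: {
            "mean": chunk.mean(),
            "std": chunk.std(),                          # sample standard deviation
            "abs_energy": float((chunk ** 2).sum()),     # sum of squared values
        }
        for ts, chunk in rolled_windows(series, window, min_periods)
    }
    return pd.DataFrame.from_dict(rows, orient="index")


# Made-up daily volatility values
vol = pd.Series(
    [0.01, 0.03, 0.02, 0.05, 0.04],
    index=pd.date_range("2025-01-01", periods=5, freq="D"),
)
features = window_features(vol, window=3)
```

Each output row depends only on data up to its own date, which is what makes the rolled features usable for forecasting the next period.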

In [None]:
# =============================================================================
# ADVANCED FEATURE ENGINEERING WITH TSFRESH & DASK
# =============================================================================

print("🧠 ADVANCED FEATURE ENGINEERING PIPELINE")
print("=" * 50)

# Define Dask processing functions (mirroring LatestNotebook approach)
def roll_dask(df):
    """Roll time series for TSFresh feature extraction"""
    if len(df) == 0:
        return pd.DataFrame()
    
    print(f"🔄 Processing partition with {len(df)} rows, columns: {df.columns.tolist()[:5]}...")
    df = df.copy()
    df['date'] = pd.to_datetime(df['date'])
    
    # Roll time series with specified window
    rolled = roll_time_series(
        df,
        column_id='variable',
        column_sort='date',
        max_timeshift=TIME_WINDOW,
        min_timeshift=1,
        rolling_direction=1,
        n_jobs=1  # Single job per partition for Dask
    )
    return rolled

def extract_dask(df):
    """Extract TSFresh features from rolled time series"""
    df = df.copy().dropna()
    if len(df) == 0:
        return pd.DataFrame()
    
    print(f"🔍 Extracting features for partition with {len(df)} rows...")
    
    # Use efficient feature parameters to balance speed vs. feature richness
    features = extract_features(
        df,
        column_id='id',
        column_sort='date', 
        column_kind='variable',
        column_value='value',
        default_fc_parameters=EXTRACTION_SETTINGS,
        n_jobs=1,  # Single job per partition
        disable_progressbar=True
    )
    return features

def select_dask(df, y):
    """Select statistically significant features"""
    df = df.reset_index(level=0, drop=True).join(y, how='inner').dropna()
    if len(df) == 0:
        return pd.DataFrame()
    
    print(f"🎯 Selecting features for partition with {len(df)} rows...")
    
    # Feature selection with FDR control
    features = select_features(
        df.drop('target', axis=1),
        df['target'],
        ml_task='regression',
        fdr_level=DEFAULT_FDR_LEVEL,
        hypotheses_independent=False,
        n_jobs=1
    )
    return features

# STEP 1: Convert to long format for TSFresh processing
print(f"\n📋 STEP 1: DATA FORMAT CONVERSION")
print("-" * 35)

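# Convert wide DataFrame to long format (variable-value pairs).
# Sorting by variable then date and resetting the index keeps each variable's
# rows contiguous, so equal-sized partitions line up with variables.
FC = (
    X_final.reset_index()
    .melt(id_vars=['date'])
    .sort_values(by=['variable', 'date'])
    .reset_index(drop=True)
)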
print(f"✅ Converted to long format: {FC.shape}")

# Create Dask DataFrame with one partition per variable for parallel processing
n_partitions = FC['variable'].nunique()
FC_dask = dd.from_pandas(FC, npartitions=n_partitions)

# Verify partitioning (each partition should have one unique variable)
unique_vars_per_partition = FC_dask.map_partitions(lambda df: df['variable'].nunique()).compute()
print(f"📊 Created {n_partitions} partitions (variables per partition: {unique_vars_per_partition.unique()})")

# Display sample of long format data
print(f"\n📋 LONG FORMAT SAMPLE:")
print(FC.head(10))

# STEP 2: Time series rolling with Dask
print(f"\n🔄 STEP 2: TIME SERIES ROLLING")
print("-" * 35)

# Test rolling on one partition to get metadata
print("🧪 Testing rolling on sample partition...")
df_test = FC_dask.partitions[0].compute()
df_test['date'] = pd.to_datetime(df_test['date'])

rolled_test = roll_time_series(
    df_test,
    column_id='variable',
    column_sort='date',
    max_timeshift=TIME_WINDOW,
    min_timeshift=1,
    rolling_direction=1
)

print(f"✅ Rolling test successful: {rolled_test.shape}")

# Apply rolling to all partitions with Dask
print("🚀 Applying rolling transformation to all partitions...")
rolled_dask = FC_dask.map_partitions(roll_dask, meta=rolled_test).persist()

print(f"✅ Time series rolling completed and persisted in memory")

# STEP 3: Feature extraction with TSFresh
print(f"\n🔍 STEP 3: TSFRESH FEATURE EXTRACTION")
print("-" * 40)

print(f"⚙️  Using {EXTRACTION_SETTINGS.__class__.__name__} feature parameters")
print("🚀 Starting distributed feature extraction...")

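# Extract features using Dask (this is the computationally intensive step)
features_dask = rolled_dask.map_partitions(extract_dask, enforce_metadata=False).persist()

# persist() returns immediately; the work continues on the cluster in the background
print(f"✅ Feature extraction submitted to the cluster")

print(f"\n⏱️  Computing feature extraction results...")
# `progress` from dask.distributed is a function, not a module;
# notebook=False prints a text progress bar and waits for completion
progress(features_dask, notebook=False)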

# Compute results from all partitions
extracted_futures = client.compute(features_dask.to_delayed())
extracted_results = []

for i, future in enumerate(extracted_futures):
    try:
        result = future.result()
        if len(result) > 0:
            extracted_results.append(result)
            print(f"✅ Partition {i+1}: {result.shape} features extracted")
        else:
            print(f"⚠️  Partition {i+1}: No features extracted")
    except Exception as e:
        print(f"❌ Partition {i+1} failed: {e}")

# Combine all extracted features
if extracted_results:
    tsfresh_features = pd.concat(extracted_results, axis=0, sort=False)
    print(f"\n🎊 FEATURE EXTRACTION COMPLETED!")
    print(f"   • Total extracted features: {tsfresh_features.shape}")
    print(f"   • Feature types: {len(tsfresh_features.columns)} unique features")
    
    # Display sample features
    print(f"\n📊 SAMPLE EXTRACTED FEATURES:")
    print(tsfresh_features.head())
    
else:
    print("❌ No features were successfully extracted!")
    tsfresh_features = pd.DataFrame()

print(f"\n🎯 Ready for feature selection phase!")

## 8. Feature Selection & Final Dataset Construction

Statistical feature selection using False Discovery Rate (FDR) control to identify the most predictive features while controlling for multiple testing.
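
As an illustration of FDR control when the tested hypotheses may be dependent (the case requested by `hypotheses_independent=False` in the code below), here is a minimal sketch of the Benjamini-Yekutieli step-up procedure in plain Python. The function name `benjamini_yekutieli` and the p-values are made up for the example; it shows the idea and is not tsfresh's internal code.

```python
def benjamini_yekutieli(p_values, fdr_level=0.05, independent=False):
    """Return a list of booleans: True where the hypothesis is rejected.

    With independent=True the correction factor is 1, which reduces the
    procedure to Benjamini-Hochberg.
    """
    m = len(p_values)
    if m == 0:
        return []
    # Correction factor c(m) = 1 + 1/2 + ... + 1/m for arbitrary dependence
    c_m = 1.0 if independent else sum(1.0 / i for i in range(1, m + 1))
    order = sorted(range(m), key=lambda i: p_values[i])
    # Find the largest rank k whose sorted p-value is at most k * q / (m * c(m))
    last_rejected_rank = 0
    for rank, idx in enumerate(order, start=1):
        if p_values[idx] <= rank * fdr_level / (m * c_m):
            last_rejected_rank = rank
    # Reject every hypothesis ranked at or below that k
    rejected = [False] * m
    for idx in order[:last_rejected_rank]:
        rejected[idx] = True
    return rejected


p_vals = [0.001, 0.008, 0.039, 0.041, 0.042, 0.060, 0.074, 0.205, 0.212, 0.216]
by_mask = benjamini_yekutieli(p_vals, fdr_level=0.05)
bh_mask = benjamini_yekutieli(p_vals, fdr_level=0.05, independent=True)
print("Benjamini-Yekutieli keeps:", sum(by_mask))
print("Benjamini-Hochberg keeps:", sum(bh_mask))
```

On this toy input the dependent-hypotheses variant keeps fewer features than Benjamini-Hochberg, reflecting its more conservative thresholds.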

In [None]:
# =============================================================================
# FEATURE SELECTION & FINAL DATASET CONSTRUCTION  
# =============================================================================

print("🎯 FEATURE SELECTION & FINAL DATASET CONSTRUCTION")
print("=" * 55)

# STEP 4: Statistical Feature Selection
print(f"\n🔬 STEP 4: STATISTICAL FEATURE SELECTION")
print("-" * 45)

if len(tsfresh_features) > 0:
    print(f"📊 Starting feature selection from {tsfresh_features.shape[1]} TSFresh features...")
    
    # tsfresh_features is already a concatenated pandas DataFrame, so the
    # per-partition select_dask helper does not apply here; selection is run
    # directly on the combined frame instead.
    print("🔬 Applying direct feature selection...")
    
    # Rolled ids are (variable, date) pairs, so the extracted index may be a
    # MultiIndex. Collapse it to one row per date (first non-null value per
    # column) so it can be aligned with the date-indexed target.
    if isinstance(tsfresh_features.index, pd.MultiIndex):
        tsfresh_features = tsfresh_features.groupby(level=-1).first()
    
    # Align TSFresh features with target
    aligned_idx = tsfresh_features.index.intersection(y_final.index)
    tsfresh_aligned = tsfresh_features.loc[aligned_idx]
    y_aligned = y_final.loc[aligned_idx]
    
    if len(tsfresh_aligned) > 0 and len(y_aligned) > 0:
        print(f"✅ Aligned data: {tsfresh_aligned.shape} features, {len(y_aligned)} targets")
        
        # Select significant TSFresh features
        selected_tsfresh = select_features(
            tsfresh_aligned,
            y_aligned,
            ml_task='regression', 
            fdr_level=DEFAULT_FDR_LEVEL,
            hypotheses_independent=False,
            n_jobs=-1  # Use all available cores
        )
        
        print(f"🎊 TSFresh feature selection completed!")
        print(f"   • Selected features: {selected_tsfresh.shape}")
        print(f"   • Selection rate: {selected_tsfresh.shape[1]/tsfresh_aligned.shape[1]*100:.1f}%")
        
    else:
        print("⚠️  No aligned TSFresh features available")
        selected_tsfresh = pd.DataFrame()
        
else:
    print("⚠️  No TSFresh features available for selection")
    selected_tsfresh = pd.DataFrame()

# STEP 5: Base Feature Selection (Original Variables)
print(f"\n📊 STEP 5: BASE FEATURE SELECTION")
print("-" * 35)

print("🔬 Selecting significant base features...")

# Select significant features from original variables
base_selected = select_features(
    X_final, 
    y_final, 
    fdr_level=DEFAULT_FDR_LEVEL, 
    ml_task='regression',
    hypotheses_independent=False,
    n_jobs=-1
)

print(f"✅ Base feature selection completed!")
print(f"   • Selected features: {base_selected.shape}")
print(f"   • Selection rate: {base_selected.shape[1]/X_final.shape[1]*100:.1f}%")

# STEP 6: Final Feature Set Construction
print(f"\n🏗️  STEP 6: FINAL FEATURE SET CONSTRUCTION")
print("-" * 45)

# Combine TSFresh and base features
if len(selected_tsfresh) > 0:
    print("🔗 Combining TSFresh and base features...")
    final_features = selected_tsfresh.join(base_selected, how='outer')
    feature_sources = {
        'tsfresh': selected_tsfresh.shape[1],
        'base': base_selected.shape[1],
        'total': final_features.shape[1]
    }
else:
    print("📊 Using base features only...")
    final_features = base_selected
    feature_sources = {
        'tsfresh': 0,
        'base': base_selected.shape[1], 
        'total': final_features.shape[1]
    }

# Final alignment with target
common_idx = final_features.index.intersection(y_final.index)
final_features = final_features.loc[common_idx]
y_final_aligned = y_final.loc[common_idx]

print(f"🎊 FINAL DATASET CONSTRUCTED!")
print(f"   • TSFresh features: {feature_sources['tsfresh']}")
print(f"   • Base features: {feature_sources['base']}")
print(f"   • Total features: {feature_sources['total']}")
print(f"   • Samples: {len(final_features)}")
print(f"   • Date range: {final_features.index.min()} to {final_features.index.max()}")

# Feature importance preview (correlation with target)
if len(final_features) > 0:
    feature_correlations = final_features.corrwith(y_final_aligned).abs().sort_values(ascending=False)
    top_features = feature_correlations.head(10)
    
    print(f"\n🔝 TOP 10 FEATURES BY CORRELATION:")
    for i, (feature, corr) in enumerate(top_features.items(), 1):
        print(f"   {i:2d}. {feature[:50]:<50} | {corr:.4f}")

# Create final dataset for ML pipeline
final_dataset = final_features.join(y_final_aligned.rename('target'), how='inner')

print(f"\n📈 FEATURE STATISTICS:")
print(f"   • Mean features per sample: {final_features.notna().sum(axis=1).mean():.1f}")
print(f"   • Feature completeness: {(1 - final_features.isnull().sum().sum() / (final_features.shape[0] * final_features.shape[1]))*100:.1f}%")
if len(final_features) > 0:  # feature_correlations only exists when features are present
    print(f"   • Target correlation range: [{feature_correlations.min():.4f}, {feature_correlations.max():.4f}]")

# Display final dataset sample
print(f"\n📊 FINAL DATASET SAMPLE:")
print(final_dataset.head())

print(f"\n🚀 Ready for XGBoost + Optuna ML pipeline!")

## 9. XGBoost + Optuna ML Pipeline

Final machine learning pipeline with:

1. **Dask DMatrix Construction**: Distributed data matrices for scalable training
2. **Optuna Hyperparameter Optimization**: Bayesian optimization for best parameters  
3. **XGBoost Training**: Gradient boosting with early stopping
4. **Model Evaluation**: Comprehensive metrics and visualization

This mirrors the approach in `LatestNotebook.ipynb`.
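
Because the target is a time series, any split used for tuning or early stopping needs to respect time order. The sketch below shows a chronological train/validation/test split in plain Python; the function name `chronological_split` and the 70/10/20 fractions are illustrative assumptions, not values taken from this notebook.

```python
def chronological_split(n_samples, train_frac=0.7, valid_frac=0.1):
    """Return (train, valid, test) index ranges that preserve time order.

    Rows are assumed to be sorted from oldest to newest, so the validation
    and test blocks always lie strictly after the training block.
    """
    if not 0 < train_frac < 1 or not 0 <= valid_frac < 1:
        raise ValueError("fractions must lie in [0, 1)")
    if train_frac + valid_frac >= 1:
        raise ValueError("train_frac + valid_frac must leave room for a test set")
    n_train = int(n_samples * train_frac)
    n_valid = int(n_samples * valid_frac)
    train = range(0, n_train)
    valid = range(n_train, n_train + n_valid)
    test = range(n_train + n_valid, n_samples)
    return train, valid, test


train_idx, valid_idx, test_idx = chronological_split(200)
print(len(train_idx), len(valid_idx), len(test_idx))
```

With a split like this, the evaluation set monitored for early stopping would be the validation block, leaving the test block untouched for the final metrics.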

In [None]:
# =============================================================================
# XGBOOST + OPTUNA MACHINE LEARNING PIPELINE
# =============================================================================

print("🤖 XGBOOST + OPTUNA ML PIPELINE")
print("=" * 40)

# Optuna objective function for hyperparameter optimization
def optuna_objective(trial):
    """Optuna objective function for XGBoost hyperparameter optimization"""
    
    params = {
        "verbosity": 0,
        "objective": "reg:squarederror",
        "eval_metric": DEFAULT_XGB_METRIC,
        "tree_method": DEFAULT_TREE_METHOD,
        "random_state": RANDOM_SEED,
        
        # Hyperparameters to optimize
        "max_depth": trial.suggest_int("max_depth", 3, 12),
        "learning_rate": trial.suggest_float("learning_rate", 0.005, 0.3, log=True),
        "subsample": trial.suggest_float("subsample", 0.6, 1.0),
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1.0),
        "gamma": trial.suggest_float("gamma", 0.0, 1.0),
        "min_child_weight": trial.suggest_float("min_child_weight", 0.1, 10, log=True),
        "lambda": trial.suggest_float("lambda", 0.01, 10.0, log=True),
        "alpha": trial.suggest_float("alpha", 0.01, 10.0, log=True),
    }
    
    # The number of boosting rounds is an argument to train(), not a booster
    # parameter, so it is sampled separately from params
    n_rounds = trial.suggest_int("n_estimators", 100, 1000)
    
    # Train model with current parameters.
    # NOTE: the only evaluation set is the training data, so early stopping and
    # the returned score both reflect in-sample error rather than held-out error.
    model = dxgb.train(
        client,
        params,
        dtrain,
        num_boost_round=n_rounds,
        early_stopping_rounds=DEFAULT_EARLY_STOPPING,
        evals=[(dtrain, "train")],
        verbose_eval=False
    )
    
    return model["history"]["train"][DEFAULT_XGB_METRIC][-1]

# STEP 1: Prepare Dask DMatrix
print(f"\n🏗️  STEP 1: DASK DMATRIX PREPARATION")
print("-" * 40)

if len(final_dataset) > 50:  # Ensure sufficient data
    
    # Split into features and target
    X_ml = final_dataset.drop('target', axis=1)
    y_ml = final_dataset['target']
    
    print(f"📊 ML Dataset prepared:")
    print(f"   • Features: {X_ml.shape}")
    print(f"   • Target: {len(y_ml)} samples")
    
    # Time series split for validation. Dask DataFrames do not support
    # positional row slicing with .iloc, so the split is done in pandas first.
    n_samples = len(X_ml)
    n_train = int(n_samples * 0.8)  # 80% training, 20% test
    
    # Create Dask DataFrames for distributed processing
    X_train_dask = dd.from_pandas(X_ml.iloc[:n_train], npartitions=SPLITS)
    X_test_dask = dd.from_pandas(X_ml.iloc[n_train:], npartitions=SPLITS)
    y_train_dask = dd.from_pandas(y_ml.iloc[:n_train], npartitions=SPLITS)
    y_test_dask = dd.from_pandas(y_ml.iloc[n_train:], npartitions=SPLITS)
    
    # Persist in memory for faster access
    X_train_dask, X_test_dask, y_train_dask, y_test_dask = client.persist([
        X_train_dask, X_test_dask, y_train_dask, y_test_dask
    ])
    
    # Create DMatrix for XGBoost
    dtrain = dxgb.DaskDMatrix(client, X_train_dask, y_train_dask)
    
    print(f"✅ Dask DMatrix created successfully!")
    print(f"   • Training samples: {n_train}")
    print(f"   • Test samples: {n_samples - n_train}")
    
    # STEP 2: Optuna Hyperparameter Optimization
    print(f"\n🧪 STEP 2: OPTUNA HYPERPARAMETER OPTIMIZATION")
    print("-" * 50)
    
    print(f"🚀 Starting Bayesian optimization with {DEFAULT_N_TRIALS} trials...")
    
    # Create Optuna study with Dask storage
    storage = DaskStorage()
    study = optuna.create_study(
        direction="minimize",
        storage=storage,
        study_name=f"crypto_volatility_{TARGET_COIN}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    )
    
    # Run optimization
    study.optimize(
        optuna_objective,
        n_trials=DEFAULT_N_TRIALS,
        n_jobs=4,  # Parallel trials
        gc_after_trial=True,
        show_progress_bar=True
    )
    
    print(f"🎊 HYPERPARAMETER OPTIMIZATION COMPLETED!")
    print(f"   • Best {DEFAULT_XGB_METRIC.upper()}: {study.best_value:.6f}")
    print(f"   • Total trials: {len(study.trials)}")
    
    print(f"\n🏆 BEST HYPERPARAMETERS:")
    for param, value in study.best_params.items():
        print(f"   • {param}: {value}")
    
    # STEP 3: Final Model Training
    print(f"\n🌳 STEP 3: FINAL MODEL TRAINING")
    print("-" * 35)
    
    print("🚀 Training final model with optimized hyperparameters...")
    
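    # Rebuild the full parameter set: study.best_params holds only the tuned
    # values, and n_estimators is the round count rather than a booster parameter
    best_params = dict(study.best_params)
    n_rounds = best_params.pop("n_estimators", DEFAULT_N_ROUNDS)
    final_params = {
        "verbosity": 0,
        "objective": "reg:squarederror",
        "eval_metric": DEFAULT_XGB_METRIC,
        "tree_method": DEFAULT_TREE_METHOD,
        "random_state": RANDOM_SEED,
        **best_params,
    }
    
    final_model = dxgb.train(
        client,
        final_params,
        dtrain,
        num_boost_round=n_rounds,
        early_stopping_rounds=DEFAULT_EARLY_STOPPING,
        evals=[(dtrain, "train")],
        verbose_eval=False
    )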
    
    print(f"✅ Final model training completed!")
    
    # STEP 4: Model Predictions
    print(f"\n🔮 STEP 4: MODEL PREDICTIONS")
    print("-" * 30)
    
    # Get feature names from model
    model_features = final_model['booster'].feature_names
    print(f"📊 Model uses {len(model_features)} features")
    
    # Create test DMatrix
    dtest = dxgb.DaskDMatrix(client, X_test_dask[model_features])
    
    # Make predictions
    predictions = dxgb.predict(client, final_model, dtest)
    
    print(f"✅ Predictions completed!")
    
    # STEP 5: Model Evaluation
    print(f"\n📈 STEP 5: MODEL EVALUATION")
    print("-" * 30)
    
    # Convert to pandas for evaluation
    y_test_pd = y_test_dask.compute()
    predictions_pd = pd.Series(predictions.compute(), index=y_test_pd.index)
    
    # Calculate comprehensive metrics
    r2 = r2_score(y_test_pd, predictions_pd)
    mae = mean_absolute_error(y_test_pd, predictions_pd)
    rmse = np.sqrt(mean_squared_error(y_test_pd, predictions_pd))
    
    # Advanced metrics
    std_target = y_test_pd.std()
    mae_std_ratio = mae / std_target
    
    # MASE-style ratio: model MAE relative to a one-step naive forecast on the test set
    naive_forecast = y_test_pd.shift(1)  
    mae_naive = mean_absolute_error(y_test_pd[1:], naive_forecast[1:])
    mase = mae / mae_naive if mae_naive != 0 else np.nan
    
    print(f"🎊 MODEL PERFORMANCE METRICS:")
    print(f"   • R² Score: {r2:.6f}")
    print(f"   • MAE: {mae:.6f}")
    print(f"   • RMSE: {rmse:.6f}")  
    print(f"   • MAE/StdDev: {mae_std_ratio:.6f}")
    print(f"   • MASE: {mase:.6f}")
    print(f"   • Target Std: {std_target:.6f}")
    
    model_performance = {
        'r2_score': r2,
        'mae': mae,
        'rmse': rmse, 
        'mae_std_ratio': mae_std_ratio,
        'mase': mase,
        'target_std': std_target,
        'best_params': study.best_params,
        'best_optuna_score': study.best_value
    }
    
    print(f"\n🎉 ADVANCED ML PIPELINE COMPLETED SUCCESSFULLY!")
    
else:
    print(f"❌ Insufficient data for ML pipeline: {len(final_dataset)} samples (minimum: 50)")
    model_performance = None
    predictions_pd = None
    y_test_pd = None

## 10. Advanced Visualization & Results Analysis

Professional visualization and comprehensive analysis of model performance, feature importance, and prediction quality.
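
The headline metrics shown in the dashboard can be reproduced by hand. The following is a small sketch in plain Python, with made-up numbers, of R², MAE, RMSE and a MASE-style ratio against a one-step naive forecast (the previous observation); the helper name `regression_metrics` is hypothetical.

```python
import math


def regression_metrics(actual, predicted):
    """Compute R², MAE, RMSE and a naive-scaled MAE for two equal-length lists."""
    if len(actual) != len(predicted) or len(actual) < 2:
        raise ValueError("need two sequences of equal length >= 2")
    n = len(actual)
    errors = [a - p for a, p in zip(actual, predicted)]
    mae = sum(abs(e) for e in errors) / n
    rmse = math.sqrt(sum(e * e for e in errors) / n)
    mean_actual = sum(actual) / n
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)
    ss_res = sum(e * e for e in errors)
    r2 = 1.0 - ss_res / ss_tot if ss_tot > 0 else float("nan")
    # Naive forecast: predict each value with the previous observation
    naive_mae = sum(abs(actual[i] - actual[i - 1]) for i in range(1, n)) / (n - 1)
    mase = mae / naive_mae if naive_mae > 0 else float("nan")
    return {"r2": r2, "mae": mae, "rmse": rmse, "mase": mase}


actual_vol = [0.020, 0.030, 0.025, 0.040, 0.035]
predicted_vol = [0.022, 0.027, 0.027, 0.036, 0.036]
metrics = regression_metrics(actual_vol, predicted_vol)
for name, value in metrics.items():
    print(f"{name}: {value:.4f}")
```

A MASE-style value below 1 means the model's average error is smaller than that of simply carrying the previous observation forward.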

In [None]:
# =============================================================================
# ADVANCED VISUALIZATION & RESULTS ANALYSIS
# =============================================================================

print("📊 ADVANCED VISUALIZATION & RESULTS ANALYSIS")
print("=" * 50)

if model_performance is not None and predictions_pd is not None:
    
    # Create comprehensive visualization dashboard
    fig = plt.figure(figsize=(20, 16))
    
    # 1. Time Series Prediction Plot
    ax1 = plt.subplot(3, 3, 1)
    viz_data = pd.DataFrame({
        'Actual': y_test_pd,
        'Predicted': predictions_pd
    })
    
    viz_data.plot(ax=ax1, alpha=0.8, linewidth=2)
    ax1.set_title(f'{TARGET_COIN.title()} Realized Volatility Forecasting\nTime Series Predictions', 
                  fontsize=12, fontweight='bold')
    ax1.set_ylabel('Realized Volatility')
    ax1.grid(True, alpha=0.3)
    ax1.legend()
    
    # 2. Prediction Scatter Plot
    ax2 = plt.subplot(3, 3, 2)
    ax2.scatter(y_test_pd, predictions_pd, alpha=0.6, s=30, color='darkblue')
    
    # Perfect prediction line
    min_val = min(y_test_pd.min(), predictions_pd.min())
    max_val = max(y_test_pd.max(), predictions_pd.max())
    ax2.plot([min_val, max_val], [min_val, max_val], 'r--', linewidth=2, label='Perfect Prediction')
    
    ax2.set_xlabel('Actual Volatility')
    ax2.set_ylabel('Predicted Volatility')
    ax2.set_title(f'Prediction Accuracy\nR² = {model_performance["r2_score"]:.4f}', 
                  fontsize=12, fontweight='bold')
    ax2.grid(True, alpha=0.3)
    ax2.legend()
    
    # 3. Residuals Plot
    ax3 = plt.subplot(3, 3, 3)
    residuals = y_test_pd - predictions_pd
    ax3.scatter(predictions_pd, residuals, alpha=0.6, s=30, color='darkgreen')
    ax3.axhline(y=0, color='red', linestyle='--', linewidth=2)
    ax3.set_xlabel('Predicted Volatility')
    ax3.set_ylabel('Residuals')
    ax3.set_title(f'Residuals Analysis\nMAE = {model_performance["mae"]:.6f}', 
                  fontsize=12, fontweight='bold')
    ax3.grid(True, alpha=0.3)
    
    # 4. Prediction Distribution
    ax4 = plt.subplot(3, 3, 4)
    y_test_pd.hist(bins=30, alpha=0.7, label='Actual', color='blue', density=True)
    predictions_pd.hist(bins=30, alpha=0.7, label='Predicted', color='orange', density=True)
    ax4.set_xlabel('Volatility')
    ax4.set_ylabel('Density')
    ax4.set_title('Distribution Comparison', fontsize=12, fontweight='bold')
    ax4.legend()
    ax4.grid(True, alpha=0.3)
    
    # 5. Error Distribution
    ax5 = plt.subplot(3, 3, 5)
    residuals.hist(bins=30, alpha=0.7, color='darkred', edgecolor='black')
    ax5.set_xlabel('Prediction Error')
    ax5.set_ylabel('Frequency')
    ax5.set_title(f'Error Distribution\nRMSE = {model_performance["rmse"]:.6f}', 
                  fontsize=12, fontweight='bold')
    ax5.grid(True, alpha=0.3)
    
    # 6. Feature Importance (if available)
    ax6 = plt.subplot(3, 3, 6)
    try:
        if hasattr(final_model['booster'], 'get_score'):
            importance = final_model['booster'].get_score(importance_type='gain')
            if importance:
                top_features = dict(sorted(importance.items(), key=lambda x: x[1], reverse=True)[:15])
                feature_names = [name[:20] + '...' if len(name) > 20 else name for name in top_features.keys()]
                values = list(top_features.values())
                
                y_pos = np.arange(len(feature_names))
                ax6.barh(y_pos, values, alpha=0.8, color='skyblue', edgecolor='black')
                ax6.set_yticks(y_pos)
                ax6.set_yticklabels(feature_names, fontsize=8)
                ax6.set_xlabel('Feature Importance (Gain)')
                ax6.set_title('Top 15 Feature Importance', fontsize=12, fontweight='bold')
                ax6.grid(True, alpha=0.3, axis='x')
            else:
                ax6.text(0.5, 0.5, 'Feature importance\nnot available', 
                        ha='center', va='center', transform=ax6.transAxes, fontsize=12)
                ax6.set_title('Feature Importance', fontsize=12, fontweight='bold')
        else:
            ax6.text(0.5, 0.5, 'Feature importance\nnot available', 
                    ha='center', va='center', transform=ax6.transAxes, fontsize=12)
            ax6.set_title('Feature Importance', fontsize=12, fontweight='bold')
    except Exception as e:
        ax6.text(0.5, 0.5, f'Feature importance\nerror: {str(e)[:30]}...', 
                ha='center', va='center', transform=ax6.transAxes, fontsize=10)
        ax6.set_title('Feature Importance', fontsize=12, fontweight='bold')
    
    # 7. Optuna Optimization History
    ax7 = plt.subplot(3, 3, 7)
    if 'study' in locals() and len(study.trials) > 1:
        trial_values = [trial.value for trial in study.trials if trial.value is not None]
        if trial_values:
            ax7.plot(trial_values, alpha=0.7, linewidth=2, color='purple')
            ax7.axhline(y=study.best_value, color='red', linestyle='--', linewidth=2, 
                       label=f'Best: {study.best_value:.6f}')
            ax7.set_xlabel('Trial Number')
            ax7.set_ylabel(f'{DEFAULT_XGB_METRIC.upper()}')
            ax7.set_title('Optuna Optimization History', fontsize=12, fontweight='bold')
            ax7.legend()
            ax7.grid(True, alpha=0.3)
        else:
            ax7.text(0.5, 0.5, 'No optimization\ndata available', 
                    ha='center', va='center', transform=ax7.transAxes, fontsize=12)
            ax7.set_title('Optimization History', fontsize=12, fontweight='bold')
    else:
        ax7.text(0.5, 0.5, 'No optimization\ndata available', 
                ha='center', va='center', transform=ax7.transAxes, fontsize=12)
        ax7.set_title('Optimization History', fontsize=12, fontweight='bold')
    
    # 8. Model Performance Summary
    ax8 = plt.subplot(3, 3, 8)
    ax8.axis('off')
    
    performance_text = f"""
    📊 MODEL PERFORMANCE SUMMARY
    
    🎯 Target: {TARGET_COIN.title()} Realized Volatility
    📅 Test Period: {len(y_test_pd)} days
    
    📈 REGRESSION METRICS:
    • R² Score: {model_performance['r2_score']:.4f}
    • MAE: {model_performance['mae']:.6f}
    • RMSE: {model_performance['rmse']:.6f}
    • MASE: {model_performance['mase']:.4f}
    • MAE/StdDev: {model_performance['mae_std_ratio']:.4f}
    
    🧪 OPTIMIZATION:
    • Best Optuna Score: {model_performance['best_optuna_score']:.6f}
    • Trials: {len(study.trials) if 'study' in locals() else 'N/A'}
    
    🏗️ DATASET:
    • Features: {len(model_features) if 'model_features' in locals() else 'N/A'}
    • Training Samples: {n_train if 'n_train' in locals() else 'N/A'}
    • Test Samples: {len(y_test_pd)}
    """
    
    ax8.text(0.05, 0.95, performance_text, transform=ax8.transAxes, fontsize=10,
             verticalalignment='top', fontfamily='monospace',
             bbox=dict(boxstyle='round,pad=0.5', facecolor='lightgray', alpha=0.8))
    
    # 9. Rolling Prediction Error (rolling MAE over the test period)
    ax9 = plt.subplot(3, 3, 9)
    
    # Calculate rolling prediction accuracy
    window_size = min(20, len(y_test_pd) // 5)
    if window_size > 2:
        rolling_mae = pd.Series(np.abs(residuals)).rolling(window=window_size).mean()
        rolling_mae.plot(ax=ax9, alpha=0.8, linewidth=2, color='darkred')
        ax9.fill_between(rolling_mae.index, 0, rolling_mae.values, alpha=0.3, color='red')
        ax9.set_xlabel('Time')
        ax9.set_ylabel('Rolling MAE')
        ax9.set_title(f'Prediction Accuracy Over Time\n(Window: {window_size} days)', 
                      fontsize=12, fontweight='bold')
        ax9.grid(True, alpha=0.3)
    else:
        ax9.text(0.5, 0.5, 'Insufficient data\nfor rolling analysis', 
                ha='center', va='center', transform=ax9.transAxes, fontsize=12)
        ax9.set_title('Rolling Accuracy', fontsize=12, fontweight='bold')
    
    plt.tight_layout()
    plt.show()
    
    # Summary Statistics Table
    print(f"\n📊 COMPREHENSIVE RESULTS SUMMARY")
    print("=" * 60)
    
    summary_data = {
        'Metric': ['R² Score', 'MAE', 'RMSE', 'MASE', 'MAE/StdDev', 'Best Optuna Score'],
        'Value': [
            f"{model_performance['r2_score']:.6f}",
            f"{model_performance['mae']:.6f}",
            f"{model_performance['rmse']:.6f}",
            f"{model_performance['mase']:.6f}",
            f"{model_performance['mae_std_ratio']:.6f}",
            f"{model_performance['best_optuna_score']:.6f}"
        ],
        'Interpretation': [
            'Explained variance (higher = better)',
            'Average absolute error (lower = better)',
            'Root mean squared error (lower = better)',
            'Mean absolute scaled error (lower = better)',
            'Error relative to volatility (lower = better)',
            'Optimization objective value'
        ]
    }
    
    summary_df = pd.DataFrame(summary_data)
    print(summary_df.to_string(index=False))
    
    print(f"\n🎊 ADVANCED CRYPTOCURRENCY VOLATILITY FORECASTING COMPLETED!")
    print(f"🎯 Forecast target: {TARGET_COIN.title()} realized volatility")
    print(f"📈 Achieved R² = {model_performance['r2_score']:.4f} with MAE = {model_performance['mae']:.6f}")
    
else:
    print("⚠️  No model performance data available for visualization")
    print("Please ensure the ML pipeline completed successfully")

# Clean up resources
print(f"\n🧹 CLEANING UP RESOURCES...")
try:
    if 'client' in globals() and client:
        print("Closing Dask client...")
        client.close()
    if 'cluster' in globals() and cluster:
        print("Closing Dask cluster...")
        cluster.close()
    print("✅ Cleanup completed!")
except Exception as e:
    print(f"⚠️  Cleanup warning: {e}")

print(f"\n🎉 ADVANCED PIPELINE EXECUTION COMPLETED!")

In [None]:
# Quick ML Pipeline Test (without hyperparameter optimization)
print("=== QUICK ML PIPELINE TEST ===")

# Setup the ML pipeline with basic parameters
try:
    # Import the modules fresh
    import importlib
    import sys
    
    # Reload the pipeline module to get fresh imports
    if 'models.pipeline' in sys.modules:
        importlib.reload(sys.modules['models.pipeline'])
    
    from models.pipeline import CryptoVolatilityMLPipeline
    
    # Create pipeline with conservative settings
    ml_pipeline = CryptoVolatilityMLPipeline(
        n_trials=5,  # Very small for quick test
        n_rounds=50,  # Reduced rounds
        eval_metric='mae',
        random_seed=42
    )
    
    print("✅ ML Pipeline created successfully")
    print(f"Training on {len(feature_set)} samples with {len(feature_set.columns)} features")
    
    # Reference parameters for a manual run. They are not passed to the pipeline
    # below: with optimize=False the pipeline uses its own default parameters.
    basic_params = {
        'max_depth': 6,
        'learning_rate': 0.1,
        'n_estimators': 100,
        'subsample': 0.8,
        'colsample_bytree': 0.8,
        'random_state': 42
    }
    
    print("Running ML pipeline without hyperparameter optimization...")
    
    results = ml_pipeline.run_complete_pipeline(
        final_features=feature_set,
        client=client,
        target_coin="ethereum", 
        optimize=False  # Skip optimization, use default parameters
    )
    
    # Display basic results
    print("\n" + "="*50)
    print("📊 PIPELINE RESULTS")
    print("="*50)
    
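    if 'model_performance' in results:
        perf = results['model_performance']
        print(f"✅ Model trained successfully!")
        # Apply numeric formatting only when the metric is present;
        # formatting the 'N/A' fallback with :.4f would raise ValueError
        for label, key, digits in [("📈 R² Score", "r2_score", 4),
                                   ("📉 MAE", "mae", 6),
                                   ("📉 RMSE", "rmse", 6)]:
            value = perf.get(key)
            print(f"{label}: {value:.{digits}f}" if value is not None else f"{label}: N/A")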
    
    print(f"⚡ Feature importance available: {'feature_importance' in results}")
    print(f"📊 Predictions available: {'predictions' in results}")
    
    print("\n🎉 Basic ML pipeline completed successfully!")
    
except Exception as e:
    print(f"❌ Error in ML pipeline: {str(e)}")
    print("This might be due to cached imports. Try restarting the kernel if needed.")

In [None]:
# Demonstrate Different Frequency Settings
print("=== FREQUENCY COMPARISON DEMONSTRATION ===")

# Test different frequencies
frequencies_to_test = ["1D", "1H"]
frequency_results = {}

for test_freq in frequencies_to_test:
    try:
        print(f"\n--- Testing {test_freq} frequency ---")
        
        # Create a new collector with different frequency
        test_collector = CryptoDataCollector(
            timezone="Europe/Madrid",
            top_n=3,  # Reduced for faster testing
            lookback_days=7,  # Very short for demonstration
            frequency=test_freq
        )
        
        print(f"Collector settings:")
        print(f"  - Frequency: {test_collector.FREQUENCY}")
        print(f"  - Pandas freq: {test_collector.get_pandas_freq()}")
        print(f"  - Binance interval: {test_collector.get_binance_interval()}")
        print(f"  - Deribit resolution: {test_collector.get_deribit_resolution()}")
        print(f"  - Batch size: {test_collector.get_batch_size_for_frequency()}")
        
        # Test a quick data collection
        small_universe = ['bitcoin', 'ethereum']
        test_data = test_collector.coingecko_get_price_action(small_universe, sleep_time=1)
        
        frequency_results[test_freq] = {
            'shape': test_data.shape,
            'date_range': (test_data.index.min(), test_data.index.max()) if len(test_data) > 0 else None,
            'pandas_freq': test_collector.get_pandas_freq(),
            'batch_size': test_collector.get_batch_size_for_frequency()
        }
        
        print(f"  - Data shape: {test_data.shape}")
        if len(test_data) > 0:
            print(f"  - Date range: {test_data.index.min()} to {test_data.index.max()}")
            print(f"  - Sample frequency: {test_data.index.freq if hasattr(test_data.index, 'freq') else 'Inferred'}")
        
    except Exception as e:
        print(f"Error testing {test_freq}: {e}")
        frequency_results[test_freq] = {'error': str(e)}

# Summary comparison
print(f"\n=== FREQUENCY COMPARISON SUMMARY ===")
for freq, results in frequency_results.items():
    print(f"\n{freq} Frequency:")
    if 'error' in results:
        print(f"  - Error: {results['error']}")
    else:
        print(f"  - Data points: {results['shape'][0]} rows, {results['shape'][1]} columns")
        print(f"  - Pandas frequency: {results['pandas_freq']}")
        print(f"  - Dune batch size: {results['batch_size']}")
        if results['date_range']:
            print(f"  - Date range: {results['date_range'][0]} to {results['date_range'][1]}")

print(f"\n=== FREQUENCY IMPLEMENTATION COMPLETE ===")
print("All data collection methods now support frequency parameter!")
print("Available frequencies: 1D (daily), 1H (hourly)")
print("Frequency is automatically applied to:")
print("  - CoinGecko data resampling")
print("  - Binance API intervals")
print("  - Deribit DVOL resolution")
print("  - FRED data resampling")
print("  - Dune Analytics batch sizing")
print("  - Feature engineering alignment")

## Summary

This quick example demonstrates:

1. **Data Collection**: Basic crypto price data collection
2. **Feature Engineering**: Simple technical indicators
3. **ML Pipeline**: Quick XGBoost model with hyperparameter optimization

For a full-featured pipeline with TSFresh, multiple data sources, and extensive optimization, see `main_pipeline.ipynb`.

### Next Steps:
- Configure more API keys in `.env` for additional data sources
- Increase `lookback_days` and `n_trials` for better model performance
- Experiment with different `extraction_settings` for TSFresh
- Try different target coins and evaluation metrics
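
As a starting point for experimenting with extraction settings, tsfresh accepts a plain dictionary that maps feature calculator names to parameter lists (or `None` for calculators that take no parameters). The selection below is an arbitrary illustration, not the configuration used in this notebook, and the variable name `custom_settings` is hypothetical.

```python
# Keys are tsfresh feature calculator names; values are lists of parameter
# dicts, or None when the calculator takes no parameters.
custom_settings = {
    "mean": None,
    "standard_deviation": None,
    "abs_energy": None,
    "autocorrelation": [{"lag": 1}, {"lag": 5}],
    "quantile": [{"q": 0.1}, {"q": 0.9}],
    "linear_trend": [{"attr": "slope"}],
}

# Count the features this configuration would request per input series
n_requested = sum(len(params) if params else 1 for params in custom_settings.values())
print(f"Requested features per series: {n_requested}")
```

A dictionary like this would be passed as `default_fc_parameters` in place of `EXTRACTION_SETTINGS`; fewer calculators mean faster extraction at the cost of a smaller pool of candidate features.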