# Momentum Trading System - Validation and Example Usage

This notebook demonstrates how to use the momentum trading system and validates its implementation following López de Prado's methodology.

In [None]:
import sys
import os
sys.path.append('../src')

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

# Import our modules
from data.data_handler import FinancialDataHandler
from features.fractional_diff import FractionalDifferentiator
from features.momentum_features import MomentumFeatureEngineer
from labeling.triple_barrier import TripleBarrierLabeler
from validation.purged_cv import PurgedKFold, TimeSeriesValidator
from models.ml_models import MomentumMLModel
from backtesting.backtester import MomentumBacktester

## 1. Data Loading and Preprocessing

In [None]:
# Request window and ticker universe; only the SPY frame is used further down.
symbols = ['SPY', 'QQQ', 'IWM']
start_date, end_date = '2020-01-01', '2023-12-31'

# One handler serves the fetch and the quality report below.
data_handler = FinancialDataHandler(symbols)
data = data_handler.fetch_data(start_date, end_date)

# Pull out SPY and report its size and the span of its index.
spy_data = data['SPY']
print(f"Data shape: {spy_data.shape}")
print(f"Date range: {spy_data.index[0]} to {spy_data.index[-1]}")

# Print every entry of the handler's quality report to four decimals
# (so each value is expected to be numeric).
quality_metrics = data_handler.validate_data_quality('SPY')
print("\nData Quality Metrics:")
for metric, value in quality_metrics.items():
    print(f"{metric}: {value:.4f}")

## 2. Fractional Differentiation Validation

In [None]:
# Fractionally differentiate the SPY close and compare stationarity before/after.
prices = spy_data['Close']
frac_diff = FractionalDifferentiator()

# Let the helper pick d from the range [0, 1] scanned in steps of 0.1.
optimal_d = frac_diff.find_optimal_d(prices, d_range=(0.0, 1.0), step=0.1)
print(f"Optimal fractional differentiation parameter: {optimal_d:.2f}")

# Transform the price series with the selected d.
frac_diff_series = frac_diff.frac_diff_ffd(prices, optimal_d)

# Run the same stationarity report on the raw series first, then the transformed one.
original_stationarity, frac_diff_stationarity = (
    frac_diff.validate_stationarity(series)
    for series in (prices, frac_diff_series)
)

print("\nStationarity Test Results:")
print(f"Original series ADF p-value: {original_stationarity['adf']['p_value']:.4f}")
print(f"Frac diff series ADF p-value: {frac_diff_stationarity['adf']['p_value']:.4f}")

# Score reported by the helper comparing the raw and transformed series.
memory_correlation = frac_diff.memory_preservation_test(prices, frac_diff_series)
print(f"Memory preservation correlation: {memory_correlation:.4f}")

## 3. Feature Engineering

In [None]:
# Build the feature matrix from SPY close prices and traded volume.
feature_engineer = MomentumFeatureEngineer()
features = feature_engineer.create_momentum_features(prices=spy_data['Close'], volume=spy_data['Volume'])

print(f"Feature matrix shape: {features.shape}")
print(f"Features created: {list(features.columns)}")

# Share of missing rows per column, in percent; keep only columns above 10%.
missing_pct = features.isnull().sum() / len(features) * 100
high_missing = missing_pct[missing_pct > 10]

print(f"\nFeatures with >10% missing values:")
print("None" if len(high_missing) == 0 else high_missing)

## 4. Triple Barrier Labeling

In [None]:
# Label every daily event with the triple-barrier labeler.
labeler = TripleBarrierLabeler()
spy_close = spy_data['Close']

# The volatility window doubles as the number of leading rows skipped when
# choosing event timestamps, so both are driven by one value.
vol_window = 20
returns = data_handler.get_returns('SPY')
volatility = data_handler.get_volatility('SPY', window=vol_window)

# Daily event timestamps, leaving out the initial period used for volatility.
t_events = spy_data.index[vol_window:]

# Barrier events: symmetric profit-take / stop-loss multipliers, volatility as
# the target, and a 0.5% minimum-return threshold.
events = labeler.get_events(
    close=spy_close,
    t_events=t_events,
    pt_sl=[1.0, 1.0],
    target=volatility,
    min_ret=0.005,
)

# Turn the events into labels.
labeled_events = labeler.get_bins(events, spy_close)

# Summarise the label distribution and print each entry of the summary.
label_analysis = labeler.analyze_label_distribution(labeled_events)
print("Label Distribution Analysis:")
for key, value in label_analysis.items():
    print(f"{key}: {value}")

## 5. Model Training with Purged Cross-Validation

In [None]:
# Align features and labels on the timestamps present in both.
common_index = features.index.intersection(labeled_events.index)
if len(common_index) == 0:
    # Fail here with a clear message instead of deep inside the validator.
    raise ValueError("Features and labeled events share no timestamps; nothing to train on.")

# Forward-fill gaps, then zero-fill whatever is still missing (leading NaNs).
# `.ffill()` replaces `fillna(method='ffill')`, which recent pandas deprecates.
X = features.loc[common_index].ffill().fillna(0)
# Single .loc call selects rows and the 'bin' column without chained indexing.
y = labeled_events.loc[common_index, 'bin']

print(f"Aligned dataset shape: X={X.shape}, y={y.shape}")

# Sample weights for the aligned events, using the labeler's 'time_decay' method.
sample_weights = labeler.get_sample_weights(
    labeled_events.loc[common_index],
    spy_data['Close'],
    method='time_decay'
)

# Random-forest model with feature selection limited to 15 features.
model = MomentumMLModel(
    model_type='random_forest',
    use_feature_selection=True,
    n_features_select=15
)

# Metrics requested from the validator and reported below; defined once so the
# request and the report cannot drift apart.
scoring_metrics = ['accuracy', 'precision', 'recall', 'f1']

# Purged k-fold cross-validation with 5 splits.
validator = TimeSeriesValidator()
cv_results = validator.validate_model(
    model=model,
    X=X,
    y=y,
    cv_method='purged_kfold',
    n_splits=5,
    sample_weights=sample_weights,
    scoring=scoring_metrics
)

# Report mean and a two-standard-deviation band for every metric.
print("\nCross-Validation Results:")
for metric in scoring_metrics:
    mean_score = cv_results[metric].mean()
    std_score = cv_results[metric].std()
    print(f"{metric}: {mean_score:.4f} (+/- {std_score*2:.4f})")

## 6. Model Training and Feature Importance

In [None]:
# Fit the model on the complete aligned dataset, reusing the sample weights.
model.fit(X, y, sample_weights=sample_weights)

# Retrieve the importance table and show its first ten rows.
feature_importance = model.get_feature_importance()
print("Top 10 Most Important Features:")
print(feature_importance.head(10))

# Bar chart of the first fifteen rows, drawn on an explicit Axes.
fig, ax = plt.subplots(figsize=(10, 6))
feature_importance.head(15).plot(kind='bar', ax=ax)
ax.set_title('Top 15 Feature Importance')
plt.xticks(rotation=45)
fig.tight_layout()
plt.show()

## 7. Backtesting

In [None]:
# Generate trading signals from the fitted model.
# NOTE(review): X is the same matrix the model was fitted on in the previous
# section, so these predictions are in-sample and the backtest below is likely
# optimistic; an out-of-sample split would give a fairer estimate.
signals = pd.Series(model.predict(X), index=X.index)

# Confidence per row = highest class probability. The column labels are never
# used, so they are not hard-coded: the frame is built correctly whether the
# model learned two classes or three.
probabilities = pd.DataFrame(
    model.predict_proba(X),
    index=X.index
).max(axis=1)

# Backtester configuration.
backtester = MomentumBacktester(
    initial_capital=100000,
    transaction_cost=0.001,  # 0.1%
    market_impact=0.0005,    # 0.05%
    max_position_size=0.2    # 20% max position
)

# Run the backtest on the rows of SPY data that have a signal.
backtest_results = backtester.backtest(
    signals=signals,
    prices=spy_data.loc[signals.index],
    probabilities=probabilities,
    position_sizing_method='kelly'
)

# Print every result; floats get four decimals, anything else is printed as-is.
print("Backtest Results:")
for key, value in backtest_results.items():
    if isinstance(value, float):
        print(f"{key}: {value:.4f}")
    else:
        print(f"{key}: {value}")

## 8. Performance Visualization

In [None]:
def _compound(period_returns):
    """Compound the simple returns that fall into one resampling bucket."""
    return (1 + period_returns).prod() - 1


def _rolling_sharpe(return_series, window=63):
    """Rolling mean over rolling std of returns, scaled by sqrt(252)."""
    rolling = return_series.rolling(window)
    return rolling.mean() / rolling.std() * np.sqrt(252)


# Backtester's own result plot, on the rows of SPY data that carry a signal.
backtester.plot_results(spy_data.loc[signals.index])

# Period-over-period returns of the portfolio value (not monthly; monthly
# aggregation happens further down).
portfolio_df = backtester.get_portfolio_history()
portfolio_returns = portfolio_df['portfolio_value'].pct_change().dropna()

# Benchmark: SPY close-to-close returns on the portfolio's dates, plus the
# dates that both return series share.
spy_returns = spy_data.loc[portfolio_returns.index]['Close'].pct_change().dropna()
common_index = portfolio_returns.index.intersection(spy_returns.index)

# 2x2 grid of panels, each addressed through its own Axes object.
fig, axes = plt.subplots(2, 2, figsize=(12, 8))
(ax_cumulative, ax_monthly), (ax_sharpe, ax_signals) = axes

# Panel 1: growth of one unit for strategy and benchmark on the shared dates.
strategy_cumret = (1 + portfolio_returns.loc[common_index]).cumprod()
benchmark_cumret = (1 + spy_returns.loc[common_index]).cumprod()

ax_cumulative.plot(strategy_cumret.index, strategy_cumret, label='Strategy', linewidth=2)
ax_cumulative.plot(benchmark_cumret.index, benchmark_cumret, label='Buy & Hold', linewidth=2)
ax_cumulative.set_title('Cumulative Returns Comparison')
ax_cumulative.legend()
ax_cumulative.grid(True, alpha=0.3)

# Panel 2: histogram of returns compounded per 'M' resampling bucket.
monthly_strategy = portfolio_returns.resample('M').apply(_compound)
monthly_benchmark = spy_returns.resample('M').apply(_compound)

ax_monthly.hist(monthly_strategy, bins=20, alpha=0.7, label='Strategy', density=True)
ax_monthly.hist(monthly_benchmark, bins=20, alpha=0.7, label='Buy & Hold', density=True)
ax_monthly.set_title('Monthly Returns Distribution')
ax_monthly.legend()
ax_monthly.set_xlabel('Monthly Return')

# Panel 3: 63-observation rolling Sharpe ratio for both series.
rolling_sharpe_strategy = _rolling_sharpe(portfolio_returns)
rolling_sharpe_benchmark = _rolling_sharpe(spy_returns)

ax_sharpe.plot(rolling_sharpe_strategy.index, rolling_sharpe_strategy, label='Strategy')
ax_sharpe.plot(rolling_sharpe_benchmark.index, rolling_sharpe_benchmark, label='Buy & Hold')
ax_sharpe.set_title('Rolling 3-Month Sharpe Ratio')
ax_sharpe.legend()
ax_sharpe.grid(True, alpha=0.3)

# Panel 4: how often each signal value was emitted.
signals.value_counts().sort_index().plot(kind='bar', ax=ax_signals)
ax_signals.set_title('Signal Distribution')
ax_signals.set_xlabel('Signal')
ax_signals.set_ylabel('Frequency')

fig.tight_layout()
plt.show()

## 9. Risk Analysis

In [None]:
# Display label -> key in the backtester's result dict.
_metric_keys = {
    'Annualized Return': 'annualized_return',
    'Volatility': 'volatility',
    'Sharpe Ratio': 'sharpe_ratio',
    'Sortino Ratio': 'sortino_ratio',
    'Calmar Ratio': 'calmar_ratio',
    'Max Drawdown': 'max_drawdown',
    'Win Rate': 'win_rate',
    'Profit Factor': 'profit_factor',
}
strategy_metrics = {label: backtest_results[key] for label, key in _metric_keys.items()}

# Benchmark figures from SPY returns: mean and standard deviation scaled to a
# 252-period year, and their ratio as the Sharpe ratio.
benchmark_return = spy_returns.mean() * 252
benchmark_vol = spy_returns.std() * np.sqrt(252)
benchmark_sharpe = benchmark_return / benchmark_vol

# Worst drop of the cumulative benchmark curve below its running maximum.
benchmark_cumret = (1 + spy_returns).cumprod()
_running_peak = benchmark_cumret.expanding().max()
benchmark_dd = (benchmark_cumret / _running_peak - 1).min()

benchmark_metrics = {
    'Annualized Return': benchmark_return,
    'Volatility': benchmark_vol,
    'Sharpe Ratio': benchmark_sharpe,
    'Max Drawdown': benchmark_dd
}

# Side-by-side table restricted to the metrics available for both columns.
_shared_rows = ['Annualized Return', 'Volatility', 'Sharpe Ratio', 'Max Drawdown']
comparison_df = pd.DataFrame(
    {
        'Strategy': [strategy_metrics.get(row, 'N/A') for row in _shared_rows],
        'Buy & Hold': [benchmark_metrics.get(row, 'N/A') for row in _shared_rows],
    },
    index=_shared_rows,
)

print("Performance Comparison:")
print(comparison_df.round(4))

# Strategy-only figures that have no benchmark counterpart.
print(f"\nAdditional Strategy Metrics:")
print(f"Win Rate: {strategy_metrics['Win Rate']:.2%}")
print(f"Profit Factor: {strategy_metrics['Profit Factor']:.2f}")
print(f"Number of Trades: {backtest_results['num_trades']}")
print(f"Transaction Costs: ${backtest_results['total_costs']:.2f} ({backtest_results['cost_drag']:.2%} drag)")

## 10. Model Validation Summary

In [None]:
# Consolidated report: restates the key numbers computed in the earlier
# sections and derives a single pass/fail flag from them.
print("=" * 60)
print("MOMENTUM TRADING SYSTEM VALIDATION SUMMARY")
print("=" * 60)

print("\n1. DATA QUALITY:")
print(f"   - Dataset: {spy_data.shape[0]} observations")
print(f"   - Missing values: {quality_metrics['missing_values_pct']:.2f}%")
print(f"   - Data completeness: {quality_metrics['data_completeness']:.2f}")

print("\n2. FRACTIONAL DIFFERENTIATION:")
print(f"   - Optimal d parameter: {optimal_d:.2f}")
print(f"   - Stationarity achieved: {frac_diff_stationarity['adf']['is_stationary']}")
print(f"   - Memory preservation: {memory_correlation:.3f}")

print("\n3. FEATURE ENGINEERING:")
print(f"   - Total features created: {features.shape[1]}")
print(f"   - Features selected: {len(model.selected_features)}")
print(f"   - Top feature: {feature_importance.index[0]}")

print("\n4. LABELING:")
print(f"   - Events generated: {len(labeled_events)}")
print(f"   - Label distribution: {label_analysis['label_percentages']}")
print(f"   - Imbalance ratio: {label_analysis['imbalance_ratio']:.2f}")

print("\n5. MODEL VALIDATION:")
print(f"   - CV Accuracy: {cv_results['accuracy'].mean():.3f} ± {cv_results['accuracy'].std():.3f}")
print(f"   - CV F1-Score: {cv_results['f1'].mean():.3f} ± {cv_results['f1'].std():.3f}")
print(f"   - Model type: {model.model_type}")

# The strategy figure printed below is a total return, so the benchmark is
# compounded over its own return series to give a like-for-like number; the
# annualized benchmark mean is kept alongside for reference.
# NOTE(review): assumes the backtester's 'total_return' spans roughly the same
# dates as spy_returns -- confirm against the backtester.
benchmark_total_return = (1 + spy_returns).prod() - 1

print("\n6. BACKTEST PERFORMANCE:")
print(f"   - Total Return: {backtest_results['total_return']:.2%}")
print(f"   - Sharpe Ratio: {backtest_results['sharpe_ratio']:.2f}")
print(f"   - Max Drawdown: {backtest_results['max_drawdown']:.2%}")
print(f"   - Win Rate: {backtest_results['win_rate']:.2%}")
print(f"   - vs Buy & Hold Total Return: {benchmark_total_return:.2%} (annualized: {benchmark_return:.2%})")

print("\n7. SYSTEM VALIDATION:")
# All four checks must hold: stationary transformed series, CV accuracy above
# the 3-class chance level, positive backtest Sharpe, and more than 100 events.
validation_passed = (
    frac_diff_stationarity['adf']['is_stationary'] and
    cv_results['accuracy'].mean() > 0.33 and  # Better than random for 3-class
    backtest_results['sharpe_ratio'] > 0 and
    len(labeled_events) > 100
)
print(f"   - Validation Status: {'PASSED' if validation_passed else 'NEEDS REVIEW'}")
# NOTE(review): the backtest signals are predictions on the data the model was
# fitted on, so a Sharpe above 1.0 here is weak evidence of live readiness.
print(f"   - Ready for live trading: {'YES' if validation_passed and backtest_results['sharpe_ratio'] > 1.0 else 'NEEDS OPTIMIZATION'}")

print("\n" + "="*60)