In [1]:
%load_ext autoreload
%autoreload 2

In [3]:
from Backtest import Backtest

In [5]:
# %% [markdown]
# # FactorPipeline Testing Notebook
# This notebook tests the functionality of our FactorPipeline implementation.

# %% [markdown]
# ## 1. Setup and Imports

# %%
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import yaml
from datetime import datetime, timedelta
from typing import Dict, List

# Import our FactorPipeline class
from FactorPipeline import FactorPipeline

# Set random seed for reproducibility
np.random.seed(42)

# %% [markdown]
# ## 2. Create Mock Components

# %%
class MockMLModel:
    """Mock ML model for testing"""
    def predict(self, features: pd.DataFrame) -> pd.Series:
        # Generate predictions between -1 and 1
        symbols = list(features.keys())  # Get symbols from features dict
        predictions = np.random.uniform(-1, 1, len(symbols))
        return pd.Series(predictions, index=symbols)
        
    def update(self, new_data):
        pass  # Mock update method

class MockMeanReversion:
    """Mock mean reversion analyzer for testing"""
    def generate_signals(self, data: Dict[str, pd.DataFrame]):
        # Generate random z-scores
        symbols = list(data.keys())
        scores = np.random.normal(0, 1, len(symbols))
        signal_strength = pd.Series(scores, index=symbols)
        
        # Create mock signal object
        class Signals:
            def __init__(self, strength):
                self.signal_strength = strength
                
        return Signals(signal_strength)

# %% [markdown]
# ## 3. Generate Test Data

# %%
def generate_test_data(
    symbols: List[str],
    days: int = 100,
    start_date: datetime = datetime(2023, 1, 1)
) -> Dict[str, pd.DataFrame]:
    """Generate realistic test data for multiple symbols"""
    data = {}
    
    for symbol in symbols:
        # Generate dates
        dates = [start_date + timedelta(days=x) for x in range(days)]
        
        # Generate price data with trend and volatility
        returns = np.random.normal(0.0005, 0.02, days)  # Daily returns
        prices = 100 * np.exp(np.cumsum(returns))  # Log-normal prices
        
        # Generate volume data
        volume = np.random.lognormal(15, 0.5, days)  # Log-normal volumes
        
        # Create DataFrame
        df = pd.DataFrame({
            'Open': prices * (1 + np.random.normal(0, 0.002, days)),
            'High': prices * (1 + np.abs(np.random.normal(0, 0.004, days))),
            'Low': prices * (1 - np.abs(np.random.normal(0, 0.004, days))),
            'Close': prices,
            'Volume': volume.astype(int)
        }, index=dates)
        
        data[symbol] = df
    
    return data

# %% [markdown]
# ## 4. Load Configuration

# %%
# Load or create config
config = {
    'factor_pipeline': {
        'model_weight': 0.5,
        'mean_reversion_weight': 0.5,
        'min_score_threshold': 0.1,
        'confidence_threshold': 0.6,
        'prediction_frequency': 1
    },
    'portfolio_strategy': {
        'max_position_size': 0.10,
        'min_positions': 3,
        'target_leverage': 1.0,
        'risk_free_rate': 0.02,
        'max_sector_exposure': 0.30,
    }
}

# %% [markdown]
# ## 5. Initialize and Test Pipeline

# %%
# Test symbols
symbols = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'META', 'NVDA', 'TSLA', 'JPM', 'V', 'WMT']

# Generate test data
data = generate_test_data(symbols)

# Initialize pipeline components
ml_model = MockMLModel()
mean_reversion = MockMeanReversion()

# Initialize pipeline
pipeline = FactorPipeline(
    ml_model=ml_model,
    mean_reversion=mean_reversion,
    data=data,
    config=config
)

# Update pipeline
pipeline.update()

# %% [markdown]
# ## 6. Analyze Results

# %%
# Get signal metrics
metrics_df = pipeline.get_signal_metrics()
print("\nSignal Metrics:")
display(metrics_df)

# Get current positions
longs, shorts = pipeline.get_current_positions()
print("\nLong Positions:")
for symbol, size in longs.items():
    print(f"{symbol}: {size:.2%}")
    
print("\nShort Positions:")
for symbol, size in shorts.items():
    print(f"{symbol}: {size:.2%}")

# %% [markdown]
# ## 7. Visualize Results

# %%
# Plot signal distributions
plt.figure(figsize=(15, 5))

# ML Signals
plt.subplot(131)
sns.histplot(metrics_df['ml_signal'], bins=20)
plt.title('ML Signal Distribution')
plt.xlabel('Signal Value')

# Mean Reversion Signals
plt.subplot(132)
sns.histplot(metrics_df['mean_rev_signal'], bins=20)
plt.title('Mean Reversion Signal Distribution')
plt.xlabel('Signal Value')

# Combined Signals
plt.subplot(133)
sns.histplot(metrics_df['combined_signal'], bins=20)
plt.title('Combined Signal Distribution')
plt.xlabel('Signal Value')

plt.tight_layout()
plt.show()

# %%
# Plot position allocations
plt.figure(figsize=(12, 6))

# Combine long and short positions
all_positions = pd.Series({**longs, **{k: -v for k, v in shorts.items()}})
colors = ['g' if v > 0 else 'r' for v in all_positions]

plt.bar(all_positions.index, all_positions.values, color=colors)
plt.title('Position Allocations')
plt.xticks(rotation=45)
plt.ylabel('Position Size')
plt.axhline(y=0, color='black', linestyle='-', alpha=0.2)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

# %% [markdown]
# ## 8. Test Signal Updates

# %%
def test_signal_stability():
    """Test stability of signals over multiple updates"""
    signal_history = []
    position_history = []
    
    # Run multiple updates
    for _ in range(5):
        pipeline.update()
        metrics = pipeline.get_signal_metrics()
        longs, shorts = pipeline.get_current_positions()
        
        signal_history.append(metrics['combined_signal'].to_dict())
        position_history.append({
            'longs': list(longs.keys()),
            'shorts': list(shorts.keys())
        })
    
    return signal_history, position_history

# Run stability test
signal_history, position_history = test_signal_stability()

# Print position changes
print("Position Changes Over Updates:")
for i, pos in enumerate(position_history):
    print(f"\nUpdate {i+1}:")
    print(f"Longs: {pos['longs']}")
    print(f"Shorts: {pos['shorts']}")

# Plot signal stability
signals_df = pd.DataFrame(signal_history)
plt.figure(figsize=(12, 6))
signals_df.plot(marker='o')
plt.title('Signal Stability Over Updates')
plt.xlabel('Update')
plt.ylabel('Signal Value')
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.grid(True)
plt.tight_layout()
plt.show()

# %% [markdown]
# ## 9. Test Risk Filters

# %%
def test_risk_filters():
    """Test the impact of risk filters"""
    # Get pre-filter signals
    pre_filter = pipeline.current_signals.combined_signals
    
    # Get post-filter scores
    post_filter = pipeline.current_rankings.filtered_scores
    
    # Compare
    comparison = pd.DataFrame({
        'pre_filter': pre_filter,
        'post_filter': post_filter
    })
    
    return comparison

# Run risk filter test
filter_comparison = test_risk_filters()

# Plot comparison
plt.figure(figsize=(10, 6))
plt.scatter(filter_comparison.index, filter_comparison['pre_filter'], 
           label='Pre-Filter', alpha=0.6)
plt.scatter(filter_comparison.index, filter_comparison['post_filter'], 
           label='Post-Filter', alpha=0.6)
plt.title('Impact of Risk Filters')
plt.xticks(rotation=45)
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

# %% [markdown]
# ## 10. Validation Checks

# %%
def run_validation_checks():
    """Run validation checks on pipeline outputs"""
    checks = {
        'Position Sizes Sum to 1': abs(sum(longs.values()) - 1.0) < 0.0001 and 
                                  abs(sum(shorts.values()) - 1.0) < 0.0001,
        'No Position Size Exceeds Max': all(v <= config['portfolio_strategy']['max_position_size'] 
                                          for v in list(longs.values()) + list(shorts.values())),
        'Minimum Positions Met': len(longs) >= config['portfolio_strategy']['min_positions'] and 
                               len(shorts) >= config['portfolio_strategy']['min_positions'],
        'No Overlapping Positions': len(set(longs.keys()) & set(shorts.keys())) == 0
    }
    
    return pd.Series(checks)

# Run validation checks
validation_results = run_validation_checks()
print("\nValidation Checks:")
display(validation_results)

Error generating signals: 'NoneType' object has no attribute 'keys'
Error updating factor pipeline: 'NoneType' object has no attribute 'keys'


AttributeError: 'NoneType' object has no attribute 'keys'