In [1]:
import sys
import os
import io
from contextlib import redirect_stdout
import pandas as pd
import numpy as np
from datetime import datetime
from pathlib import Path
import vectorbt as vbt
from scipy import signal, stats
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from abc import ABC, abstractmethod
from typing import Dict, Tuple, List

import warnings
warnings.filterwarnings('ignore')

# Base Strategy Class
class Strategy(ABC):
    """Base class for all trading strategies"""
    
    def __init__(self, df: pd.DataFrame, target_col: str = 'cad_ig_er_ytd_index'):
        self.df = df.copy()
        self.target_col = target_col
        self.name = self.__class__.__name__
        
    @abstractmethod
    def generate_signals(self) -> pd.Series:
        """Generate trading signals"""
        pass

# Strategy Implementations
class BuyAndHoldStrategy(Strategy):
    """Buy and Hold Strategy - Always invested"""
    
    def generate_signals(self) -> pd.Series:
        """Generate constant True signals for buy and hold"""
        return pd.Series(True, index=self.df.index)

class VolatilityRegimeStrategy(Strategy):
    """Volatility Regime Strategy"""
    
    def __init__(self, df: pd.DataFrame,
                 vol_window: int = 30,
                 correlation_window: int = 90,
                 regime_window: int = 252,
                 vol_threshold: float = 1.2):
        super().__init__(df)
        self.vol_window = vol_window
        self.correlation_window = correlation_window
        self.regime_window = regime_window
        self.vol_threshold = vol_threshold
        
    def _calculate_vol_surface_score(self) -> pd.Series:
        implied_vol = self.df['vix']
        realized_vols = pd.DataFrame(index=self.df.index)
        realized_vols[f'vol_{self.vol_window}'] = self.df[self.target_col].pct_change().rolling(self.vol_window).std() * np.sqrt(252)
        vol_premium = implied_vol - realized_vols.mean(axis=1)
        vol_premium_zscore = (vol_premium - vol_premium.rolling(252).mean()) / vol_premium.rolling(252).std()
        return -vol_premium_zscore
        
    def _calculate_correlation_score(self) -> pd.Series:
        target_returns = self.df[self.target_col].pct_change()
        assets = ['cad_oas', 'us_hy_oas', 'us_ig_oas']
        asset_returns = self.df[assets].pct_change()
        correlations = pd.DataFrame(index=self.df.index)
        for asset in assets:
            correlations[asset] = target_returns.rolling(self.correlation_window).corr(asset_returns[asset])
        avg_correlation = correlations.mean(axis=1)
        correlation_zscore = (avg_correlation - avg_correlation.rolling(252).mean()) / avg_correlation.rolling(252).std()
        return -correlation_zscore
        
    def _calculate_vol_regime(self) -> pd.Series:
        assets = ['cad_oas', 'us_hy_oas', 'us_ig_oas']
        vol_indicators = pd.DataFrame(index=self.df.index)
        for asset in assets:
            vol = self.df[asset].pct_change().rolling(20).std() * np.sqrt(252)
            vol_indicators[f'{asset}_vol'] = (vol < vol.rolling(252).mean())
        vol_indicators['vix_regime'] = self.df['vix'] < self.df['vix'].rolling(252).mean()
        low_vol_regime = vol_indicators.mean(axis=1) > 0.5
        return low_vol_regime
        
    def generate_signals(self) -> pd.Series:
        vol_surface_score = self._calculate_vol_surface_score()
        correlation_score = self._calculate_correlation_score()
        vol_regime = self._calculate_vol_regime()
        returns = self.df[self.target_col].pct_change()
        trend = returns.rolling(60).mean() / returns.rolling(60).std()
        trend_strength = trend.abs()
        
        print("\nVolatility Regime Strategy Analysis:")
        print("===================================")
        low_vol_days = vol_regime.sum()
        print(f"Low Volatility Regime: {low_vol_days} days ({low_vol_days/len(vol_regime)*100:.1f}% of time)")
        print(f"Average Trend Strength: {trend_strength.mean():.2f}")
        print(f"Average Correlation Score: {correlation_score.mean():.2f}")
        
        signals = (
            vol_regime &
            (vol_surface_score > 0) &
            (correlation_score > -0.3) &
            (trend_strength > 0.1)
        )
        signals = signals.rolling(5).mean() > 0.6
        return signals

class AdaptiveTrendStrategy(Strategy):
    """Adaptive Trend Strategy"""
    
    def __init__(self, df: pd.DataFrame,
                 cycle_lookbacks: list = [10, 20, 40],
                 efficiency_window: int = 10,
                 min_trend_strength: float = 0.4):
        super().__init__(df)
        self.cycle_lookbacks = cycle_lookbacks
        self.efficiency_window = efficiency_window
        self.min_trend_strength = min_trend_strength
        
    def _decompose_series(self, series: pd.Series, window: int) -> tuple:
        normalized = (series - series.mean()) / series.std()
        nyq = 0.5 * 1
        cutoff = 1 / window
        order = 2
        b, a = signal.butter(order, cutoff/nyq, btype='low')
        trend = pd.Series(signal.filtfilt(b, a, normalized), index=series.index)
        cycle = normalized - trend
        return trend, cycle
        
    def _calculate_trend_strength(self, series: pd.Series, window: int) -> pd.Series:
        trend_strength = pd.Series(index=series.index)
        for i in range(window, len(series)):
            y = series.iloc[i-window:i]
            X = np.arange(window).reshape(-1, 1)
            reg = LinearRegression().fit(X, y)
            trend_strength.iloc[i] = reg.score(X, y)
        return trend_strength.fillna(0)
        
    def _calculate_cycle_score(self, cycle: pd.Series) -> pd.Series:
        cycle_zscore = (cycle - cycle.rolling(252).mean()) / cycle.rolling(252).std()
        cycle_score = -cycle_zscore
        return cycle_score
        
    def _calculate_adaptive_lookback(self) -> pd.Series:
        vol = self.df[self.target_col].pct_change().rolling(20).std() * np.sqrt(252)
        vol_ratio = vol / vol.rolling(252).mean()
        base_lookback = np.mean(self.cycle_lookbacks)
        lookbacks = pd.Series(base_lookback, index=self.df.index)
        adjusted_lookbacks = lookbacks * vol_ratio.fillna(1)
        return adjusted_lookbacks.clip(min(self.cycle_lookbacks), max(self.cycle_lookbacks))
        
    def _calculate_market_efficiency_ratio(self) -> pd.Series:
        price = self.df[self.target_col]
        dir_move = abs(price - price.shift(self.efficiency_window))
        total_move = pd.Series(0, index=price.index)
        for i in range(1, self.efficiency_window + 1):
            total_move += abs(price - price.shift(i))
        efficiency_ratio = dir_move / total_move
        return efficiency_ratio
        
    def generate_signals(self) -> pd.Series:
        signals = pd.Series(False, index=self.df.index)
        lookbacks = self._calculate_adaptive_lookback()
        avg_lookback = int(lookbacks.mean())
        trend, cycle = self._decompose_series(self.df[self.target_col], avg_lookback)
        trend_strength = self._calculate_trend_strength(self.df[self.target_col], avg_lookback)
        cycle_score = self._calculate_cycle_score(cycle)
        efficiency_ratio = self._calculate_market_efficiency_ratio()
        
        print("\nAdaptive Trend Strategy Analysis:")
        print("================================")
        print(f"Average Trend Strength: {trend_strength.mean():.2f}")
        print(f"Average Efficiency Ratio: {efficiency_ratio.mean():.2f}")
        print(f"Average Lookback Period: {avg_lookback} days")
        
        trending_market = trend_strength > self.min_trend_strength
        efficient_market = efficiency_ratio > 0.3
        trend_signals = trending_market & (trend.diff() > 0)
        reversion_signals = (~trending_market) & (cycle_score > 0.5)
        signals = trend_signals | reversion_signals
        return signals.fillna(False)

# Backtest Configuration
class BacktestConfig:
    def __init__(self, 
                 start_date=None, 
                 end_date=None,
                 rebalance_freq='1D',  # '1D' for daily, 'M' for monthly
                 initial_capital=100,
                 size=1.0,
                 size_type='percent'):
        self.start_date = pd.to_datetime(start_date) if start_date else None
        self.end_date = pd.to_datetime(end_date) if end_date else None
        self.rebalance_freq = rebalance_freq  # Can use 'M' directly for resampling
        self.initial_capital = initial_capital
        self.size = size
        self.size_type = size_type

    @classmethod
    def DAILY(cls):
        return cls(rebalance_freq='1D')
    
    @classmethod
    def MONTHLY(cls):
        return cls(rebalance_freq='M')

def load_data(config: BacktestConfig) -> pd.DataFrame:
    data_path = os.path.join(os.getcwd(), '..', 'raw_data', 'df.csv')
    df = pd.read_csv(data_path)
    df['Date'] = pd.to_datetime(df['Date'])
    df.set_index('Date', inplace=True)
    
    # Get actual data range
    data_start = df.index.min()
    data_end = df.index.max()
    
    # Adjust config dates to available data range
    if config.start_date:
        config.start_date = max(config.start_date, data_start)
    else:
        config.start_date = data_start
        
    if config.end_date:
        config.end_date = min(config.end_date, data_end)
    else:
        config.end_date = data_end
    
    # Filter data using adjusted dates
    df = df[(df.index >= config.start_date) & (df.index <= config.end_date)]
    
    return df

def create_portfolio(strategy, price, signals, config: BacktestConfig):
    # Filter price and signals to config date range if dates are specified
    if config.start_date is not None:
        price = price[price.index >= config.start_date]
        signals = signals[signals.index >= config.start_date]
    if config.end_date is not None:
        price = price[price.index <= config.end_date]
        signals = signals[signals.index <= config.end_date]
    
    # Convert signals to boolean if they're not already
    signals = signals.astype(bool)
    
    # Resample signals based on rebalance frequency
    if config.rebalance_freq != '1D':
        monthly_signals = signals.resample('M').last()
        signals = monthly_signals.reindex(price.index, method='ffill')
        signals = signals.astype(bool)
    
    # Generate entries and exits
    entries = signals & ~signals.shift(1).fillna(False)
    exits = ~signals & signals.shift(1).fillna(False)
    
    return vbt.Portfolio.from_signals(
        price,
        entries,
        exits,
        freq='1D',
        init_cash=config.initial_capital,
        size=config.size,
        size_type=config.size_type,
        accumulate=False
    )

def format_results(stats_dict):
    df_stats = pd.DataFrame.from_dict(stats_dict, orient='index').T
    df_stats = df_stats.sort_values(by='Total Return [%]', axis=1, ascending=False)
    
    ordered_rows = df_stats.index.tolist()
    total_return_idx = ordered_rows.index('Total Return [%]')
    ordered_rows.remove('Annualized Return [%]')
    ordered_rows.remove('Annualized Volatility [%]')
    ordered_rows.insert(total_return_idx + 1, 'Annualized Return [%]')
    ordered_rows.insert(total_return_idx + 2, 'Annualized Volatility [%]')
    df_stats = df_stats.reindex(ordered_rows)
    
    formatted_df = df_stats.copy()
    
    formatted_df.loc['Start'] = formatted_df.loc['Start'].apply(lambda x: pd.to_datetime(x).strftime('%m/%d/%Y'))
    formatted_df.loc['End'] = formatted_df.loc['End'].apply(lambda x: pd.to_datetime(x).strftime('%m/%d/%Y'))
    
    percentage_rows = ['Total Return [%]', 'Annualized Return [%]', 'Annualized Volatility [%]']
    for row in percentage_rows:
        formatted_df.loc[row] = formatted_df.loc[row].apply(
            lambda x: f"{x:.2f}%" if pd.notnull(x) else x
        )
    
    for row in ['Start Value', 'End Value']:
        formatted_df.loc[row] = formatted_df.loc[row].apply(lambda x: f"{x:.2f}" if pd.notnull(x) else x)
    
    duration_rows = ['Avg Winning Trade Duration', 'Avg Losing Trade Duration', 'Max Drawdown Duration', 'Period']
    for row in duration_rows:
        if row in formatted_df.index:
            formatted_df.loc[row] = formatted_df.loc[row].apply(
                lambda x: f"{pd.Timedelta(x).days} days" if pd.notnull(x) else x
            )
    
    numeric_rows = [idx for idx in formatted_df.index 
                   if idx not in ['Start', 'End'] + percentage_rows + duration_rows + ['Start Value', 'End Value']]
    for row in numeric_rows:
        formatted_df.loc[row] = formatted_df.loc[row].apply(
            lambda x: f"{float(x):.2f}" if pd.notnull(x) and not isinstance(x, pd.Timedelta) else x
        )
    
    styled_df = formatted_df.style.set_properties(**{
        'text-align': 'center'
    }).set_table_styles([
        {'selector': 'th', 'props': [('text-align', 'center')]}
    ])
    
    return styled_df

def run_backtest(strategies, config: BacktestConfig):
    all_stats = {}
    
    for strategy in strategies:
        with redirect_stdout(io.StringIO()):
            signals = strategy.generate_signals()
        
        price = strategy.df[strategy.target_col]
        signals = signals.reindex(price.index)
        
        if not isinstance(signals.dtype, pd.BooleanDtype):
            signals = signals.astype(bool)
        
        pf = create_portfolio(strategy, price, signals, config)
        stats_series = pf.stats()
        
        returns = pf.returns()
        returns_stats = returns.vbt.returns(freq='1D', year_freq='365D')
        
        stats_series['Annualized Return [%]'] = returns_stats.annualized() * 100
        stats_series['Annualized Volatility [%]'] = returns_stats.annualized_volatility() * 100
        
        all_stats[strategy.__class__.__name__] = stats_series
    
    return format_results(all_stats)

# Initialize and run backtests
df = load_data(BacktestConfig())
strategies = [
    BuyAndHoldStrategy(df),
    VolatilityRegimeStrategy(df),
    AdaptiveTrendStrategy(df),
]

config_default = BacktestConfig(
    rebalance_freq='1D',
    initial_capital=100.0,
    size=1.0,
    size_type='percent'
)
results_default = run_backtest(strategies, config_default)

config_2020 = BacktestConfig(
    start_date='2009-01-01',
    end_date='2025-12-31',
    rebalance_freq='M',  # Use 'M' explicitly for monthly
    initial_capital=100,
    size=1.0,
    size_type='percent'
)
results_2020 = run_backtest(strategies, config_2020)

display(results_default)
display(results_2020)

Unnamed: 0,AdaptiveTrendStrategy,BuyAndHoldStrategy,VolatilityRegimeStrategy
Start,10/31/2002,10/31/2002,10/31/2002
End,12/27/2024,12/27/2024,12/27/2024
Period,5562 days,5562 days,5562 days
Start Value,100.00,100.00,100.00
End Value,225.32,134.69,109.89
Total Return [%],125.32%,34.69%,9.89%
Annualized Return [%],5.48%,1.97%,0.62%
Annualized Volatility [%],1.07%,1.82%,0.41%
Benchmark Return [%],34.69,34.69,34.69
Max Gross Exposure [%],100.00,100.00,100.00


Unnamed: 0,BuyAndHoldStrategy,AdaptiveTrendStrategy,VolatilityRegimeStrategy
Start,01/02/2009,01/02/2009,01/02/2009
End,12/27/2024,12/27/2024,12/27/2024
Period,4019 days,4019 days,4019 days
Start Value,100.00,100.00,100.00
End Value,150.27,144.61,103.90
Total Return [%],50.27%,44.61%,3.90%
Annualized Return [%],3.77%,3.41%,0.35%
Annualized Volatility [%],1.87%,1.56%,0.50%
Benchmark Return [%],50.27,50.27,50.27
Max Gross Exposure [%],100.00,100.00,100.00
