# AdaptiveRegimeStrategy V2

In [48]:
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.mixture import GaussianMixture
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from statsmodels.tsa.stattools import adfuller
from abc import ABC, abstractmethod
import warnings
from datetime import datetime, timedelta
warnings.filterwarnings('ignore')

## Feature Lab

In [49]:
class FeatureLab:
    """Shared mathematical engine for technical and statistical features."""
    
    @staticmethod
    def get_weights_frac_diff(d, size, threshold=1e-5):
        w = [1.0]
        for k in range(1, size):
            w_k = -w[-1] / k * (d - k + 1)
            w.append(w_k)
        w = np.array(w[::-1])
        w = w[np.abs(w) > threshold]
        return w

    @staticmethod
    def frac_diff_fixed(series, d, window=50):
        # Solves Stationarity Dilemma [cite: 61]
        weights = FeatureLab.get_weights_frac_diff(d, window)
        res = series.rolling(window=len(weights)).apply(lambda x: np.dot(x, weights), raw=True)
        return res

    @staticmethod
    def yang_zhang_volatility(df, window=30):
        # Captures intraday energy/gaps [cite: 82]
        log_ho = (df['High'] / df['Open']).apply(np.log)
        log_lo = (df['Low'] / df['Open']).apply(np.log)
        log_co = (df['Close'] / df['Open']).apply(np.log)
        log_oc = (df['Open'] / df['Close'].shift(1)).apply(np.log)
        log_cc = (df['Close'] / df['Close'].shift(1)).apply(np.log)
        
        rs = log_ho * (log_ho - log_co) + log_lo * (log_lo - log_co)
        close_vol = log_cc.rolling(window=window).var()
        open_vol = log_oc.rolling(window=window).var()
        window_rs = rs.rolling(window=window).mean()

        k = 0.34 / (1.34 + (window + 1) / (window - 1))
        return np.sqrt(open_vol + k * window_rs)

    @staticmethod
    def compute_rsi(series, window=14):
        delta = series.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=window).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
        rs = gain / loss
        return 100 - (100 / (1 + rs))

    @staticmethod
    def triple_barrier_labels(prices, vol, pt=1.0, sl=1.0, barrier_window=10):
        """
        Implements the Triple Barrier Method.
        Labels: 1 (Profit Target Hit), -1 (Stop Loss Hit), 0 (Time Limit/Neutral)
        """
        labels = pd.Series(0, index=prices.index)
        # Shift prices to align future outcome with current row
        # However, to avoid look-ahead in features, we usually compute label for row t based on t+1...t+k
        # This function generates the TARGET variable (y) for training.
        
        limit = len(prices) - barrier_window
        p_values = prices.values
        v_values = vol.values
        
        for i in range(limit):
            current_p = p_values[i]
            current_vol = v_values[i]
            
            # Dynamic barriers based on volatility [cite: 215]
            target = current_p * (1 + pt * current_vol)
            stop = current_p * (1 - sl * current_vol)
            
            future_window = p_values[i+1 : i+1+barrier_window]
            
            hit_target = np.where(future_window >= target)[0]
            hit_stop = np.where(future_window <= stop)[0]
            
            first_target = hit_target[0] if len(hit_target) > 0 else barrier_window + 1
            first_stop = hit_stop[0] if len(hit_stop) > 0 else barrier_window + 1
            
            if first_target < first_stop and first_target <= barrier_window:
                labels.iloc[i] = 1
            elif first_stop < first_target and first_stop <= barrier_window:
                labels.iloc[i] = 0 # In Meta-Labeling, we often treat Stop (-1) as 0 (Do Not Trade)
            # Else 0 (Time limit reached or neutral)
            
        return labels

## Base Strategy

In [50]:
class BaseStrategy(ABC):
    def __init__(self, ticker, start_date, end_date):
        self.ticker = ticker
        self.start_date = start_date
        self.end_date = end_date
        self.data = None
        self.results = None
        self.metrics = {}

    def fetch_data(self, warmup_years=2):
        start_dt = datetime.strptime(self.start_date, "%Y-%m-%d")
        warmup_start_dt = start_dt - timedelta(days=warmup_years*365)
        warmup_start_str = warmup_start_dt.strftime("%Y-%m-%d")
        
        try:
            self.data = yf.download(self.ticker, start=warmup_start_str, end=self.end_date, progress=False, auto_adjust=False)
            if isinstance(self.data.columns, pd.MultiIndex): 
                self.data.columns = self.data.columns.get_level_values(0)
            if 'Adj Close' not in self.data.columns: 
                self.data['Adj Close'] = self.data['Close']
            self.data['Returns'] = self.data['Adj Close'].pct_change()
            self.data.dropna(inplace=True)
        except Exception as e:
            print(f"Error fetching {self.ticker}: {e}")
            self.data = pd.DataFrame()

    @abstractmethod
    def generate_signals(self):
        pass

    def run_backtest(self, transaction_cost=0.0005, rebalance_threshold=0.1):
        if self.data is None or self.data.empty: return
        
        backtest_mask = self.data.index >= self.start_date
        df = self.data.loc[backtest_mask].copy()
        if df.empty: return

        # Position Smoothing
        clean_positions = []
        current_pos = 0.0
        raw_signals = df['Signal'].values
        
        for target in raw_signals:
            if abs(target - current_pos) > rebalance_threshold:
                current_pos = target
            clean_positions.append(current_pos)
            
        df['Position'] = clean_positions
        df['Prev_Position'] = df['Position'].shift(1).fillna(0)
        df['Turnover'] = (df['Prev_Position'] - df['Position'].shift(2).fillna(0)).abs()
        df['Gross_Returns'] = df['Prev_Position'] * df['Returns']
        df['Net_Returns'] = df['Gross_Returns'] - (df['Turnover'] * transaction_cost)
        df['Net_Returns'].fillna(0, inplace=True)
        
        df['Cumulative_Strategy'] = (1 + df['Net_Returns']).cumprod()
        df['Cumulative_Market'] = (1 + df['Returns']).cumprod()
        
        roll_max = df['Cumulative_Strategy'].cummax()
        df['Drawdown'] = (df['Cumulative_Strategy'] / roll_max) - 1.0
        
        self.results = df
        
        # Performance Calculation
        total_ret = df['Cumulative_Strategy'].iloc[-1] - 1
        vol = df['Net_Returns'].std() * np.sqrt(252)
        sharpe = (df['Net_Returns'].mean() / df['Net_Returns'].std()) * np.sqrt(252) if vol > 0 else 0
        max_dd = df['Drawdown'].min()
        
        self.metrics = {
            'Total Return': total_ret,
            'Sharpe Ratio': sharpe,
            'Max Drawdown': max_dd
        }
        return df

## Model From NB V1

### V1_Baseline

In [51]:
class StrategyV1_Baseline(BaseStrategy):
    """V1: Fixed FracDiff, Standard GMM."""
    def generate_signals(self):
        if self.data is None or self.data.empty: return
        df = self.data.copy()
        
        df['Volatility'] = FeatureLab.yang_zhang_volatility(df)
        df['FracDiff'] = FeatureLab.frac_diff_fixed(df['Adj Close'].apply(np.log), d=0.4, window=50)
        df['RSI'] = FeatureLab.compute_rsi(df['Adj Close'])
        df['Returns_Smoothed'] = df['Returns'].rolling(5).mean()
        df['Vol_Smoothed'] = df['Volatility'].rolling(5).mean()
        df.dropna(inplace=True)
        
        X = df[['Returns_Smoothed', 'Vol_Smoothed']].values
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)
        gmm = GaussianMixture(n_components=3, random_state=42)
        df['Cluster'] = gmm.fit_predict(X_scaled)
        
        stats = df.groupby('Cluster')['Returns_Smoothed'].mean().sort_values().index
        mapping = {stats[0]: -1, stats[1]: 0, stats[2]: 1}
        df['Regime'] = df['Cluster'].map(mapping)
        
        df['Signal'] = 0
        df.loc[(df['Regime'] == 1) & (df['FracDiff'] > 0), 'Signal'] = 1
        df.loc[(df['Regime'] == 0) & (df['RSI'] < 40), 'Signal'] = 1
        
        target_vol = 0.15 / np.sqrt(252)
        df['Vol_Scaler'] = (target_vol / df['Volatility']).clip(upper=1.5)
        df['Signal'] = df['Signal'] * df['Vol_Scaler']
        self.data = df

### V3_Macro

In [52]:
class StrategyV3_Macro(BaseStrategy):
    """V3: Macro (SPY) Filter."""
    def __init__(self, ticker, start_date, end_date):
        super().__init__(ticker, start_date, end_date)
        self.spy_data = None

    def fetch_data(self, warmup_years=2):
        super().fetch_data(warmup_years)
        start_dt = datetime.strptime(self.start_date, "%Y-%m-%d") - timedelta(days=warmup_years*365)
        try:
            spy = yf.download("SPY", start=start_dt.strftime("%Y-%m-%d"), end=self.end_date, progress=False, auto_adjust=False)
            if isinstance(spy.columns, pd.MultiIndex): spy.columns = spy.columns.get_level_values(0)
            self.spy_data = spy[['Adj Close']].rename(columns={'Adj Close': 'SPY_Price'})
        except: pass

    def generate_signals(self):
        if self.data is None or self.data.empty: return
        df = self.data.copy()
        
        if self.spy_data is not None:
            df = df.join(self.spy_data, how='left')
            df['SPY_MA200'] = df['SPY_Price'].rolling(window=200).mean()
            df['Macro_Bull'] = df['SPY_Price'] > df['SPY_MA200']
        else:
            df['Macro_Bull'] = True
            
        df['Volatility'] = FeatureLab.yang_zhang_volatility(df)
        df['FracDiff'] = FeatureLab.frac_diff_fixed(df['Adj Close'].apply(np.log), d=0.4, window=50)
        df['RSI'] = FeatureLab.compute_rsi(df['Adj Close'])
        df.dropna(inplace=True)
        
        df['Signal'] = 0
        df.loc[(df['FracDiff'] > 0), 'Signal'] = 1
        df.loc[df['Macro_Bull'] == False, 'Signal'] = 0
        
        target_vol = 0.15 / np.sqrt(252)
        df['Signal'] = df['Signal'] * (target_vol / df['Volatility']).clip(upper=1.5)
        self.data = df

### V9_Unshackled

In [53]:
class StrategyV9_RegimeUnshackled(BaseStrategy):
    """
    V9 (Robust): The 'Unshackled' Regime Model with Walk-Forward Learning.
    
    CRITICAL FIX:
    - Replaces static gmm.fit_predict() with a Rolling Walk-Forward loop.
    - Eliminates Look-Ahead Bias by retraining the regime model every month
      using only the trailing 1-year window.
    - Solves the 'Label Switching' problem dynamically at each step.
    """
    def __init__(self, ticker, start_date, end_date):
        super().__init__(ticker, start_date, end_date)
        self.spy_data = None

    def fetch_data(self, warmup_years=2):
        super().fetch_data(warmup_years)
        start_dt = datetime.strptime(self.start_date, "%Y-%m-%d") - timedelta(days=warmup_years*365)
        try:
            # Fetch Macro Context (SPY) for the 'Alpha Clause'
            spy = yf.download("SPY", start=start_dt.strftime("%Y-%m-%d"), end=self.end_date, progress=False, auto_adjust=False)
            if isinstance(spy.columns, pd.MultiIndex): spy.columns = spy.columns.get_level_values(0)
            
            # Simple Macro Trend (200 SMA)
            spy['Macro_Trend'] = (spy['Adj Close'] > spy['Adj Close'].rolling(200).mean()).astype(int)
            self.spy_data = spy[['Macro_Trend']]
        except: 
            pass

    def _apply_kalman_filter(self, prices):
        # [cite: 151] Kalman Filter for recursive state estimation
        x = prices.values
        n = len(x)
        state = np.zeros(n)
        slope = np.zeros(n)
        state[0] = x[0]
        P, Q, R = 1.0, 0.001, 0.1 # Static params for robustness
        
        for t in range(1, n):
            pred_state = state[t-1] + slope[t-1]
            pred_P = P + Q
            measurement = x[t]
            residual = measurement - pred_state
            
            K = pred_P / (pred_P + R)
            state[t] = pred_state + K * residual
            slope[t] = 0.9 * slope[t-1] + 0.1 * (state[t] - state[t-1])
            P = (1 - K) * pred_P
            
        return pd.Series(slope, index=prices.index)

    def generate_signals(self):
        if self.data is None or self.data.empty: return
        df = self.data.copy()
        
        # --- 1. Feature Engineering ---
        if self.spy_data is not None:
            df = df.join(self.spy_data, how='left').fillna(method='ffill')
        else: 
            df['Macro_Trend'] = 1
            
        # Volatility & Kalman Slope [cite: 82, 151]
        df['Volatility'] = FeatureLab.yang_zhang_volatility(df)
        df['Kalman_Slope'] = self._apply_kalman_filter(np.log(df['Adj Close']))
        df['RSI'] = FeatureLab.compute_rsi(df['Adj Close'])
        
        # Features for GMM (Smoothed to reduce noise)
        df['Returns_Smoothed'] = df['Returns'].rolling(5).mean()
        df['Vol_Smoothed'] = df['Volatility'].rolling(5).mean()
        
        # Drop NaN from warmup
        df.dropna(inplace=True)
        
        # --- 2. Rolling Walk-Forward GMM ---
        # We need to predict 'Regime_Type' for every row without looking ahead.
        
        train_window = 252  # Lookback: 1 Year
        refit_step = 21     # Re-train every Month
        
        # Initialize default regime (CHOP)
        df['Regime_Type'] = 'CHOP' 
        
        # Prepare Feature Matrix
        X = df[['Returns_Smoothed', 'Vol_Smoothed']].values
        scaler = StandardScaler()
        
        indices = np.arange(len(df))
        
        # Start loop after the first training window
        for t in range(train_window, len(df), refit_step):
            start_idx = t - train_window
            end_idx = t
            predict_end_idx = min(t + refit_step, len(df))
            
            # A. Train on PAST window
            X_train = X[start_idx:end_idx]
            
            # Scale locally (Standardization must also be walk-forward)
            #  "Standard algorithms assume training data... same probability distribution"
            scaler_local = StandardScaler()
            X_train_scaled = scaler_local.fit_transform(X_train)
            
            try:
                # Fit GMM
                gmm = GaussianMixture(n_components=3, random_state=42, n_init=5)
                gmm.fit(X_train_scaled)
                
                # B. Predict on NEXT window (Future)
                X_future = X[end_idx:predict_end_idx]
                X_future_scaled = scaler_local.transform(X_future)
                clusters_future = gmm.predict(X_future_scaled)
                
                # C. Dynamic Label Mapping (Solve Label Switching)
                # We map clusters to Bull/Bear based on the *Training* means
                # Calculate mean return for each cluster center in the training set
                # We can approximate this by predicting the training set again
                train_clusters = gmm.predict(X_train_scaled)
                
                # Create a temp dataframe to sort clusters
                temp_df = pd.DataFrame({
                    'Ret': X_train[:, 0], # Returns is column 0
                    'Cluster': train_clusters
                })
                
                stats = temp_df.groupby('Cluster')['Ret'].mean().sort_values()
                
                # The cluster with lowest mean return = BEAR
                # The cluster with highest mean return = BULL
                # Middle = CHOP
                if len(stats) == 3:
                    bear_c = stats.index[0]
                    chop_c = stats.index[1]
                    bull_c = stats.index[2]
                    
                    mapping = {bear_c: 'BEAR', chop_c: 'CHOP', bull_c: 'BULL'}
                else:
                    # Fallback if GMM collapses to fewer clusters
                    mapping = {c: 'CHOP' for c in stats.index}

                # Apply mapping to the prediction window
                regimes_mapped = [mapping.get(c, 'CHOP') for c in clusters_future]
                
                # Store results in the main DataFrame
                # Use iloc for integer-based indexing on the slice
                df.iloc[end_idx:predict_end_idx, df.columns.get_loc('Regime_Type')] = regimes_mapped
                
            except Exception as e:
                # Keep default 'CHOP' on failure
                pass

        # --- 3. Unshackled Signal Logic (unchanged from V9) ---
        df['Signal'] = 0.0
        
        # A. BULL REGIME: "Trust The Trend"
        bull_signal = (df['Regime_Type'] == 'BULL')
        df.loc[bull_signal, 'Signal'] = 1.0
        
        # Boost: If Kalman agrees (Strong Trend), increase leverage
        strong_trend = bull_signal & (df['Kalman_Slope'] > 0)
        df.loc[strong_trend, 'Signal'] = 1.3
        
        # B. CHOP REGIME: "Buy Dips"
        # chop_buy = (df['Regime_Type'] == 'CHOP') & (df['RSI'] < 45)
        # df.loc[chop_buy, 'Signal'] = 1.0

        # 1. Calculate a "Floor" (e.g., Lower Bollinger Band)
        df['SMA_20'] = df['Adj Close'].rolling(20).mean()
        df['BB_Lower'] = df['SMA_20'] - 2 * df['SMA_20'].rolling(20).std()

        # 2. Strict Chop Entry
        # OLD: RSI < 45
        # NEW: RSI < 45 AND Price > BB_Lower (Don't buy if it's crashing through the floor)
        #      AND FracDiff > -0.1 (Don't buy if trend is catastrophically negative)

        chop_buy = (
            (df['Regime_Type'] == 'CHOP') & 
            (df['RSI'] < 45) & 
            (df['Adj Close'] > df['BB_Lower']) # The Falling Knife Filter
        )
        df.loc[chop_buy, 'Signal'] = 1.0
        
        # C. BEAR REGIME: "Survival" (Cash unless deep panic)
        panic_buy = (df['Regime_Type'] == 'BEAR') & (df['RSI'] < 25)
        df.loc[panic_buy, 'Signal'] = 1.0
        
        # --- 4. The Alpha Clause (Macro Handling) ---
        target_vol = 0.15 / np.sqrt(252)
        
        # Volatility Sizing (Risk Parity) [cite: 254]
        vol_scaler = (target_vol / df['Volatility']).clip(upper=1.5)
        
        # Macro Filter: Halve size if SPY is bearish, but don't exit fully (Alpha Clause)
        macro_scaler = df['Macro_Trend'].map({1: 1.0, 0: 0.5})
        
        df['Signal'] = df['Signal'] * vol_scaler * macro_scaler
        
        self.data = df

### Ensemble

In [54]:
class Strategy_Ensemble(BaseStrategy):
    """
    The 'All-Weather' Ensemble.
    
    Combines V3 (Macro Trend) and V9 (Regime Unshackled) into a single
    portfolio-level signal.
    
    Logic:
    1. Runs V3 to capture high-beta trends (NVDA, Bitcoin).
    2. Runs V9 to capture regime-based alpha and protect downside (JPM, BABA).
    3. Blends signals using a 'Correlation-Adjusted' weighting or fixed 50/50.
    4. Applies a final Volatility Target to the combined equity curve to ensure
       the two strategies don't stack up to dangerous leverage.
    """
    def __init__(self, ticker, start_date, end_date, w_v3=0.5, w_v9=0.5):
        super().__init__(ticker, start_date, end_date)
        self.w_v3 = w_v3
        self.w_v9 = w_v9
        # Instantiate sub-strategies
        self.strat_v3 = StrategyV3_Macro(ticker, start_date, end_date)
        self.strat_v9 = StrategyV9_RegimeUnshackled(ticker, start_date, end_date)

    def fetch_data(self, warmup_years=2):
        # Fetch once for efficiency (logic could be optimized to share DF, 
        # but separate fetch ensures cleaner encapsulation)
        self.strat_v3.fetch_data(warmup_years)
        self.strat_v9.fetch_data(warmup_years)
        
        # We share the index/data from one of them for the main wrapper
        if self.strat_v3.data is not None and not self.strat_v3.data.empty:
            self.data = self.strat_v3.data.copy()
        elif self.strat_v9.data is not None:
            self.data = self.strat_v9.data.copy()

    def generate_signals(self):
        if self.strat_v3.data is None or self.strat_v9.data is None: return
        
        # 1. Generate Sub-Signals
        self.strat_v3.generate_signals()
        self.strat_v9.generate_signals()
        
        # Align Indices (Inner Join to be safe)
        df = self.data.copy()
        s3 = self.strat_v3.data['Signal']
        s9 = self.strat_v9.data['Signal']
        
        # Merge signals into main DF
        df['Sig_V3'] = s3
        df['Sig_V9'] = s9
        df.dropna(inplace=True)
        
        # 2. The Allocation Logic
        # Default: Fixed Weight (Core-Satellite Approach)
        # V3 (Beta) + V9 (Alpha)
        
        # We blend the RAW signals.
        # Note: Signals are already Vol-Targeted to ~15% inside sub-classes.
        # Simple addition would double vol if correlation=1.
        raw_blend = (df['Sig_V3'] * self.w_v3) + (df['Sig_V9'] * self.w_v9)
        
        # 3. Ensemble Volatility Control
        # If V3 and V9 agree (both Long), we get high exposure.
        # If they disagree (V3 Long, V9 Cash), we get half exposure.
        # This naturally deleverages during uncertainty.
        
        df['Signal'] = raw_blend
        
        # Optional: Re-Target Volatility of the *Ensemble*
        # (Prevents leverage creep if strategies are highly correlated)
        # For now, we trust the weighted sum to act as a diversification benefit.
        
        self.data = df

### Adaptive Ensemble

In [55]:
class Strategy_Ensemble_Adaptive(BaseStrategy):
    """
    V10: The Adaptive Ensemble (Dynamic Weighting).
    
    Instead of fixed weights, this strategy re-allocates capital quarterly 
    based on the recent Risk-Adjusted Performance (Sharpe) of the sub-strategies.
    
    Logic:
    1. Lookback: 126 Days (6 Months).
    2. Rebalance: Every 63 Days (Quarterly).
    3. Weighting:
       - Calculate Sharpe Ratio for V3 and V9 in the lookback window.
       - If Sharpe > 0: Weight is proportional to Sharpe.
       - If Sharpe < 0: Weight is set to 0.
       - Normalize weights to sum to 1.0.
       
    This allows the portfolio to automatically 'Risk On' into V3 during strong bulls
    and 'Risk Off' into V9 during bears/chop.
    """
    def __init__(self, ticker, start_date, end_date):
        super().__init__(ticker, start_date, end_date)
        # Instantiate sub-strategies
        self.strat_v3 = StrategyV3_Macro(ticker, start_date, end_date)
        self.strat_v9 = StrategyV9_RegimeUnshackled(ticker, start_date, end_date)

    def fetch_data(self, warmup_years=2):
        self.strat_v3.fetch_data(warmup_years)
        self.strat_v9.fetch_data(warmup_years)
        
        # Use one of the dataframes as the base
        if self.strat_v3.data is not None and not self.strat_v3.data.empty:
            self.data = self.strat_v3.data.copy()
        elif self.strat_v9.data is not None:
            self.data = self.strat_v9.data.copy()

    def generate_signals(self):
        if self.strat_v3.data is None or self.strat_v9.data is None: return
        
        # 1. Generate Sub-Signals
        self.strat_v3.generate_signals()
        self.strat_v9.generate_signals()
        
        # Merge Data
        df = self.data.copy()
        df['Sig_V3'] = self.strat_v3.data['Signal']
        df['Sig_V9'] = self.strat_v9.data['Signal']
        df.dropna(inplace=True)
        
        # 2. Simulate Sub-Strategy Returns (for metric calculation)
        # We need to know how they *would* have performed to weight them.
        # Lag signals by 1 to avoid lookahead bias when calculating returns.
        df['Ret_V3'] = df['Sig_V3'].shift(1) * df['Returns']
        df['Ret_V9'] = df['Sig_V9'].shift(1) * df['Returns']
        
        # 3. Walk-Forward Weight Optimization
        df['W_V3'] = 0.5 # Default start
        df['W_V9'] = 0.5
        
        lookback = 126      # 6 Months Lookback
        rebalance_freq = 21 # Monthly Rebalance (Faster adaptation)
        
        indices = df.index
        
        if len(df) > lookback:
            for t in range(lookback, len(df), rebalance_freq):
                train_start = indices[t - lookback]
                train_end = indices[t]
                test_end_idx = min(t + rebalance_freq, len(df))
                test_end = indices[test_end_idx - 1]
                
                # Calculate Sharpe in Lookback Window
                # Add small epsilon to std to avoid division by zero
                v3_mean = df.loc[train_start:train_end, 'Ret_V3'].mean()
                v3_std = df.loc[train_start:train_end, 'Ret_V3'].std() + 1e-9
                sharpe_v3 = (v3_mean / v3_std) * np.sqrt(252)
                
                v9_mean = df.loc[train_start:train_end, 'Ret_V9'].mean()
                v9_std = df.loc[train_start:train_end, 'Ret_V9'].std() + 1e-9
                sharpe_v9 = (v9_mean / v9_std) * np.sqrt(252)
                
                # Weighting Logic
                # 1. Filter: If Sharpe is negative, set score to 0
                score_v3 = max(0, sharpe_v3)
                score_v9 = max(0, sharpe_v9)
                
                # 2. Normalize
                total_score = score_v3 + score_v9
                
                if total_score > 0:
                    w_v3 = score_v3 / total_score
                    w_v9 = score_v9 / total_score
                else:
                    # Both are failing? Default to Defensive (V9) or Cash (0)
                    # Let's default to V9 (Safety) as the 'bunker'
                    w_v3 = 0.0
                    w_v9 = 1.0
                
                # Apply weights to NEXT window
                df.loc[train_end:test_end, 'W_V3'] = w_v3
                df.loc[train_end:test_end, 'W_V9'] = w_v9
                
        # 4. Final Signal Generation
        # Blend the signals using the dynamic weights
        df['Signal'] = (df['Sig_V3'] * df['W_V3']) + (df['Sig_V9'] * df['W_V9'])
        
        self.data = df

### V12_Macro

In [56]:
class StrategyV12_Macro_Switch(BaseStrategy):
    """
    V12: The Macro-Guided Ensemble.
    
    Replaces lookback windows with Real-Time Economic Data.
    
    DATA SOURCES (Yahoo Finance):
    1. ^VIX: CBOE Volatility Index.
    2. ^TNX: 10-Year Treasury Yield.
    
    LOGIC:
    1. Calculate 'Macro Stress Score' (0.0 to 1.0).
       - VIX Component: Normalized against recent history. High VIX = High Stress.
       - Yield Component: Rate of Change (ROC) of TNX. Spiking rates = High Stress.
    
    2. Dynamic Weighting:
       - Weight_V3 (Trend) = 1.0 - Stress_Score
       - Weight_V9 (Safety) = Stress_Score
       
    HYPOTHESIS:
    VIX and Rates often spike BEFORE the price crash is fully realized. 
    This allows the model to switch to safety faster than a Moving Average.
    """
    def __init__(self, ticker, start_date, end_date):
        super().__init__(ticker, start_date, end_date)
        self.v3 = StrategyV3_Macro(ticker, start_date, end_date)
        self.v9 = StrategyV9_RegimeUnshackled(ticker, start_date, end_date)
        # We store macro data separately
        self.macro_data = None

    def fetch_data(self, warmup_years=2):
        # 1. Fetch Ticker Data
        self.v3.fetch_data(warmup_years)
        self.v9.fetch_data(warmup_years)
        
        if self.v3.data is None or self.v3.data.empty: return
        self.data = self.v3.data.copy()
        
        # 2. Fetch Macro Data (VIX and TNX)
        start_dt = (self.data.index[0] - timedelta(days=365)).strftime("%Y-%m-%d")
        end_dt = self.end_date
        
        try:
            vix = yf.download("^VIX", start=start_dt, end=end_dt, progress=False, auto_adjust=True)
            tnx = yf.download("^TNX", start=start_dt, end=end_dt, progress=False, auto_adjust=True)
            
            # Cleaning
            if isinstance(vix.columns, pd.MultiIndex): vix.columns = vix.columns.get_level_values(0)
            if isinstance(tnx.columns, pd.MultiIndex): tnx.columns = tnx.columns.get_level_values(0)
            
            macro_df = pd.DataFrame(index=self.data.index)
            # Align macro data to the ticker's trading days (ffill for holidays)
            macro_df['VIX'] = vix['Close'].reindex(self.data.index, method='ffill')
            macro_df['TNX'] = tnx['Close'].reindex(self.data.index, method='ffill')
            
            self.macro_data = macro_df
            
        except Exception as e:
            print(f"Macro Data Fetch Error: {e}")
            # Fallback: Zero stress
            self.macro_data = pd.DataFrame({'VIX': 20, 'TNX': 4}, index=self.data.index)

    def generate_signals(self):
        if self.data is None or self.macro_data is None: return
        
        # 1. Run Sub-Strategies
        self.v3.generate_signals()
        self.v9.generate_signals()
        
        # 2. Sync Data
        df = self.data.copy()
        df = df.join(self.v3.data[['Signal']].rename(columns={'Signal':'S_V3'}), how='left')
        df = df.join(self.v9.data[['Signal']].rename(columns={'Signal':'S_V9'}), how='left')
        
        # 3. Calculate Macro Stress Score
        macro = self.macro_data.copy()
        
        # A. VIX Stress (Fear)
        # Normalize VIX: If VIX > 30, Stress = 1.0. If VIX < 15, Stress = 0.0.
        # Uses a rolling Z-score or simple clamp? Simple clamp is more robust to regime shifts.
        macro['VIX_Stress'] = ((macro['VIX'] - 15) / (30 - 15)).clip(0, 1)
        
        # B. Yield Stress (Rate Shock)
        # We care about SPEED of rate rise, not just level.
        # Calculate 20-day Rate of Change of TNX
        macro['TNX_ROC'] = macro['TNX'].pct_change(20)
        # If Yields rise > 10% in a month, that's a shock.
        macro['TNX_Stress'] = (macro['TNX_ROC'] / 0.10).clip(0, 1)
        
        # Combined Stress (Max of either Fear or Rate Shock)
        # We use Max because either one can crash the market independently.
        macro['Total_Stress'] = macro[['VIX_Stress', 'TNX_Stress']].max(axis=1)
        
        # 4. Allocate Weights
        # Smooth the stress signal to avoid daily jitter (3-day avg)
        stress_signal = macro['Total_Stress'].rolling(3).mean().fillna(0)
        
        df['W_V9'] = stress_signal        # High Stress -> More Safety
        df['W_V3'] = 1.0 - stress_signal  # Low Stress -> More Trend
        
        # 5. Final Signal
        df['Signal'] = (df['S_V3'] * df['W_V3']) + (df['S_V9'] * df['W_V9'])
        
        self.data = df

### V20_MacroDom

In [57]:
class StrategyV20_MacroDominance(BaseStrategy):
    """
    V20: The 'Macro Dominance' Strategy.
    
    This is the optimized single-model architecture.
    1. CORE ENGINE: V12 (Macro Switch). Uses VIX/TNX to drive risk on/off.
    2. SECTOR PATCH: Inverts Macro logic for Energy/Commodities (Inflation Trade).
    3. SAFETY VETO: Uses V9's 'Trend Floor' to kill trades if Price < Bollinger Low.
    
    No averaging. No ensembles. Pure Signal.
    """
    def __init__(self, ticker, start_date, end_date):
        super().__init__(ticker, start_date, end_date)
        # We use V12 logic natively, but need V9-style volatility calcs for the Floor
        self.macro_data = None

    def fetch_data(self, warmup_years=2):
        super().fetch_data(warmup_years)
        
        # 1. Fetch Macro Data
        try:
            start_dt = self.data.index[0]
            vix = yf.download("^VIX", start=start_dt, end=self.end_date, progress=False, auto_adjust=True)
            tnx = yf.download("^TNX", start=start_dt, end=self.end_date, progress=False, auto_adjust=True)
            
            if isinstance(vix.columns, pd.MultiIndex): vix.columns = vix.columns.get_level_values(0)
            if isinstance(tnx.columns, pd.MultiIndex): tnx.columns = tnx.columns.get_level_values(0)
            
            self.macro_data = pd.DataFrame(index=self.data.index)
            self.macro_data['VIX'] = vix['Close'].reindex(self.data.index).fillna(method='ffill')
            self.macro_data['TNX'] = tnx['Close'].reindex(self.data.index).fillna(method='ffill')
        except:
            print("Macro Fetch Failed. Defaulting to Neutral.")
            self.macro_data = pd.DataFrame({'VIX': 20, 'TNX': 4.0}, index=self.data.index)

    def generate_signals(self):
        if self.data is None or self.macro_data is None: return
        df = self.data.copy()
        macro = self.macro_data.copy()
        
        # --- 1. The V12 Macro Engine ---
        # Stress Score Calculation
        macro['VIX_Stress'] = ((macro['VIX'] - 15) / 15).clip(0, 1)
        macro['TNX_ROC'] = macro['TNX'].pct_change(20) # 1-Month Rate Shock
        macro['TNX_Stress'] = (macro['TNX_ROC'] / 0.10).clip(0, 1)
        
        # Combined Stress (Fear OR Rates)
        macro['Total_Stress'] = macro[['VIX_Stress', 'TNX_Stress']].max(axis=1).rolling(3).mean().fillna(0)
        
        # Base Signal: 1.0 - Stress (Calm = Buy, Panic = Sell)
        df['Signal'] = 1.0 - macro['Total_Stress']
        
        # --- 2. The Sector Patch (XLE Fix) ---
        # If Ticker is Energy, Invert the logic: Stress (Inflation) is GOOD.
        # Hard-coded list for robustness.
        energy_tickers = ["XLE", "XOM", "CVX", "OIH", "USO", "GLD"]
        if self.ticker in energy_tickers:
            # Logic: If Stress is high (1.0), Signal becomes 1.0. 
            # If Stress is low (0.0), Signal stays 1.0 (Energy also works in calm bulls).
            # We essentially IGNORE stress selling, and maybe Boost on stress.
            # Simple Fix: Signal = MAX(Signal, Total_Stress)
            # If Stress is 0.8, Base Signal is 0.2. Max is 0.8. We Buy.
            df['Signal'] = np.maximum(df['Signal'], macro['Total_Stress'])
            
        # --- 3. The V9 Safety Veto (Trend Floor) ---
        # Prevent buying falling knives (BABA fix).
        # Logic: If Price < Lower Bollinger Band, Signal = 0 (Hard Stop).
        df['SMA_20'] = df['Adj Close'].rolling(20).mean()
        df['BB_Lower'] = df['SMA_20'] - 2 * df['SMA_20'].rolling(20).std()
        
        # Allow a tiny buffer (0.98) to avoid noise stop-outs
        is_crashing = df['Adj Close'] < (df['BB_Lower'] * 0.98)
        
        # Override Signal
        df.loc[is_crashing, 'Signal'] = 0.0
        
        # --- 4. Volatility Sizing ---
        target_vol = 0.15 / np.sqrt(252)
        vol_scaler = (target_vol / FeatureLab.yang_zhang_volatility(df)).clip(upper=1.5)
        
        df['Signal'] = df['Signal'] * vol_scaler
        
        self.data = df

### V14_AI

In [58]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

# --- 1. The Transformer Architecture ---
class TimeSeriesTransformer(nn.Module):
    """
    A Transformer Encoder for Time-Series Classification.
    Input: (Batch_Size, Seq_Len, Num_Features)
    Output: (Batch_Size, Num_Classes) -> Probabilities for [-1, 0, 1]
    """
    def __init__(self, input_dim, d_model=64, nhead=4, num_layers=2, num_classes=3, dropout=0.1):
        super(TimeSeriesTransformer, self).__init__()
        
        # Project features to d_model dimension
        self.embedding = nn.Linear(input_dim, d_model)
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        
        # Transformer Encoder Layer (The "Brain")
        encoder_layers = nn.TransformerEncoderLayer(d_model, nhead, dim_feedforward=128, dropout=dropout, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers)
        
        # Final Classification Head
        self.decoder = nn.Linear(d_model, num_classes)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, src):
        # src shape: [batch, seq_len, features]
        src = self.embedding(src)      # -> [batch, seq_len, d_model]
        src = self.pos_encoder(src)    # Add positional context
        output = self.transformer_encoder(src) # -> [batch, seq_len, d_model]
        
        # We take the output of the *last* time step for classification
        last_step_output = output[:, -1, :] 
        
        prediction = self.decoder(last_step_output)
        return self.softmax(prediction)

class PositionalEncoding(nn.Module):
    """Injects information about the relative or absolute position of the tokens."""
    def __init__(self, d_model, dropout=0.1, max_len=500):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        # x: [batch, seq_len, d_model]
        x = x + self.pe[:, :x.size(1), :]
        return self.dropout(x)

# --- 2. The Strategy Wrapper ---
class StrategyV14_Transformer(BaseStrategy):
    """
    V14.1: Continuous Learning Transformer.
    
    CRITICAL CHANGE: 
    The model is initialized ONCE before the loop. 
    It 'remembers' previous regimes and fine-tunes on new data 
    rather than restarting from scratch.
    """
    def generate_signals(self):
        if self.data is None or self.data.empty: return
        df = self.data.copy()
        
        # --- Feature Engineering ---
        # 1. Stationarity & Normalization
        df['Log_Price'] = df['Adj Close'].apply(np.log)
        df['FracDiff'] = FeatureLab.frac_diff_fixed(df['Log_Price'], d=0.4, window=30)
        df['Volatility'] = FeatureLab.yang_zhang_volatility(df)
        df['RSI'] = FeatureLab.compute_rsi(df['Adj Close']) / 100.0 # Normalize RSI to 0-1
        
        # 2. Triple Barrier Targets
        # We widen the barriers slightly to reduce noise for the DL model
        raw_labels = FeatureLab.triple_barrier_labels(
            df['Adj Close'], df['Volatility'], pt=2.0, sl=2.0, barrier_window=10
        )
        # Map: -1(Sell)->0, 0(Neutral)->1, 1(Buy)->2
        label_map = {-1: 0, 0: 1, 1: 2}
        df['Target'] = raw_labels.map(label_map)
        
        df.dropna(inplace=True)
        
        # --- Tensor Prep ---
        SEQ_LEN = 30
        feature_cols = ['FracDiff', 'Volatility', 'RSI']
        
        # Robust Scaling (Median/IQR) to handle outliers better than Standard
        from sklearn.preprocessing import RobustScaler
        scaler = RobustScaler()
        df[feature_cols] = scaler.fit_transform(df[feature_cols])
        
        data_matrix = df[feature_cols].values
        targets = df['Target'].values
        
        X_seq, y_seq, indices = [], [], []
        for i in range(len(data_matrix) - SEQ_LEN):
            X_seq.append(data_matrix[i : i+SEQ_LEN])
            y_seq.append(targets[i + SEQ_LEN - 1])
            indices.append(df.index[i + SEQ_LEN - 1])
            
        X_tensor = torch.FloatTensor(np.array(X_seq))
        y_tensor = torch.LongTensor(np.array(y_seq))
        
        # --- Continuous Walk-Forward Loop ---
        train_window = 500
        test_window = 21
        signals = pd.Series(0, index=df.index)
        
        device = torch.device("cuda" if torch.cuda.is_available() else ("mps" if torch.mps.is_available() else "cpu"))
        print(f"Training V14.1 (Continuous) on {device}...")
        
        # INITIALIZE ONCE (The Fix)
        model = TimeSeriesTransformer(
            input_dim=len(feature_cols), 
            d_model=32, 
            nhead=2, 
            num_layers=1, 
            num_classes=3,
            dropout=0.2 # Higher dropout for regularization
        ).to(device)
        
        # Lower LR for fine-tuning stability
        optimizer = optim.Adam(model.parameters(), lr=0.0001)
        # Weighted Loss to combat class imbalance (Neutral is common)
        # Penalize missing a Bull/Bear move heavily
        class_weights = torch.tensor([2.0, 1.0, 2.0]).to(device)
        criterion = nn.CrossEntropyLoss(weight=class_weights)
        
        # Rolling Window
        for t in range(train_window, len(X_tensor), test_window):
            # Define Windows
            # Train on trailing window
            X_train = X_tensor[t-train_window : t].to(device)
            y_train = y_tensor[t-train_window : t].to(device)
            
            # Predict next month
            X_test = X_tensor[t : min(t+test_window, len(X_tensor))].to(device)
            if len(X_test) == 0: break
            
            # --- Fine-Tuning Step ---
            model.train()
            # We train for fewer epochs per step since we retain knowledge
            # But we ensure we don't overfit the recent noise
            for _ in range(5): 
                optimizer.zero_grad()
                output = model(X_train)
                loss = criterion(output, y_train)
                loss.backward()
                optimizer.step()
                
            # --- Inference Step ---
            model.eval()
            with torch.no_grad():
                probs = model(X_test)
                
                # Probability Thresholding
                # We need high conviction to trade
                buy_probs = probs[:, 2].cpu().numpy()
                sell_probs = probs[:, 0].cpu().numpy()
                
                batch_signals = np.zeros(len(X_test))
                
                # Dynamic Threshold: Only trade if model is > 50% sure
                batch_signals[buy_probs > 0.5] = 1
                batch_signals[sell_probs > 0.5] = 0 # Cash/Short
                
                # Map to DataFrame
                batch_idx = indices[t : min(t+test_window, len(X_tensor))]
                signals.loc[batch_idx] = batch_signals

        df['Signal'] = signals
        
        # Volatility Sizing (Safety Net)
        target_vol = 0.15 / np.sqrt(252)
        vol_scaler = (target_vol / df['Volatility']).clip(upper=1.5)
        df['Signal'] = df['Signal'] * vol_scaler
        
        self.data = df
        

### V15_MacroTransf

In [59]:
class StrategyV15_MacroTransformer(BaseStrategy):
    """
    V15: The 'Context-Aware' Transformer.
    
    Fusion of V20 (Macro Data) and V14 (Deep Learning).
    Input Features: [FracDiff, Volatility, RSI, VIX_Norm, TNX_ROC]
    """
    def __init__(self, ticker, start_date, end_date):
        super().__init__(ticker, start_date, end_date)
        self.macro_data = None

    def fetch_data(self, warmup_years=2):
        super().fetch_data(warmup_years)
        
        # 1. Fetch Macro Data (Same logic as V20)
        try:
            start_dt = self.data.index[0]
            vix = yf.download("^VIX", start=start_dt, end=self.end_date, progress=False, auto_adjust=True)
            tnx = yf.download("^TNX", start=start_dt, end=self.end_date, progress=False, auto_adjust=True)
            
            if isinstance(vix.columns, pd.MultiIndex): vix.columns = vix.columns.get_level_values(0)
            if isinstance(tnx.columns, pd.MultiIndex): tnx.columns = tnx.columns.get_level_values(0)
            
            self.macro_data = pd.DataFrame(index=self.data.index)
            self.macro_data['VIX'] = vix['Close'].reindex(self.data.index).fillna(method='ffill')
            self.macro_data['TNX'] = tnx['Close'].reindex(self.data.index).fillna(method='ffill')
        except:
            print("Macro Fetch Failed. Defaulting.")
            self.macro_data = pd.DataFrame({'VIX': 20, 'TNX': 4.0}, index=self.data.index)

    def generate_signals(self):
        if self.data is None or self.data.empty: return
        df = self.data.copy()
        macro = self.macro_data.copy()
        
        # --- Feature Engineering ---
        # 1. Technicals
        df['Log_Price'] = df['Adj Close'].apply(np.log)
        df['FracDiff'] = FeatureLab.frac_diff_fixed(df['Log_Price'], d=0.4, window=30)
        df['Volatility'] = FeatureLab.yang_zhang_volatility(df)
        df['RSI'] = FeatureLab.compute_rsi(df['Adj Close']) / 100.0
        
        # 2. Macro (The New Eyes)
        # Normalize VIX to 0-1 scale (approx)
        df['VIX_Norm'] = ((macro['VIX'] - 10) / 40).clip(0, 1)
        # Rate of Change for TNX (Speed of rate rise matters more than level)
        df['TNX_ROC'] = macro['TNX'].pct_change(20).fillna(0) * 10 # Scale up for neural net
        
        # 3. Targets (Triple Barrier)
        raw_labels = FeatureLab.triple_barrier_labels(
            df['Adj Close'], df['Volatility'], pt=2.0, sl=2.0, barrier_window=10
        )
        label_map = {-1: 0, 0: 1, 1: 2} # Sell:0, Neut:1, Buy:2
        df['Target'] = raw_labels.map(label_map)
        
        df.dropna(inplace=True)
        
        # --- Tensor Prep ---
        SEQ_LEN = 30
        feature_cols = ['FracDiff', 'Volatility', 'RSI', 'VIX_Norm', 'TNX_ROC']
        
        # Robust Scaler
        from sklearn.preprocessing import RobustScaler
        scaler = RobustScaler()
        df[feature_cols] = scaler.fit_transform(df[feature_cols])
        
        data_matrix = df[feature_cols].values
        targets = df['Target'].values
        
        X_seq, y_seq, indices = [], [], []
        for i in range(len(data_matrix) - SEQ_LEN):
            X_seq.append(data_matrix[i : i+SEQ_LEN])
            y_seq.append(targets[i + SEQ_LEN - 1])
            indices.append(df.index[i + SEQ_LEN - 1])
            
        X_tensor = torch.FloatTensor(np.array(X_seq))
        y_tensor = torch.LongTensor(np.array(y_seq))
        
        # --- Continuous Learning Loop ---
        train_window = 500
        test_window = 21
        signals = pd.Series(0, index=df.index)
        
        device = torch.device("cuda" if torch.cuda.is_available() else ("mps" if torch.mps.is_available() else "cpu"))
        print(f"Training V15 (Macro-Aware) on {device}...")
        
        # Initialize Model (Larger input_dim)
        model = TimeSeriesTransformer(
            input_dim=len(feature_cols), # Now 5 features
            d_model=32, 
            nhead=2, 
            num_layers=1, 
            num_classes=3,
            dropout=0.2
        ).to(device)
        
        optimizer = optim.Adam(model.parameters(), lr=0.0001)
        class_weights = torch.tensor([2.0, 1.0, 2.0]).to(device)
        criterion = nn.CrossEntropyLoss(weight=class_weights)
        
        for t in range(train_window, len(X_tensor), test_window):
            # Train Step
            X_train = X_tensor[t-train_window : t].to(device)
            y_train = y_tensor[t-train_window : t].to(device)
            
            # Predict Step
            X_test = X_tensor[t : min(t+test_window, len(X_tensor))].to(device)
            if len(X_test) == 0: break
            
            # Fine-Tune
            model.train()
            for _ in range(5): 
                optimizer.zero_grad()
                output = model(X_train)
                loss = criterion(output, y_train)
                loss.backward()
                optimizer.step()
                
            # Inference
            model.eval()
            with torch.no_grad():
                probs = model(X_test)
                buy_probs = probs[:, 2].cpu().numpy()
                sell_probs = probs[:, 0].cpu().numpy()
                
                batch_signals = np.zeros(len(X_test))
                
                # Higher threshold for Macro model to be "Pickier"
                batch_signals[buy_probs > 0.55] = 1
                batch_signals[sell_probs > 0.55] = 0 
                
                batch_idx = indices[t : min(t+test_window, len(X_tensor))]
                signals.loc[batch_idx] = batch_signals

        df['Signal'] = signals
        
        # Volatility Sizing
        target_vol = 0.15 / np.sqrt(252)
        vol_scaler = (target_vol / df['Volatility']).clip(upper=1.5)
        df['Signal'] = df['Signal'] * vol_scaler
        
        self.data = df

## V21_MacroMom

In [60]:
class StrategyV21_MacroMomentum(BaseStrategy):
    """
    V21: The Production Candidate.
    
    Architecture:
    1. BASE: V20 Macro Stress Logic (VIX + TNX).
    2. OVERRIDE: 'Trend Floor'. If the asset is structurally bullish 
       (Price > SMA200), we refuse to go to zero exposure, even if Macro is stressed.
       
    This solves the 'TSLA Problem' (missing rallies) without relying on 
    unstable Neural Networks.
    """
    def __init__(self, ticker, start_date, end_date):
        super().__init__(ticker, start_date, end_date)
        self.macro_data = None

    def fetch_data(self, warmup_years=2):
        super().fetch_data(warmup_years)
        
        # 1. Fetch Macro Data
        try:
            start_dt = self.data.index[0]
            vix = yf.download("^VIX", start=start_dt, end=self.end_date, progress=False, auto_adjust=True)
            tnx = yf.download("^TNX", start=start_dt, end=self.end_date, progress=False, auto_adjust=True)
            
            if isinstance(vix.columns, pd.MultiIndex): vix.columns = vix.columns.get_level_values(0)
            if isinstance(tnx.columns, pd.MultiIndex): tnx.columns = tnx.columns.get_level_values(0)
            
            self.macro_data = pd.DataFrame(index=self.data.index)
            self.macro_data['VIX'] = vix['Close'].reindex(self.data.index).fillna(method='ffill')
            self.macro_data['TNX'] = tnx['Close'].reindex(self.data.index).fillna(method='ffill')
        except:
            self.macro_data = pd.DataFrame({'VIX': 20, 'TNX': 4.0}, index=self.data.index)

    def generate_signals(self):
        if self.data is None or self.macro_data is None: return
        df = self.data.copy()
        macro = self.macro_data.copy()
        
        # --- 1. Macro Stress Engine (V20 Logic) ---
        # VIX Stress: Normalizing 12-35 range
        macro['VIX_Stress'] = ((macro['VIX'] - 12) / 23).clip(0, 1)
        
        # Rates Stress: 1-Month Rate of Change > 10% is Panic
        macro['TNX_ROC'] = macro['TNX'].pct_change(20)
        macro['TNX_Stress'] = (macro['TNX_ROC'] / 0.10).clip(0, 1)
        
        # Combined Stress (Smoothed)
        macro['Total_Stress'] = macro[['VIX_Stress', 'TNX_Stress']].max(axis=1).rolling(5).mean().fillna(0)
        
        # Base Signal: Inverse of Stress
        df['Base_Signal'] = 1.0 - macro['Total_Stress']
        
        # --- 2. The Momentum Override (The Fix) ---
        # "Don't fight the tape."
        
        # Long Term Trend (SMA 200)
        df['SMA_200'] = df['Adj Close'].rolling(200).mean()
        df['Trend_Bull'] = (df['Adj Close'] > df['SMA_200'])
        
        # Momentum (RSI)
        df['RSI'] = FeatureLab.compute_rsi(df['Adj Close'])
        df['Mom_Bull'] = (df['RSI'] > 50)
        
        # LOGIC: 
        # If Trend is BULL and Momentum is BULL, Minimum Signal = 0.5.
        # This ensures we participate in "Hated Rallies" (like TSLA 2023)
        # even if VIX is elevated.
        
        df['Signal'] = df['Base_Signal']
        
        # Apply Override
        strong_uptrend = df['Trend_Bull'] & df['Mom_Bull']
        df.loc[strong_uptrend, 'Signal'] = np.maximum(df.loc[strong_uptrend, 'Signal'], 0.5)
        
        # --- 3. Safety Veto (The "Falling Knife" Guard) ---
        # If Price is crashing < Lower Bollinger, KILL the trade.
        df['SMA_20'] = df['Adj Close'].rolling(20).mean()
        df['BB_Lower'] = df['SMA_20'] - 2 * df['SMA_20'].rolling(20).std()
        
        # Allow tiny buffer (0.98)
        is_crashing = df['Adj Close'] < (df['BB_Lower'] * 0.98)
        df.loc[is_crashing, 'Signal'] = 0.0

        # --- 4. Volatility Sizing ---
        target_vol = 0.15 / np.sqrt(252)
        vol_scaler = (target_vol / FeatureLab.yang_zhang_volatility(df)).clip(upper=1.5)
        
        df['Signal'] = df['Signal'] * vol_scaler
        
        self.data = df

## The Sector Rotator

In [61]:
class TSR_V1_KalmanMomentum(BaseStrategy):
    """
    TSR_V1: The Sector Rotator.
    
    Logic:
    1. Alpha: '12-1' Momentum Ranking (intermediate trend, no reversal noise).
    2. Beta: Kalman Filter Dynamic Hedging (Market Neutrality).
    3. Filter: Regime Switching based on Rate Correlation & VIX.
    """
    def __init__(self, ticker, start_date, end_date):
        # Ticker here is just a placeholder/reference (e.g., "SECTOR_ROTATOR")
        super().__init__(ticker, start_date, end_date)
        self.sectors = ['XLE', 'XLF', 'XLK', 'XLY', 'XLI', 'XLV', 'XLP', 'XLU', 'XLB']
        self.sector_data = None
        self.market_data = None

    def fetch_data(self, warmup_years=2):
        # We need a longer warmup for the 12-month momentum calculation
        start_dt = datetime.strptime(self.start_date, "%Y-%m-%d") - timedelta(days=warmup_years*365)
        start_str = start_dt.strftime("%Y-%m-%d")
        
        try:
            # 1. Fetch Sectors
            print(f"Fetching Sector Data for {self.sectors}...")
            sec_df = yf.download(self.sectors, start=start_str, end=self.end_date, progress=False, auto_adjust=True)['Close']
            self.sector_data = sec_df.fillna(method='ffill')
            
            # 2. Fetch Macro (SPY, TLT, ^VIX)
            print("Fetching Macro Data (SPY, TLT, VIX)...")
            macro_tickers = ['SPY', 'TLT', '^VIX']
            macro_df = yf.download(macro_tickers, start=start_str, end=self.end_date, progress=False, auto_adjust=True)['Close']
            self.market_data = macro_df.fillna(method='ffill')
            
            # Create a synthetic 'Close' for the BaseStrategy compatibility (starts at 100)
            self.data = pd.DataFrame(index=sec_df.index)
            self.data['Adj Close'] = 100.0 
            self.data['Returns'] = 0.0
            
        except Exception as e:
            print(f"TSR Fetch Error: {e}")

    def _kalman_beta(self, market_rets, asset_rets, delta=1e-5):
        """
        Estimates time-varying Beta using a Kalman Filter.
        Observation Eq: Asset = Alpha + Beta * Market
        State Eq: Beta follows Random Walk.
        """
        # Efficient numpy implementation
        n = len(market_rets)
        beta = np.zeros(n)
        alpha = np.zeros(n)
        
        # Initial State
        P = np.eye(2) # Covariance
        theta = np.zeros(2) # [Alpha, Beta]
        
        # Transpose matrices for speed
        y = asset_rets.values
        x = market_rets.values
        
        # Loop (Kalman is recursive, hard to vectorize fully without library)
        # We use a simplified RLS-like update for speed in backtest
        # Q (Process Variance) determines adaptivity
        Q = np.eye(2) * delta 
        R = 0.001 # Measurement Variance
        
        for t in range(n):
            if np.isnan(y[t]) or np.isnan(x[t]): continue
            
            # Observation Matrix H = [1, Market_t]
            H = np.array([1.0, x[t]])
            
            # Predict
            # theta_pred = theta (Random Walk)
            P_pred = P + Q
            
            # Update
            error = y[t] - np.dot(H, theta)
            S = np.dot(H, np.dot(P_pred, H.T)) + R
            K = np.dot(P_pred, H.T) / S
            
            theta = theta + K * error
            P = np.eye(2) - np.outer(K, H)
            P = np.dot(P, P_pred)
            
            beta[t] = theta[1]
            alpha[t] = theta[0]
            
        return pd.Series(beta, index=market_rets.index)

    def generate_signals(self):
        if self.sector_data is None or self.market_data is None: return
        
        # --- 1. Momentum Calculation (12-1) ---
        # "12-1" means Return from t-12 months to t-1 month. [cite: 541]
        # Approx 252 trading days. t-1 month is approx 21 days.
        # Formula: P(t-21) / P(t-252) - 1
        mom_12_1 = self.sector_data.shift(21) / self.sector_data.shift(252) - 1.0
        
        # --- 2. Regime Identification ---
        spy = self.market_data['SPY']
        tlt = self.market_data['TLT']
        vix = self.market_data['^VIX']
        
        # Trend Filter (SPY > 200 SMA) [cite: 647]
        spy_trend = (spy > spy.rolling(200).mean())
        
        # Inflation/Correlation Filter [cite: 651]
        # Correlation between Stocks (SPY) and Bonds (TLT)
        corr_spy_tlt = spy.pct_change().rolling(60).corr(tlt.pct_change())
        
        # VIX Stress [cite: 658]
        vix_stress = (vix > 40)
        
        # --- 3. Portfolio Construction Loop ---
        # We simulate the daily holdings
        
        portfolio_curve = [100.0]
        cash = 100.0
        
        # Align data
        rets = self.sector_data.pct_change().fillna(0)
        spy_rets = spy.pct_change().fillna(0)
        
        # Pre-calculate Betas for all sectors (Expensive but necessary)
        print("Calculating Kalman Betas for all sectors...")
        sector_betas = pd.DataFrame(index=rets.index, columns=rets.columns)
        for col in rets.columns:
            sector_betas[col] = self._kalman_beta(spy_rets, rets[col])
            
        # Rebalance Monthly (every 21 days) or Daily? 
        # Paper implies Monthly ranking, but daily hedging. We'll rank daily for smoothness.
        
        # Logic vectors
        long_mask = pd.DataFrame(False, index=mom_12_1.index, columns=mom_12_1.columns)
        short_mask = pd.DataFrame(False, index=mom_12_1.index, columns=mom_12_1.columns)
        
        # Ranking
        ranks = mom_12_1.rank(axis=1, ascending=False)
        # Top 3 Long, Bottom 3 Short
        for t in range(len(ranks)):
            # Determine Regime
            date = ranks.index[t]
            is_bull = spy_trend.iloc[t]
            corr = corr_spy_tlt.iloc[t]
            is_panic = vix_stress.iloc[t]
            
            # DEFAULT: Long Top 3, Short Bottom 3
            # REGIME OVERRIDES [cite: 660]
            
            if is_panic:
                # Crash: Cash
                pass 
            elif not is_bull:
                # Bear Market
                if corr > 0.2:
                    # Inflation Shock (Stocks & Bonds down): Cash
                    pass
                elif corr < -0.2:
                    # Growth Shock (Bonds hedge Stocks): Long Defensives (XLU, XLP)
                    # We override the ranking to force XLU/XLP
                    # (Simplified: just hold XLU/XLP equal weight)
                    # We handle this in the returns loop below
                    pass
                else:
                    # Normal Bear: Market Neutral (Long Winners / Short Losers)
                    pass
            else:
                # Bull Market: Standard Momentum
                pass
                
        # --- Fast Vectorized Backtest ---
        # Selecting Top 3 and Bottom 3
        top_3 = (ranks <= 3)
        bot_3 = (ranks >= 7) # 9 sectors total, 7,8,9 are bottom
        
        # 1. Calculate Basket Returns
        long_basket_ret = (rets * top_3).sum(axis=1) / 3
        short_basket_ret = (rets * bot_3).sum(axis=1) / 3
        
        # 2. Calculate Basket Betas
        long_basket_beta = (sector_betas * top_3).sum(axis=1) / 3
        short_basket_beta = (sector_betas * bot_3).sum(axis=1) / 3
        
        # 3. Calculate Hedge Ratio [cite: 612]
        # H = Beta_L / Beta_S
        # Avoid division by zero
        hedge_ratio = (long_basket_beta / short_basket_beta.replace(0, 1)).clip(0, 2)
        
        # 4. Construct Strategy Return Stream
        # Standard: Beta Neutral
        # Weights: w_L = 1 / (1+H), w_S = H / (1+H) [cite: 618]
        w_L = 1 / (1 + hedge_ratio)
        w_S = hedge_ratio / (1 + hedge_ratio)
        
        strat_ret = (w_L * long_basket_ret) - (w_S * short_basket_ret)
        
        # 5. Apply Regime Overrides (Vectorized)
        
        # Condition A: Crash (VIX > 40) -> Return = 0 (Cash) [cite: 658]
        strat_ret[vix_stress] = 0.0
        
        # Condition B: Bear + Inflation (Corr > 0.2) -> Return = 0 (Cash) [cite: 656]
        bear_inflation = (~spy_trend) & (corr_spy_tlt > 0.2)
        strat_ret[bear_inflation] = 0.0
        
        # Condition C: Bear + Growth Shock (Corr < -0.2) -> Long Defensives [cite: 653]
        # Here we switch from L/S to Long Only XLU+XLP
        bear_growth = (~spy_trend) & (corr_spy_tlt < -0.2)
        defensive_ret = (rets['XLU'] + rets['XLP']) / 2
        strat_ret[bear_growth] = defensive_ret[bear_growth]
        
        # Accumulate
        self.data['Returns'] = strat_ret
        self.data['Adj Close'] = (1 + strat_ret).cumprod() * 100.0
        self.data['Signal'] = 1 # Dummy signal to satisfy BaseStrategy

In [62]:
class Strategy_Ensemble_HillClimb(BaseStrategy):
    """
    Fixed Ensemble: Correctly handles internal backtesting to populate 
    performance metrics for Hill Climbing optimization.
    """
    def __init__(self, ticker, start_date, end_date):
        super().__init__(ticker, start_date, end_date)
        self.v21 = StrategyV21_MacroMomentum(ticker, start_date, end_date)
        self.tsr = TSR_V1_KalmanMomentum("SECTOR_ALPHA", start_date, end_date)
        
    def fetch_data(self, warmup_years=2):
        # 1. Fetch and Generate for Sub-Strategies
        self.v21.fetch_data(warmup_years)
        self.v21.generate_signals()
        # CRITICAL: Populate 'Net_Returns' via backtest
        self.v21.run_backtest() 
        
        self.tsr.fetch_data(warmup_years)
        self.tsr.generate_signals()
        # Populates TSR returns (assuming TSR uses Net_Returns in ensemble)
        self.tsr.run_backtest() 

        # 2. Align Data
        self.data = pd.DataFrame(index=self.v21.data.index)
        # Use 'Net_Returns' for the optimization to account for costs
        self.data['Ret_V21'] = self.v21.results['Net_Returns']
        self.data['Ret_TSR'] = self.tsr.results['Net_Returns']
        self.data.dropna(inplace=True)
        
    def generate_signals(self):
        # Parameters for the Hill Climbing search [cite: 301]
        lookback = 126
        rebalance_step = 21 
        
        rets = self.data[['Ret_V21', 'Ret_TSR']].values
        n = len(rets)
        
        # Initial weight (50/50)
        best_w = 0.5
        weights_v21 = np.zeros(n)
        
        for t in range(lookback, n, rebalance_step):
            window = rets[t-lookback : t]
            
            # Hill Climbing: Test current, +5%, -5%
            candidates = [best_w, np.clip(best_w + 0.05, 0, 1), np.clip(best_w - 0.05, 0, 1)]
            
            best_sharpe = -np.inf
            for w in candidates:
                # Combine returns based on candidate weight
                port_ret = w * window[:, 0] + (1 - w) * window[:, 1]
                sharpe = np.mean(port_ret) / (np.std(port_ret) + 1e-9)
                
                if sharpe > best_sharpe:
                    best_sharpe = sharpe
                    best_w = w
            
            # Apply optimized weight to the next month
            end_idx = min(t + rebalance_step, n)
            weights_v21[t:end_idx] = best_w
            
        self.data['W_V21'] = weights_v21
        self.data['Returns'] = (self.data['Ret_V21'] * self.data['W_V21']) + \
                               (self.data['Ret_TSR'] * (1 - self.data['W_V21']))
        
        self.data['Adj Close'] = (1 + self.data['Returns']).cumprod() * 100
        self.data['Signal'] = 1

# Execution

In [63]:
class RobustBenchmark:
    """
    Implements Walk-Forward Analysis and Deflated Sharpe Ratio logic.
    Benchmarks multiple strategies without look-ahead bias[cite: 275].
    """
    def __init__(self, tickers, start_date, end_date):
        self.tickers = tickers
        self.start_date = start_date
        self.end_date = end_date
        self.results = []

    def run(self):
        print(f"{'STRATEGY':<10} | {'TICKER':<6} | {'ANN RET':<7} | {'SHARPE':<6} | {'MAX DD':<7} | {'NOTES'}")
        print("-" * 75)
        
        strategies = {
            "V1_Base": StrategyV1_Baseline,
        }

        for ticker in self.tickers:
            # Capture Buy & Hold first
            bh = StrategyV1_Baseline(ticker, self.start_date, self.end_date)
            bh.fetch_data()
            bh.data['Signal'] = 1 # Force Buy
            bh.run_backtest()
            self._print_row("Buy&Hold", ticker, bh.metrics)
            
            for name, StratClass in strategies.items():
                try:
                    strat = StratClass(ticker, self.start_date, self.end_date)
                    strat.fetch_data(warmup_years=2)
                    strat.generate_signals()
                    strat.run_backtest()
                    
                    self._print_row(name, ticker, strat.metrics)
                    
                    # Store for portfolio level (optional)
                    self.results.append({
                        'Ticker': ticker,
                        'Strategy': name,
                        'Returns': strat.results['Net_Returns']
                    })
                except Exception as e:
                    print(f"Failed {name} {ticker}: {e}")
            print("-" * 75)

    def _print_row(self, name, ticker, metrics):
        if not metrics: return
        ret = metrics['Total Return']
        # Annualize return approx
        ann_ret = (1 + ret) ** (252 / len(metrics.get('Returns', [1]*252))) - 1 if 'Returns' in metrics else ret
        print(f"{name:<10} | {ticker:<6} | {ret:.1%}   | {metrics['Sharpe Ratio']:.2f}   | {metrics['Max Drawdown']:.1%}   |")

In [64]:
if __name__ == "__main__":
    print(f"{'STRATEGY':<12} | {'TICKER':<6} | {'ANN RET':<7} | {'SHARPE':<6} | {'MAX DD':<7} | {'NOTES'}")
    print("-" * 79)
    
    # Helper wrappers for the dictionary
    class Strategy_Ensemble_5050(Strategy_Ensemble):
        def __init__(self, ticker, start, end): super().__init__(ticker, start, end, 0.5, 0.5)

    class Strategy_Ensemble_Growth(Strategy_Ensemble):
        def __init__(self, ticker, start, end): super().__init__(ticker, start, end, 0.7, 0.3)

    strategies = {
        # "V3_Macro": StrategyV3_Macro,
        # "V9_Unshack": StrategyV9_RegimeUnshackled,
        # "Ens_Bal": Strategy_Ensemble_5050,      # Static 50/50
        # "Ens_Grow": Strategy_Ensemble_Growth,   # Static 70/30
        # "Ens_Adapt": Strategy_Ensemble_Adaptive, # Dynamic V10
        # "HRP_Base": Strategy_Ensemble_HRP,
        "V12_Macro": StrategyV12_Macro_Switch,
        "V21_MacroMom": StrategyV21_MacroMomentum,
        "TSR_V1": TSR_V1_KalmanMomentum,             # New Component
        "ENS_HillClimb": Strategy_Ensemble_HillClimb # The Final Product
    }

    # Same Stress Test Basket
    tickers = ["NVDA", "JPM", "TSLA", "BABA", "XLE"]

    bench = RobustBenchmark(
        tickers=tickers, 
        start_date="2022-01-01", 
        end_date="2024-12-30"
    )
    
    # Manual run loop to handle the specific classes
    for ticker in tickers:
        # Buy & Hold
        bh = StrategyV1_Baseline(ticker, bench.start_date, bench.end_date)
        bh.fetch_data()
        bh.data['Signal'] = 1
        bh.run_backtest()
        bench._print_row("Buy&Hold", ticker, bh.metrics)
        
        for name, StratClass in strategies.items():
            try:
                strat = StratClass(ticker, bench.start_date, bench.end_date)
                strat.fetch_data(warmup_years=2)
                strat.generate_signals()
                strat.run_backtest()
                bench._print_row(name, ticker, strat.metrics)
            except Exception as e:
                print(f"Err {name} {ticker}: {e}")
        print("-" * 79)

STRATEGY     | TICKER | ANN RET | SHARPE | MAX DD  | NOTES
-------------------------------------------------------------------------------
Buy&Hold   | NVDA   | 355.4%   | 1.19   | -62.7%   |
V12_Macro  | NVDA   | 159.6%   | 1.57   | -16.7%   |
V21_MacroMom | NVDA   | 95.6%   | 1.60   | -12.0%   |
Fetching Sector Data for ['XLE', 'XLF', 'XLK', 'XLY', 'XLI', 'XLV', 'XLP', 'XLU', 'XLB']...
Fetching Macro Data (SPY, TLT, VIX)...
Calculating Kalman Betas for all sectors...
TSR_V1     | NVDA   | 1.3%   | 0.10   | -11.1%   |
Fetching Sector Data for ['XLE', 'XLF', 'XLK', 'XLY', 'XLI', 'XLV', 'XLP', 'XLU', 'XLB']...
Fetching Macro Data (SPY, TLT, VIX)...
Calculating Kalman Betas for all sectors...
ENS_HillClimb | NVDA   | 28.6%   | 0.94   | -8.4%   |
-------------------------------------------------------------------------------
Buy&Hold   | JPM    | 62.1%   | 0.77   | -37.9%   |
V12_Macro  | JPM    | 93.1%   | 1.25   | -13.7%   |
V21_MacroMom | JPM    | 59.0%   | 1.25   | -12.3%   |
Fetching