In [49]:
import pandas as pd
import numpy as np
import yfinance as yf
from sklearn.preprocessing import StandardScaler
import torch

# Function to download and prepare ETF data
def prepare_etf_data(tickers, start_date, end_date):
    data = {}
    for ticker in tickers:
        from curl_cffi import requests
        session = requests.Session(impersonate="chrome")
        ticker = yf.Ticker('...', session=session)
        # Download historical data
        df = yf.download(ticker, start=start_date, end=end_date)
        
        # Calculate technical indicators
        df['returns'] = df['Adj Close'].pct_change()
        df['log_returns'] = np.log(df['Adj Close'] / df['Adj Close'].shift(1))
        df['volatility_20d'] = df['returns'].rolling(20).std()
        df['ma_50'] = df['Adj Close'].rolling(50).mean()
        df['ma_200'] = df['Adj Close'].rolling(200).mean()
        df['rsi_14'] = calculate_rsi(df['Adj Close'], 14)
        df['atr_14'] = calculate_atr(df, 14)
        # Add more indicators as needed
        
        data[ticker] = df.copy()
    
    return data

# Function to create feature matrix
def create_features(data, lookback=20):
    features = []
    dates = []
    
    for date in sorted(data['SPY'].index)[lookback:]:
        feature_vector = []
        valid_date = True
        
        # Ensure data is available for all tickers at this date
        for ticker in data:
            if date not in data[ticker].index:
                valid_date = False
                break
        
        if not valid_date:
            continue
            
        # Extract features for each ticker
        for ticker in data:
            df = data[ticker]
            idx = df.index.get_loc(date)
            
            # Current position indicators
            feature_vector.extend([
                df['ma_50'][idx] / df['ma_200'][idx] - 1,  # Golden cross indicator
                df['volatility_20d'][idx],
                df['rsi_14'][idx] / 100,  # Normalize RSI
                df['atr_14'][idx] / df['Adj Close'][idx]  # Normalized ATR
            ])
            
            # Recent performance
            feature_vector.append(df['Adj Close'][idx] / df['Adj Close'][idx-20] - 1)
            
        # Add intermarket relationships
        if 'SPY' in data and 'TLT' in data:
            spy_return = data['SPY']['returns'][idx]
            tlt_return = data['TLT']['returns'][idx]
            feature_vector.append(spy_return / (tlt_return + 1e-10))  # Avoid division by zero
        
        # Add VIX if available
        if 'VIX' in data:
            feature_vector.append(data['VIX']['Adj Close'][idx])
            feature_vector.append(data['VIX']['Adj Close'][idx] / data['VIX']['Adj Close'][idx-20])
        
        features.append(feature_vector)
        dates.append(date)
    
    # Convert to numpy arrays
    X = np.array(features)
    dates = np.array(dates)
    
    # Normalize features
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    
    return X_scaled, dates, scaler

# Main data preparation function
def prepare_data_for_training(tickers, start_date, end_date, lookback=20):
    # Download and prepare data
    data = prepare_etf_data(tickers, start_date, end_date)
    
    # Create feature matrix
    X, dates, scaler = create_features(data, lookback)
    
    # Convert to PyTorch tensors
    X_tensor = torch.tensor(X, dtype=torch.float32)
    
    return X_tensor, dates, scaler, data

In [50]:
import torch
import torch.nn as nn
import torch.optim as optim

class FeatureAutoencoder(nn.Module):
    def __init__(self, input_dim, encoding_dim=10):
        super(FeatureAutoencoder, self).__init__()
        
        # Encoder
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, encoding_dim)
        )
        
        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(encoding_dim, 32),
            nn.ReLU(),
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.Linear(64, input_dim)
        )
    
    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return encoded, decoded
    
    def encode(self, x):
        return self.encoder(x)

# Train the autoencoder
def train_autoencoder(X_tensor, epochs=100, batch_size=32, learning_rate=0.001):
    input_dim = X_tensor.shape[1]
    model = FeatureAutoencoder(input_dim)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    
    dataset = torch.utils.data.TensorDataset(X_tensor, X_tensor)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)
    
    for epoch in range(epochs):
        total_loss = 0
        for data, _ in dataloader:
            optimizer.zero_grad()
            _, decoded = model(data)
            loss = criterion(decoded, data)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        
        if (epoch + 1) % 10 == 0:
            print(f'Epoch {epoch+1}, Loss: {total_loss/len(dataloader):.6f}')
    
    return model

In [51]:
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt

def identify_regimes(X_tensor, autoencoder, n_regimes=4):
    # Use the autoencoder to get the encoded representation
    with torch.no_grad():
        encoded_features = autoencoder.encode(X_tensor).numpy()
    
    # Apply K-means clustering
    kmeans = KMeans(n_clusters=n_regimes, random_state=42)
    regime_labels = kmeans.fit_predict(encoded_features)
    
    # Visualize the regimes (optional)
    plt.figure(figsize=(10, 8))
    plt.scatter(encoded_features[:, 0], encoded_features[:, 1], c=regime_labels, cmap='viridis')
    plt.title('Market Regimes Visualization')
    plt.xlabel('Feature 1')
    plt.ylabel('Feature 2')
    plt.colorbar(label='Regime')
    plt.show()
    
    return regime_labels, kmeans

In [52]:
class RegimeClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dims, num_regimes):
        super(RegimeClassifier, self).__init__()
        
        layers = []
        prev_dim = input_dim
        
        # Hidden layers
        for dim in hidden_dims:
            layers.append(nn.Linear(prev_dim, dim))
            layers.append(nn.ReLU())
            layers.append(nn.BatchNorm1d(dim))
            layers.append(nn.Dropout(0.3))
            prev_dim = dim
        
        # Output layer
        layers.append(nn.Linear(prev_dim, num_regimes))
        
        self.model = nn.Sequential(*layers)
    
    def forward(self, x):
        return self.model(x)

# Train the regime classifier
def train_regime_classifier(X_tensor, regime_labels, hidden_dims=[64, 32], num_regimes=4, 
                           epochs=100, batch_size=32, learning_rate=0.001):
    # Convert labels to tensor
    y_tensor = torch.tensor(regime_labels, dtype=torch.long)
    
    # Create model
    input_dim = X_tensor.shape[1]
    model = RegimeClassifier(input_dim, hidden_dims, num_regimes)
    
    # Loss and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-5)
    
    # Dataset and dataloader
    dataset = torch.utils.data.TensorDataset(X_tensor, y_tensor)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)
    
    # Training loop
    for epoch in range(epochs):
        model.train()
        total_loss = 0
        correct = 0
        total = 0
        
        for inputs, labels in dataloader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        
        accuracy = 100 * correct / total
        if (epoch + 1) % 10 == 0:
            print(f'Epoch {epoch+1}, Loss: {total_loss/len(dataloader):.4f}, Accuracy: {accuracy:.2f}%')
    
    return model

In [53]:
def predict_regime(model, X_tensor, softmax=True):
    model.eval()
    with torch.no_grad():
        outputs = model(X_tensor)
        if softmax:
            probabilities = torch.nn.functional.softmax(outputs, dim=1)
            return probabilities
        return outputs

def get_regime_probabilities(X_tensor, model):
    probs = predict_regime(model, X_tensor, softmax=True)
    return probs.numpy()

In [54]:
def get_regime_allocations():
    # Define allocation for each regime
    # Format: {regime_id: {asset: weight}}
    allocations = {
        0: {'SPY': 0.6, 'QQQ': 0.2, 'TLT': 0.2, 'GLD': 0.0},  # Bullish regime
        1: {'SPY': 0.3, 'QQQ': 0.1, 'TLT': 0.4, 'GLD': 0.2},  # Mixed regime
        2: {'SPY': 0.1, 'QQQ': 0.0, 'TLT': 0.7, 'GLD': 0.2},  # Defensive regime
        3: {'SPY': 0.0, 'QQQ': 0.0, 'TLT': 0.5, 'GLD': 0.5}   # Crisis regime
    }
    return allocations

def calculate_portfolio_weights(regime_probs, allocations):
    """
    Calculate portfolio weights based on regime probabilities
    
    Parameters:
    - regime_probs: numpy array of shape (num_regimes,)
    - allocations: dict mapping regime to asset allocations
    
    Returns:
    - dict mapping asset to weight
    """
    # Initialize weights
    all_assets = set()
    for regime in allocations:
        all_assets.update(allocations[regime].keys())
    
    weights = {asset: 0.0 for asset in all_assets}
    
    # Calculate weighted average based on regime probabilities
    for regime in range(len(regime_probs)):
        if regime in allocations:
            prob = regime_probs[regime]
            for asset, allocation in allocations[regime].items():
                weights[asset] += prob * allocation
    
    # Normalize weights to ensure they sum to 1
    total_weight = sum(weights.values())
    if total_weight > 0:
        weights = {asset: weight / total_weight for asset, weight in weights.items()}
    
    return weights

In [55]:
def calculate_position_sizes(weights, account_value, max_risk_pct=0.02):
    """
    Calculate position sizes based on portfolio weights and account value
    
    Parameters:
    - weights: dict mapping asset to weight
    - account_value: total portfolio value
    - max_risk_pct: maximum risk percentage per trade
    
    Returns:
    - dict mapping asset to position size
    """
    position_sizes = {}
    
    for asset, weight in weights.items():
        # Calculate target position value
        target_value = account_value * weight
        
        # Adjust based on risk management
        # (In a real implementation, you would calculate asset-specific risk)
        risk_adjusted_value = min(target_value, account_value * max_risk_pct)
        
        position_sizes[asset] = risk_adjusted_value
    
    return position_sizes

In [56]:
def run_strategy(model, scaler, tickers, start_date, end_date, account_value=100000):
    """
    Run the market regime classification strategy
    
    Parameters:
    - model: trained regime classifier model
    - scaler: fitted StandardScaler
    - tickers: list of tickers to trade
    - start_date, end_date: date range for the strategy
    - account_value: initial account value
    
    Returns:
    - DataFrame with strategy performance
    """
    # Get regime allocations
    allocations = get_regime_allocations()
    
    # Prepare data
    data = prepare_etf_data(tickers, start_date, end_date)
    X, dates, _, _ = create_features(data, lookback=20)
    
    # Scale features
    X_scaled = scaler.transform(X)
    X_tensor = torch.tensor(X_scaled, dtype=torch.float32)
    
    # Get regime probabilities
    regime_probs = get_regime_probabilities(X_tensor, model)
    
    # Initialize performance tracking
    portfolio_values = [account_value]
    current_weights = {ticker: 0.0 for ticker in tickers}
    
    # Run strategy day by day
    for i in range(1, len(dates)):
        # Calculate target weights based on regime probabilities
        target_weights = calculate_portfolio_weights(regime_probs[i-1], allocations)
        
        # Calculate position sizes
        position_sizes = calculate_position_sizes(target_weights, portfolio_values[-1])
        
        # Calculate returns
        daily_return = 0
        for ticker in tickers:
            if ticker in data and dates[i] in data[ticker].index and dates[i-1] in data[ticker].index:
                ticker_return = data[ticker]['Adj Close'][dates[i]] / data[ticker]['Adj Close'][dates[i-1]] - 1
                daily_return += target_weights.get(ticker, 0) * ticker_return
        
        # Update portfolio value
        new_value = portfolio_values[-1] * (1 + daily_return)
        portfolio_values.append(new_value)
        
        # Update current weights
        current_weights = target_weights
    
    # Create performance DataFrame
    performance = pd.DataFrame({
        'Date': dates[1:],
        'Portfolio Value': portfolio_values[1:]
    }).set_index('Date')
    
    return performance

In [57]:
def calculate_performance_metrics(performance):
    """
    Calculate strategy performance metrics
    
    Parameters:
    - performance: DataFrame with 'Portfolio Value' column
    
    Returns:
    - dict of performance metrics
    """
    portfolio_values = performance['Portfolio Value']
    returns = portfolio_values.pct_change().dropna()
    
    # Calculate metrics
    total_return = (portfolio_values.iloc[-1] / portfolio_values.iloc[0]) - 1
    annual_return = (1 + total_return) ** (252 / len(returns)) - 1
    volatility = returns.std() * np.sqrt(252)
    sharpe_ratio = annual_return / volatility if volatility > 0 else 0
    
    # Calculate drawdown
    drawdown = 1 - portfolio_values / portfolio_values.cummax()
    max_drawdown = drawdown.max()
    
    # Create metrics dictionary
    metrics = {
        'Total Return': total_return,
        'Annualized Return': annual_return,
        'Annualized Volatility': volatility,
        'Sharpe Ratio': sharpe_ratio,
        'Max Drawdown': max_drawdown
    }
    
    return metrics

In [58]:
def walk_forward_test(tickers, start_date, end_date, window_size=252, step_size=63):
    """
    Perform walk-forward testing of the strategy
    
    Parameters:
    - tickers: list of tickers to trade
    - start_date, end_date: date range for testing
    - window_size: size of the training window in days
    - step_size: days to move forward after each test
    
    Returns:
    - DataFrame with strategy performance
    """
    # Convert dates to pandas datetime
    start = pd.to_datetime(start_date)
    end = pd.to_datetime(end_date)
    
    # Initialize results storage
    all_results = []
    
    # Create date ranges for walk-forward testing
    current_start = start
    while current_start + pd.Timedelta(days=window_size+step_size) < end:
        train_start = current_start
        train_end = current_start + pd.Timedelta(days=window_size)
        test_start = train_end
        test_end = test_start + pd.Timedelta(days=step_size)
        
        # Prepare training data
        X_train, _, scaler, _ = prepare_data_for_training(tickers, train_start, train_end)
        
        # Train autoencoder
        autoencoder = train_autoencoder(X_train)
        
        # Identify regimes
        regime_labels, _ = identify_regimes(X_train, autoencoder)
        
        # Train regime classifier
        model = train_regime_classifier(X_train, regime_labels)
        
        # Run strategy on test period
        test_performance = run_strategy(model, scaler, tickers, test_start, test_end)
        test_performance['Training Period'] = f"{train_start.date()} to {train_end.date()}"
        
        all_results.append(test_performance)
        
        # Move forward
        current_start = current_start + pd.Timedelta(days=step_size)
    
    # Combine all results
    if all_results:
        combined_results = pd.concat(all_results)
        return combined_results
    else:
        return pd.DataFrame()

In [59]:
def save_model(model, autoencoder, scaler, kmeans, path):
    """
    Save all model components
    
    Parameters:
    - model: trained regime classifier
    - autoencoder: trained feature autoencoder
    - scaler: fitted StandardScaler
    - kmeans: fitted KMeans clusterer
    - path: directory to save models
    """
    import os
    import pickle
    
    # Create directory if it doesn't exist
    os.makedirs(path, exist_ok=True)
    
    # Save PyTorch models
    torch.save(model.state_dict(), os.path.join(path, 'regime_classifier.pth'))
    torch.save(autoencoder.state_dict(), os.path.join(path, 'autoencoder.pth'))
    
    # Save sklearn models
    with open(os.path.join(path, 'scaler.pkl'), 'wb') as f:
        pickle.dump(scaler, f)
    
    with open(os.path.join(path, 'kmeans.pkl'), 'wb') as f:
        pickle.dump(kmeans, f)

def load_model(path, input_dim, hidden_dims, num_regimes, encoding_dim=10):
    """
    Load all model components
    
    Parameters:
    - path: directory containing saved models
    - input_dim: input dimension for models
    - hidden_dims: hidden dimensions for classifier
    - num_regimes: number of regimes
    - encoding_dim: dimension of encoded features
    
    Returns:
    - model, autoencoder, scaler, kmeans
    """
    import os
    import pickle
    
    # Initialize models
    model = RegimeClassifier(input_dim, hidden_dims, num_regimes)
    autoencoder = FeatureAutoencoder(input_dim, encoding_dim)
    
    # Load PyTorch models
    model.load_state_dict(torch.load(os.path.join(path, 'regime_classifier.pth')))
    autoencoder.load_state_dict(torch.load(os.path.join(path, 'autoencoder.pth')))
    
    # Load sklearn models
    with open(os.path.join(path, 'scaler.pkl'), 'rb') as f:
        scaler = pickle.load(f)
    
    with open(os.path.join(path, 'kmeans.pkl'), 'rb') as f:
        kmeans = pickle.load(f)
    
    return model, autoencoder, scaler, kmeans

In [60]:
def daily_trading_routine(model, scaler, tickers, account_value):
    """
    Daily routine for trading based on market regime
    
    Parameters:
    - model: trained regime classifier
    - scaler: fitted StandardScaler
    - tickers: list of tickers to trade
    - account_value: current account value
    
    Returns:
    - dict of trading actions
    """
    # Get latest data
    latest_data = {}
    for ticker in tickers:
        latest_data[ticker] = yf.download(ticker, period='60d')
    
    # Prepare features
    X, _, _ = create_features(latest_data)
    
    # Use only the most recent data point
    latest_features = X[-1].reshape(1, -1)
    
    # Scale features
    scaled_features = scaler.transform(latest_features)
    features_tensor = torch.tensor(scaled_features, dtype=torch.float32)
    
    # Get regime probabilities
    regime_probs = get_regime_probabilities(features_tensor, model)[0]
    
    # Get allocations for each regime
    allocations = get_regime_allocations()
    
    # Calculate target weights
    target_weights = calculate_portfolio_weights(regime_probs, allocations)
    
    # Calculate position sizes
    position_sizes = calculate_position_sizes(target_weights, account_value)
    
    # Generate trading actions
    actions = {ticker: position_sizes.get(ticker, 0) for ticker in tickers}
    
    return {
        'regime_probabilities': {i: float(p) for i, p in enumerate(regime_probs)},
        'target_weights': target_weights,
        'trading_actions': actions
    }

In [61]:
# Install vectorbt if not already installed
# !pip install vectorbt

import vectorbt as vbt
import numpy as np
import pandas as pd
import torch
from datetime import datetime, timedelta
import yfinance as yf

In [62]:
def prepare_vectorbt_backtest(tickers, start_date, end_date):
    """
    Prepare data and models for VectorBT backtesting
    
    Parameters:
    - tickers: list of ETF tickers to include in the strategy
    - start_date, end_date: backtest period
    
    Returns:
    - Dictionary with all prepared components for backtesting
    """
    # Define training period (before backtest period)
    train_start = pd.to_datetime(start_date) - timedelta(days=365)
    train_end = pd.to_datetime(start_date) - timedelta(days=1)
    
    # Download data for training
    print("Downloading training data...")
    training_data = prepare_etf_data(tickers, train_start.strftime('%Y-%m-%d'), 
                                    train_end.strftime('%Y-%m-%d'))
    
    # Prepare features and train models
    print("Preparing features and training models...")
    X_train, _, scaler, _ = prepare_data_for_training(tickers, 
                                                    train_start.strftime('%Y-%m-%d'), 
                                                    train_end.strftime('%Y-%m-%d'))
    
    # Train autoencoder
    print("Training autoencoder...")
    autoencoder = train_autoencoder(X_train)
    
    # Identify regimes
    print("Identifying market regimes...")
    regime_labels, kmeans = identify_regimes(X_train, autoencoder)
    
    # Train regime classifier
    print("Training regime classifier...")
    model = train_regime_classifier(X_train, regime_labels)
    
    # Download data for backtesting period
    print("Downloading backtest data...")
    prices = {}
    for ticker in tickers:
        df = yf.download(ticker, start=start_date, end=end_date)
        prices[ticker] = df['Adj Close']
    
    prices_df = pd.DataFrame(prices)
    
    # Create regime allocations dictionary
    regime_allocations = get_regime_allocations()
    
    return {
        'model': model,
        'autoencoder': autoencoder,
        'scaler': scaler,
        'kmeans': kmeans,
        'prices_df': prices_df,
        'regime_allocations': regime_allocations,
        'tickers': tickers
    }

In [63]:
def generate_regime_signals(backtest_data, window_size=20):
    """
    Generate regime signals throughout the backtest period
    
    Parameters:
    - backtest_data: Dictionary from prepare_vectorbt_backtest
    - window_size: Lookback window for feature creation
    
    Returns:
    - DataFrame with regime probabilities for each date
    """
    model = backtest_data['model']
    scaler = backtest_data['scaler']
    prices_df = backtest_data['prices_df']
    tickers = backtest_data['tickers']
    
    # Create a dictionary to store all data
    data = {}
    for ticker in tickers:
        # Create dataframe with technical indicators
        df = pd.DataFrame()
        df['Adj Close'] = prices_df[ticker]
        df['returns'] = df['Adj Close'].pct_change()
        df['log_returns'] = np.log(df['Adj Close'] / df['Adj Close'].shift(1))
        df['volatility_20d'] = df['returns'].rolling(20).std()
        df['ma_50'] = df['Adj Close'].rolling(50).mean()
        df['ma_200'] = df['Adj Close'].rolling(200).mean()
        df['rsi_14'] = calculate_rsi(df['Adj Close'], 14)
        df['atr_14'] = calculate_atr(df, 14)
        data[ticker] = df.copy()
    
    # Create features matrix
    X, dates, _ = create_features(data, lookback=window_size)
    
    # Scale features
    X_scaled = scaler.transform(X)
    X_tensor = torch.tensor(X_scaled, dtype=torch.float32)
    
    # Get regime probabilities
    model.eval()
    with torch.no_grad():
        outputs = model(X_tensor)
        probabilities = torch.nn.functional.softmax(outputs, dim=1)
    
    # Convert to numpy
    regime_probs = probabilities.numpy()
    
    # Create DataFrame with results
    regime_df = pd.DataFrame(
        regime_probs, 
        index=dates,
        columns=[f'Regime_{i}' for i in range(regime_probs.shape[1])]
    )
    
    return regime_df

In [64]:
def run_vectorbt_backtest(backtest_data, regime_signals):
    """
    Run vectorbt backtest using regime signals
    
    Parameters:
    - backtest_data: Dictionary from prepare_vectorbt_backtest
    - regime_signals: DataFrame with regime probabilities
    
    Returns:
    - VectorBT Portfolio object with backtest results
    """
    prices_df = backtest_data['prices_df']
    regime_allocations = backtest_data['regime_allocations']
    
    # Initialize weights DataFrame with same index as prices
    weights = pd.DataFrame(0, index=prices_df.index, columns=prices_df.columns)
    
    # For each date with regime signals, calculate portfolio weights
    for date in regime_signals.index:
        if date in weights.index:
            # Get regime probabilities for this date
            regime_probs = regime_signals.loc[date].values
            
            # Calculate weights for each asset
            target_weights = calculate_portfolio_weights(regime_probs, regime_allocations)
            
            # Update weights DataFrame
            for ticker, weight in target_weights.items():
                if ticker in weights.columns:
                    weights.loc[date, ticker] = weight
    
    # Forward fill weights to handle missing dates
    weights = weights.fillna(method='ffill')
    
    # Run VectorBT portfolio backtest
    portfolio = vbt.Portfolio.from_orders(
        prices_df,
        weights,
        freq='1D',
        init_cash=100000,
        fees=0.001,  # 0.1% trading fee
        slippage=0.001,  # 0.1% slippage
        log=True
    )
    
    return portfolio

In [65]:
def analyze_backtest_results(portfolio, benchmark_ticker='SPY'):
    """
    Analyze backtest results, including comparison to benchmark
    
    Parameters:
    - portfolio: VectorBT Portfolio object
    - benchmark_ticker: Ticker to use as benchmark
    
    Returns:
    - Dictionary of performance metrics
    """
    # Get performance metrics
    returns = portfolio.returns()
    total_return = portfolio.total_return()
    sharpe_ratio = portfolio.sharpe_ratio()
    max_drawdown = portfolio.max_drawdown()
    
    # Get benchmark data
    benchmark_data = portfolio.prices[benchmark_ticker]
    benchmark_returns = benchmark_data.pct_change()
    benchmark_total_return = (benchmark_data.iloc[-1] / benchmark_data.iloc[0]) - 1
    
    # Calculate beta to benchmark
    cov = np.cov(returns.fillna(0), benchmark_returns.fillna(0))[0, 1]
    var = np.var(benchmark_returns.fillna(0))
    beta = cov / var if var != 0 else np.nan
    
    # Calculate alpha (annualized)
    rf_rate = 0.03  # Assume 3% risk-free rate
    trading_days = 252
    annualized_return = (1 + total_return) ** (trading_days / len(returns)) - 1
    annualized_benchmark_return = (1 + benchmark_total_return) ** (trading_days / len(benchmark_returns)) - 1
    alpha = annualized_return - (rf_rate + beta * (annualized_benchmark_return - rf_rate))
    
    # Create metrics dictionary
    metrics = {
        'Total Return': total_return,
        'Annualized Return': annualized_return,
        'Sharpe Ratio': sharpe_ratio,
        'Max Drawdown': max_drawdown,
        'Beta': beta,
        'Alpha': alpha,
        'Benchmark Return': benchmark_total_return,
        'Benchmark Annualized Return': annualized_benchmark_return
    }
    
    return metrics

In [66]:
def visualize_backtest_results(portfolio, regime_signals):
    """
    Create visualizations of backtest results including regime transitions
    
    Parameters:
    - portfolio: VectorBT Portfolio object
    - regime_signals: DataFrame with regime probabilities
    """
    import matplotlib.pyplot as plt
    import matplotlib.dates as mdates
    from matplotlib.colors import LinearSegmentedColormap
    
    # Create figure with multiple subplots
    fig, axes = plt.subplots(3, 1, figsize=(12, 18), gridspec_kw={'height_ratios': [2, 1, 1]})
    
    # 1. Portfolio value over time with regime background
    ax1 = axes[0]
    
    # Plot portfolio value
    portfolio.plot_value(ax=ax1)
    
    # Color backdrop based on dominant regime (could be enhanced with actual colors per regime)
    dates = regime_signals.index
    for i in range(len(dates)-1):
        if i < len(dates) - 1:
            # Determine dominant regime
            probs = regime_signals.iloc[i].values
            dominant_regime = np.argmax(probs)
            
            # Determine color based on regime
            colors = ['green', 'blue', 'orange', 'red']  # Adjust colors based on regime meaning
            color = colors[dominant_regime] if dominant_regime < len(colors) else 'gray'
            
            # Add colored background
            start_date = mdates.date2num(dates[i])
            end_date = mdates.date2num(dates[i+1])
            ax1.axvspan(start_date, end_date, alpha=0.1, color=color)
    
    ax1.set_title('Portfolio Value with Market Regimes')
    
    # 2. Asset allocation over time
    ax2 = axes[1]
    portfolio.plot_assets(ax=ax2)
    ax2.set_title('Asset Allocation Over Time')
    
    # 3. Regime probabilities over time
    ax3 = axes[2]
    
    # Create a colormap for each regime
    n_regimes = regime_signals.shape[1]
    colors = plt.cm.viridis(np.linspace(0, 1, n_regimes))
    
    # Stack plot of regime probabilities
    ax3.stackplot(regime_signals.index, 
                 [regime_signals[f'Regime_{i}'] for i in range(n_regimes)],
                 labels=[f'Regime {i}' for i in range(n_regimes)],
                 colors=colors,
                 alpha=0.7)
    
    ax3.set_title('Market Regime Probabilities')
    ax3.legend(loc='upper left')
    ax3.set_ylim(0, 1)
    
    plt.tight_layout()
    plt.show()
    
    # Create additional performance visualizations
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    
    # Drawdown plot
    portfolio.plot_drawdown(ax=axes[0, 0])
    axes[0, 0].set_title('Portfolio Drawdown')
    
    # Monthly returns heatmap
    portfolio.plot_monthly_returns(ax=axes[0, 1])
    axes[0, 1].set_title('Monthly Returns')
    
    # Underwater plot
    portfolio.plot_underwater(ax=axes[1, 0])
    axes[1, 0].set_title('Underwater Plot')
    
    # Rolling Sharpe ratio
    rolling_window = min(252, len(portfolio.returns()) // 2)  # Use 1 year window if possible
    portfolio.rolling_sharpe(window=rolling_window).plot(ax=axes[1, 1])
    axes[1, 1].set_title(f'Rolling Sharpe Ratio ({rolling_window}-day)')
    
    plt.tight_layout()
    plt.show()

In [67]:
def backtest_regime_strategy(tickers=['SPY', 'QQQ', 'TLT', 'GLD'], 
                           start_date='2018-01-01', 
                           end_date='2023-12-31'):
    """
    Run complete backtest of the market regime classification strategy
    
    Parameters:
    - tickers: List of ETF tickers to include
    - start_date, end_date: Backtest period
    
    Returns:
    - Dictionary with backtest results
    """
    print("Starting backtest preparation...")
    
    # Prepare backtest components
    backtest_data = prepare_vectorbt_backtest(tickers, start_date, end_date)
    
    print("Generating regime signals...")
    
    # Generate regime signals
    regime_signals = generate_regime_signals(backtest_data)
    
    print("Running vectorbt backtest...")
    
    # Run backtest
    portfolio = run_vectorbt_backtest(backtest_data, regime_signals)
    
    print("Analyzing results...")
    
    # Analyze results
    metrics = analyze_backtest_results(portfolio)
    
    print("Visualizing results...")
    
    # Visualize results
    visualize_backtest_results(portfolio, regime_signals)
    
    # Detailed statistics
    stats = portfolio.stats()
    
    return {
        'portfolio': portfolio,
        'metrics': metrics,
        'stats': stats,
        'regime_signals': regime_signals
    }

In [71]:
# Run backtest
backtest_results = backtest_regime_strategy(
    tickers=['SPY', 'QQQ', 'TLT', 'GLD', 'VNQ'],  # Added real estate ETF
    start_date='2019-01-01',
    end_date='2023-12-31'
)

# Print key metrics
metrics = backtest_results['metrics']
print(f"Total Return: {metrics['Total Return']:.2%}")
print(f"Annualized Return: {metrics['Annualized Return']:.2%}")
print(f"Sharpe Ratio: {metrics['Sharpe Ratio']:.2f}")
print(f"Max Drawdown: {metrics['Max Drawdown']:.2%}")
print(f"Alpha: {metrics['Alpha']:.2%}")
print(f"Beta: {metrics['Beta']:.2f}")
print(f"Benchmark Return: {metrics['Benchmark Return']:.2%}")

# Display detailed statistics
display(backtest_results['stats'])

# Access portfolio object for further analysis
portfolio = backtest_results['portfolio']

Starting backtest preparation...
Downloading training data...


AttributeError: 'Ticker' object has no attribute 'replace'

In [44]:
def analyze_regime_characteristics(regime_signals, prices_df):
    """
    Analyze the characteristics of each identified regime
    
    Parameters:
    - regime_signals: DataFrame with regime probabilities
    - prices_df: DataFrame with asset prices
    
    Returns:
    - DataFrame with regime characteristics
    """
    # Create returns dataframe
    returns_df = prices_df.pct_change().dropna()
    
    # Determine dominant regime for each date
    dominant_regimes = pd.DataFrame(
        np.argmax(regime_signals.values, axis=1),
        index=regime_signals.index,
        columns=['Dominant_Regime']
    )
    
    # Merge with returns
    combined_df = pd.merge(
        dominant_regimes, 
        returns_df, 
        left_index=True, 
        right_index=True,
        how='inner'
    )
    
    # Analyze characteristics by regime
    regime_stats = {}
    
    for regime in combined_df['Dominant_Regime'].unique():
        # Filter data for this regime
        regime_data = combined_df[combined_df['Dominant_Regime'] == regime]
        
        # Calculate metrics for each asset
        asset_stats = {}
        for asset in returns_df.columns:
            returns = regime_data[asset]
            
            asset_stats[asset] = {
                'Mean Return': returns.mean(),
                'Volatility': returns.std(),
                'Sharpe': returns.mean() / returns.std() if returns.std() > 0 else 0,
                'Max Drawdown': (1 - (1 + returns).cumprod() / (1 + returns).cumprod().cummax()).max(),
                'Win Rate': (returns > 0).mean(),
                'Observations': len(returns)
            }
        
        regime_stats[f'Regime_{regime}'] = asset_stats
    
    # Create summary DataFrame
    summary_rows = []
    
    for regime, assets in regime_stats.items():
        for asset, metrics in assets.items():
            summary_rows.append({
                'Regime': regime,
                'Asset': asset,
                **metrics
            })
    
    summary_df = pd.DataFrame(summary_rows)
    
    return summary_df

# Run regime analysis
regime_analysis = analyze_regime_characteristics(
    backtest_results['regime_signals'],
    backtest_results['portfolio'].prices
)

# Display regime analysis
display(regime_analysis.pivot(index='Regime', columns='Asset'))

# Visualize regime-specific returns
plt.figure(figsize=(14, 8))
sns.barplot(x='Regime', y='Mean Return', hue='Asset', data=regime_analysis)
plt.title('Average Daily Returns by Regime and Asset')
plt.xlabel('Market Regime')
plt.ylabel('Mean Daily Return')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

NameError: name 'backtest_results' is not defined