In [1]:
import pandas as pd
import numpy as np
import yfinance as yf
import matplotlib.pyplot as plt
from datetime import datetime

In [2]:
# ---------------------------
# Data Retrieval & Preparation
# ---------------------------
def get_data(ticker: str, start_date: str, end_date: str) -> pd.Series:
    """
    Download daily adjusted closing prices for a given ticker between start_date and end_date.
    """
    data = yf.download(ticker, start=start_date, end=end_date)
    # Ensure we have the Adjusted Close column and forward fill any missing data
    # data = data['Adj Close'].ffill()
    return data

def prepare_data(start_date: str = '2010-01-01', end_date: str = None):
    """
    Retrieve data for UPRO, VGSH, and SPY (as a proxy for the S&P500).
    If end_date is not provided, it defaults to yesterday's date.
    """
    if end_date is None:
        end_date = datetime.today().strftime('%Y-%m-%d')
    data_upro = get_data('UPRO', start_date, end_date)
    data_vgsh = get_data('VGSH', start_date, end_date)
    data_spy = get_data('SPY', start_date, end_date)  # S&P500 Benchmark ETF
    return data_upro, data_vgsh, data_spy

def get_rebalance_dates(daily_data: pd.Series) -> pd.DatetimeIndex:
    """
    Determine the rebalancing dates as the first trading day of each month
    based on the daily_data index.
    """
    # df = daily_data.to_frame(name='price')
    df = daily_data.copy()
    df['Month'] = df.index.to_period('M')
    rebalance_dates = df.groupby('Month').head(1).index
    return rebalance_dates



In [3]:
# ---------------------------
# Dynamic Strategy Backtest
# ---------------------------
def dynamic_strategy_backtest(data_upro: pd.Series, data_vgsh: pd.Series, 
                              rebalance_dates: pd.DatetimeIndex,
                              initial_capital: float = 10000,
                              monthly_contribution: float = 100,
                              bucket_mapping_func=None) -> pd.DataFrame:
    """
    Backtest the dynamic strategy:
      - Invest an initial lump sum in UPRO.
      - Each month, invest monthly_contribution dollars on the first trading day by splitting 
        between UPRO and VGSH based on UPRO's percentage drop from its historical ATH.
    
    bucket_mapping_func: Optional function that takes the drop percentage (drop_pct) and
                         returns the target ratio for UPRO (relative to VGSH holdings).
                         If None, a linear mapping is used where:
                           target_ratio = (bucket_index + 1) * 0.1,
                         with bucket_index from 0 to 9.
    """
    # Define the bucket mapping function if not provided
    if bucket_mapping_func is None:
        def bucket_mapping_func(drop_pct: float) -> float:
            # If drop_pct is negative (UPRO above ATH), treat as 0%
            drop_pct = max(0, drop_pct)
            # Determine the bucket index from 0 to 9 (0%-9%: bucket 0, ..., 90%-100%: bucket 9)
            bucket = int((drop_pct * 100) // 10)
            bucket = min(bucket, 9)
            # Linear mapping: bucket n gives target ratio = (n+1)*0.1
            return (bucket + 1) * 0.1

    # Portfolio holdings stored in terms of units and cash values (the "cash" key isn't used since all cash gets invested immediately)
    portfolio = {
        'UPRO_units': 0.0,
        'VGSH_units': 0.0
    }
    
    # Record portfolio values over rebalancing dates
    results = []

    # Initial lump sum: invest fully in UPRO on the first rebalancing date
    first_date = rebalance_dates[0]
    first_price_upro = data_upro.loc[first_date]
    portfolio['UPRO_units'] = initial_capital / first_price_upro

    # Iterate over each rebalancing date
    for current_date in rebalance_dates:
        # Update historical ATH for UPRO up to the current date.
        historical_prices = data_upro.loc[:current_date]
        ath = historical_prices.max()
        current_price_upro = data_upro.loc[current_date]
        current_price_vgsh = data_vgsh.loc[current_date]
        
        # Calculate UPRO's percentage drop from its ATH
        drop_pct = (ath - current_price_upro) / ath
        
        # Determine the target ratio for UPRO based on the drop bucket
        target_ratio = bucket_mapping_func(drop_pct)
        
        # Current portfolio values based on price at current_date
        V_upro = portfolio['UPRO_units'] * current_price_upro
        V_vgsh = portfolio['VGSH_units'] * current_price_vgsh
        
        # Allocate monthly contribution so that UPRO investment after new cash approximates target ratio relative to VGSH.
        # Let x be dollars allocated to UPRO and (C-x) go to VGSH, where C = monthly_contribution.
        # Our goal: (V_upro + x) = target_ratio * (V_vgsh + (C - x))
        # Solve for x:
        C = monthly_contribution
        x = (target_ratio * (V_vgsh + C) - V_upro) / (1 + target_ratio)
        
        # Edge conditions: if x is negative, invest full C in VGSH; if x > C, invest full C in UPRO.
        if x < 0:
            x = 0
        elif x > C:
            x = C
        invest_upro = x
        invest_vgsh = C - x

        # Purchase new units based on the current prices
        portfolio['UPRO_units'] += invest_upro / current_price_upro
        portfolio['VGSH_units'] += invest_vgsh / current_price_vgsh

        # Record portfolio value on this rebalancing date
        total_value = portfolio['UPRO_units'] * current_price_upro + portfolio['VGSH_units'] * current_price_vgsh
        results.append({
            'Date': current_date,
            'UPRO_value': portfolio['UPRO_units'] * current_price_upro,
            'VGSH_value': portfolio['VGSH_units'] * current_price_vgsh,
            'Total': total_value
        })
    
    results_df = pd.DataFrame(results)
    results_df.set_index('Date', inplace=True)
    return results_df

# ---------------------------
# Benchmark Strategies Backtest
# ---------------------------
def benchmark_backtest(data: pd.Series, start_date: str,
                       initial_capital: float = 10000,
                       monthly_contribution: float = 100) -> pd.DataFrame:
    """
    Backtest a benchmark strategy that invests entirely in a single asset.
    The model uses the same funding structure: an initial lump sum followed by monthly contributions.
    """
    rebalance_dates = get_rebalance_dates(data)
    portfolio_units = 0.0
    results = []
    
    for i, current_date in enumerate(rebalance_dates):
        price = data.loc[current_date]
        # On the first rebalancing date, invest the initial lump sum
        if i == 0:
            invest = initial_capital
        else:
            invest = monthly_contribution
        portfolio_units += invest / price
        total_value = portfolio_units * price
        results.append({
            'Date': current_date,
            'Total': total_value
        })
    
    df = pd.DataFrame(results)
    df.set_index('Date', inplace=True)
    return df

# ---------------------------
# Metrics Calculation Module
# ---------------------------
def compute_performance_metrics(portfolio_df: pd.DataFrame) -> dict:
    """
    Calculate performance metrics given a portfolio DataFrame with a 'Total' column.
    Metrics include:
      - Mean return
      - Standard Deviation
      - Skewness
      - Kurtosis
      - CAGR (Compound Annual Growth Rate)
      - Maximum Drawdown
      - Sharpe Ratio (risk-free rate assumed 0)
      - Sortino Ratio
      - Omega Ratio
    """
    df = portfolio_df.copy()
    # Calculate daily returns. For benchmarks with infrequent rebalancing dates, this is an approximation.
    df['Return'] = df['Total'].pct_change().fillna(0)
    
    # Mean and standard deviation of daily returns
    mean_return = df['Return'].mean()
    std_return = df['Return'].std()

    # Annualize assuming 252 trading days per year
    total_periods = len(df)
    years = total_periods / 252
    cagr = (df['Total'].iloc[-1] / df['Total'].iloc[0])**(1/years) - 1
    
    # Maximum Drawdown calculation
    roll_max = df['Total'].cummax()
    drawdown = df['Total'] / roll_max - 1
    max_drawdown = drawdown.min()
    
    # Sharpe Ratio (risk-free rate = 0)
    sharpe_ratio = (mean_return / std_return) * np.sqrt(252) if std_return != 0 else np.nan
    
    # Sortino Ratio: using downside deviation
    downside_std = df[df['Return'] < 0]['Return'].std()
    sortino_ratio = (mean_return / downside_std) * np.sqrt(252) if downside_std != 0 else np.nan
    
    # Skewness and Kurtosis of returns
    skewness = df['Return'].skew()
    kurtosis = df['Return'].kurtosis()
    
    # Omega Ratio (assume target return = 0)
    target = 0
    gains = df['Return'][df['Return'] > target] - target
    losses = target - df['Return'][df['Return'] < target]
    omega_ratio = gains.sum() / losses.sum() if losses.sum() != 0 else np.nan

    metrics = {
        'Mean Return': mean_return,
        'Std Dev': std_return,
        'Skewness': skewness,
        'Kurtosis': kurtosis,
        'CAGR': cagr,
        'Max Drawdown': max_drawdown,
        'Sharpe Ratio': sharpe_ratio,
        'Sortino Ratio': sortino_ratio,
        'Omega Ratio': omega_ratio
    }
    return metrics

# ---------------------------
# Visualization Module
# ---------------------------
def plot_strategy_performance(dynamic_df: pd.DataFrame, spy_df: pd.DataFrame, upro_df: pd.DataFrame) -> None:
    """
    Plot the portfolio performance over time for the three strategies.
    """
    plt.figure(figsize=(12, 7))
    plt.plot(dynamic_df.index, dynamic_df['Total'], label='Dynamic UPRO/VGSH Strategy')
    plt.plot(spy_df.index, spy_df['Total'], label='100% S&P500 Strategy')
    plt.plot(upro_df.index, upro_df['Total'], label='100% UPRO Strategy')
    plt.xlabel('Date')
    plt.ylabel('Portfolio Value ($)')
    plt.title('Strategy Performance Over Time')
    plt.legend()
    plt.grid(True)
    plt.show()

# ---------------------------
# Main function to run the backtest
# ---------------------------
def run_backtest(start_date: str = '2010-01-01', end_date: str = None,
                 initial_capital: float = 10000, monthly_contribution: float = 100):
    """
    Run backtests for the dynamic strategy and the two benchmark strategies.
    The function retrieves data, calculates portfolio performance, computes metrics,
    and plots the performance over time.
    """
    if end_date is None:
        end_date = datetime.today().strftime('%Y-%m-%d')
    
    # Retrieve historical data for UPRO, VGSH, and SPY
    data_upro, data_vgsh, data_spy = prepare_data(start_date, end_date)
    
    # Determine rebalancing dates using UPRO's data (the index is assumed consistent across assets)
    rebalance_dates = get_rebalance_dates(data_upro)
    
    # Run Dynamic Strategy Backtest (custom strategy)
    dynamic_results = dynamic_strategy_backtest(data_upro, data_vgsh, rebalance_dates, 
                                                  initial_capital, monthly_contribution)
    
    # Run Benchmark Backtests:
    spy_results = benchmark_backtest(data_spy, start_date, initial_capital, monthly_contribution)
    upro_results = benchmark_backtest(data_upro, start_date, initial_capital, monthly_contribution)
    
    # Compute performance metrics for each strategy
    metrics_dynamic = compute_performance_metrics(dynamic_results)
    metrics_spy = compute_performance_metrics(spy_results)
    metrics_upro = compute_performance_metrics(upro_results)
    
    # Combine metrics into a single DataFrame for comparison
    metrics_df = pd.DataFrame({
        'Dynamic Strategy': metrics_dynamic,
        '100% S&P500': metrics_spy,
        '100% UPRO': metrics_upro
    })
    
    # Display performance metrics
    print("Performance Metrics:")
    print(metrics_df)
    
    # Plot strategy performance
    plot_strategy_performance(dynamic_results, spy_results, upro_results)
    
    return dynamic_results, spy_results, upro_results, metrics_df


In [4]:
if __name__ == "__main__":
    # You can adjust start_date and end_date as needed.
    run_backtest(start_date='2010-01-01')


YF.download() has changed argument auto_adjust default to True


[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed


ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().