# Imports

In [None]:
import os
from dotenv import load_dotenv

import pandas as pd
import numpy as np

import plotly.express as px

from catboost import CatBoostClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score, confusion_matrix, classification_report

from datetime import datetime, timedelta
from pandas_market_calendars import get_calendar
import yfinance as yf

from tqdm.notebook import tqdm

from databuilder import build_spread_backtest_dataset


# Fetch base strategy backtest data + add relevant info

In [None]:
load_dotenv(dotenv_path='/Users/teymour/Desktop/qnt-projs/Short-Vol-ML/images/.env')  # replace with your path
polygon_api_key = os.getenv("POLYGON_API_KEY")

calendar = get_calendar("NYSE")
trading_dates = calendar.schedule(start_date="2023-04-20", end_date=datetime.today()).index.strftime("%Y-%m-%d").values

#  Call the function from databuilder.py to generate a DataFrame with all relevant info for base strategy backtesting
base_backtest_df = build_spread_backtest_dataset(dates=trading_dates, ticker='I:SPX', index_ticker="I:VIX1D", 
                                              options_ticker="SPX", trade_time="09:35", move_adjustment=0.5, spread_width=1, polygon_api_key=polygon_api_key)

In [None]:
# trading assumptions and max loss calculations
base_backtest_df['nat_price_cost'] = base_backtest_df['short_bid_price'] - base_backtest_df['long_ask_price']
base_backtest_df['max_nat_price_loss'] = abs(base_backtest_df['short_strike'].iloc[0] - base_backtest_df['long_strike'].iloc[0]) - base_backtest_df['nat_price_cost']
base_backtest_df['mid_price_cost'] = base_backtest_df['short_mid_price'] - base_backtest_df['long_mid_price']
base_backtest_df['max_mid_price_loss'] = abs(base_backtest_df['short_strike'].iloc[0] - base_backtest_df['long_strike'].iloc[0]) - base_backtest_df['mid_price_cost']
base_backtest_df["contracts"] = 1
base_backtest_df["fees"] = base_backtest_df["contracts"] * 0.04

In [None]:
# implied and realized volatility metrics
base_backtest_df['trade_to_close_vol'] = abs((base_backtest_df['underlying_price_at_trade'] - base_backtest_df['underlying_closing_price']) / base_backtest_df['underlying_price_at_trade']) * 100
base_backtest_df['current_day_IV'] = base_backtest_df['vix1d_value'] / np.sqrt(252)
base_backtest_df['current_day_VRP'] = base_backtest_df['current_day_IV'] - base_backtest_df['trade_to_close_vol']

# Backtest

In [None]:
def calculate_pnl(row):
    """
    Calculate the profit and loss (PnL) for the given row of backtest data.

    Parameters
    ----------
    row : pd.Series
        A row of data containing information about the trade taken on a 
        given day

    Returns
    -------
    float
        The gross profit or loss (PnL) for the trade. If the calculated final PnL exceeds the maximum 
        allowable loss, it caps the loss at 'max_mid_price_loss'.
    """
    if row['direction'] == 1:
        settlement = row['underlying_closing_price'] - row['short_strike']
        if settlement > 0:
            settlement = 0
            final_pnl = row['mid_price_cost']
        else:
            final_pnl = settlement + row['mid_price_cost']
            
    elif row['direction'] == 0:
        settlement = row['short_strike'] - row['underlying_closing_price']
        if settlement > 0:
            settlement = 0
            final_pnl = row['mid_price_cost']
        else:
            final_pnl = settlement + row['mid_price_cost']

    gross_pnl = np.maximum(final_pnl, row['max_mid_price_loss'] * -1)
    
    return gross_pnl

In [69]:
base_backtest_df['gross_pnl'] = base_backtest_df.apply(calculate_pnl, axis=1)
base_backtest_df['net_pnl'] = (base_backtest_df['gross_pnl'] * base_backtest_df['contracts']) - base_backtest_df['fees']

# Assume we start with $3000
capital = 3000

base_backtest_df['net_capital'] = capital + (base_backtest_df['net_pnl']*100).cumsum()
base_backtest_df['day_begin_net_capital'] = base_backtest_df['net_capital'] - (base_backtest_df['net_pnl']*100)
base_backtest_df['pct_return'] = (base_backtest_df['net_pnl']* 100)/ base_backtest_df['day_begin_net_capital']


In [None]:
def sharpe_ratio(returns, annualize=True, periods_per_year=252, risk_free_rate=0.0):
    """
    Calculate the Sharpe ratio for a series of returns.

    Parameters
    ----------
    returns : pd.Series
        A Pandas Series containing the returns, typically in percentage terms.
    annualize : bool, optional (default=True)
        Whether to annualize the Sharpe ratio.
    periods_per_year : int, optional (default=252)
        The number of trading periods in a year. Defaults to 252 (daily returns).
    risk_free_rate : float, optional (default=0.0)
        The risk-free rate of return. Defaults to 0 for simplicity.

    Returns
    -------
    float
        The calculated Sharpe ratio. If annualized, returns the annualized Sharpe ratio.
    """
    excess_returns = returns - risk_free_rate / periods_per_year

    mean_return = excess_returns.mean()
    std_return = excess_returns.std()

    sharpe_ratio = mean_return / std_return

    if annualize:
        sharpe_ratio *= np.sqrt(periods_per_year)
    
    return sharpe_ratio

In [70]:
px.line(base_backtest_df['net_capital']).show()
print(f"Base strategy Sharpe: {sharpe_ratio(returns=base_backtest_df['pct_return'])}")

base_strat_win_rate = len(base_backtest_df[base_backtest_df['net_pnl'] > 0]) / len(base_backtest_df)
base_strat_avg_win = base_backtest_df[base_backtest_df['net_pnl'] > 0]['net_pnl'].mean() * 100
base_strat_avg_loss = abs(base_backtest_df[base_backtest_df['net_pnl'] < 0]['net_pnl'].mean() * 100)

print(f"Base strategy win rate: {round(base_strat_win_rate * 100, 2)}%")
print(f"Base strategy average win: ${round(base_strat_avg_win, 2)}")
print(f"Base strategy average loss: ${round(base_strat_avg_loss, 2)}")
print(f"Base strategy expected value per trade: ${round((base_strat_avg_win * base_strat_win_rate) - (base_strat_avg_loss * (1 - base_strat_win_rate)), 2)}")


Base strategy Sharpe: 1.1146481708865252
Base strategy win rate: 78.72%
Base strategy average win: $108.68
Base strategy average loss: $351.54
Base strategy expected value per trade: $10.76


# Meta-Labeling

In [None]:
# Download historical OHLCV data for the S&P 500 to create features
sp500_ticker = "^GSPC"
underlying_feature_df = yf.download(sp500_ticker, start="2022-01-01", end="2024-10-25", interval="1d")

# preprocess the DataFrame
underlying_feature_df.columns = underlying_feature_df.columns.get_level_values(0)
underlying_feature_df.index = pd.to_datetime(underlying_feature_df.index).date
underlying_feature_df = underlying_feature_df.rename_axis("t", axis="index")
underlying_feature_df = underlying_feature_df.drop(columns=['Close'])
underlying_feature_df.columns.name = None
underlying_feature_df = underlying_feature_df.rename(columns={
    'Open': 'o',
    'High': 'h',
    'Low': 'l',
    'Adj Close': 'c',
    'Volume': 'v'
})

In [None]:
underlying_feature_df

In [None]:
# create features from underlying OHLCV data
for days in range(1, 6):
    underlying_feature_df[f'return_{days}d'] = underlying_feature_df['c'].pct_change(periods=days)

for lag in range(1, 6):
    underlying_feature_df[f'lag_{lag}d'] = underlying_feature_df['c'].shift(lag)

for lag in range(3, 6):
    underlying_feature_df[f'serial_corr_{lag}d'] = underlying_feature_df['c'].rolling(window=lag).apply(lambda x: x.autocorr(), raw=False)

underlying_feature_df['50d_volatility'] = underlying_feature_df['c'].rolling(window=50).std()

vol_windows = [10, 20, 50, 100]
for window in vol_windows:
    underlying_feature_df[f'{window}d_volatility'] = underlying_feature_df['c'].rolling(window=window).std()

underlying_feature_df['high_low_range'] = underlying_feature_df['h'] - underlying_feature_df['l']
for window in vol_windows:
    underlying_feature_df[f'{window}d_high_low_vol'] = underlying_feature_df['high_low_range'].rolling(window=window).std()

underlying_feature_df['tr'] = np.maximum((underlying_feature_df['h'] - underlying_feature_df['l']), 
                           np.maximum(abs(underlying_feature_df['h'] - underlying_feature_df['c'].shift(1)),
                                      abs(underlying_feature_df['l'] - underlying_feature_df['c'].shift(1))))
underlying_feature_df['14d_ATR'] = underlying_feature_df['tr'].rolling(window=14).mean()


aligned_vrp = base_backtest_df['current_day_VRP'].reindex(underlying_feature_df.index)
underlying_feature_df = pd.concat([underlying_feature_df, aligned_vrp], axis=1)

def compute_rsi(data, window=14):
    delta = data.diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=window).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
    rs = gain / loss
    rsi = 100 - (100 / (1 + rs))
    return rsi

underlying_feature_df['14d_RSI'] = compute_rsi(underlying_feature_df['c'], window=14)

ma_windows = [10, 50, 100, 200]
for window in ma_windows:
    underlying_feature_df[f'{window}d_MA'] = underlying_feature_df['c'].rolling(window=window).mean()

underlying_feature_df['12d_EMA'] = underlying_feature_df['c'].ewm(span=12, adjust=False).mean()
underlying_feature_df['26d_EMA'] = underlying_feature_df['c'].ewm(span=26, adjust=False).mean()
underlying_feature_df['MACD'] = underlying_feature_df['12d_EMA'] - underlying_feature_df['26d_EMA']
underlying_feature_df['MACD_signal'] = underlying_feature_df['MACD'].ewm(span=9, adjust=False).mean()

underlying_feature_df['momentum_5d'] = underlying_feature_df['c'] / underlying_feature_df['c'].shift(5) - 1
underlying_feature_df['momentum_10d'] = underlying_feature_df['c'] / underlying_feature_df['c'].shift(10) - 1

underlying_feature_df['20d_MA'] = underlying_feature_df['c'].rolling(window=20).mean()
underlying_feature_df['20d_stddev'] = underlying_feature_df['c'].rolling(window=20).std()
underlying_feature_df['upper_band'] = underlying_feature_df['20d_MA'] + (underlying_feature_df['20d_stddev'] * 2)
underlying_feature_df['lower_band'] = underlying_feature_df['20d_MA'] - (underlying_feature_df['20d_stddev'] * 2)

underlying_feature_df['volatility_ratio'] = underlying_feature_df['10d_volatility'] / underlying_feature_df['50d_volatility']

underlying_feature_df['20d_high'] = underlying_feature_df['h'].rolling(window=20).max()
underlying_feature_df['20d_low'] = underlying_feature_df['l'].rolling(window=20).min()

underlying_feature_df['14d_ATRP'] = underlying_feature_df['14d_ATR'] / underlying_feature_df['c'] * 100

underlying_feature_df['MA_crossover_10_50'] = np.where(underlying_feature_df['10d_MA'] > underlying_feature_df['50d_MA'], 1, 0)

underlying_feature_df['price_to_50d_MA'] = underlying_feature_df['c'] / underlying_feature_df['50d_MA']
underlying_feature_df['price_to_200d_MA'] = underlying_feature_df['c'] / underlying_feature_df['200d_MA']

# Final cleanup: Drop intermediate calculation columns that were temporary
underlying_feature_df.drop(['tr', '20d_stddev', '20d_MA'], axis=1, inplace=True)

underlying_feature_df = underlying_feature_df.dropna()

In [None]:
# target variable - whether we should have traded on a given day or not
base_backtest_df['to_trade'] = np.where(base_backtest_df['net_pnl'] >= 0, 1, 0 )

# add the correct targets to the feature DataFrame
aligned_target = base_backtest_df['to_trade'].reindex(underlying_feature_df.index)
underlying_feature_df = pd.concat([underlying_feature_df, aligned_target], axis=1)
underlying_feature_df = underlying_feature_df.rename(columns={'to_trade': 'target'})

# Store the features as a variable for quick access
X = underlying_feature_df.drop(['o', 'c', 'h', 'l', 'target'], axis=1)

In [None]:
def group_by_period(df, training_period_length, backtest_period_length):
    """
    Splits a DataFrame into sequential training and backtesting periods for walk-forward training and testing.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame containing the time-series data to be split into training and backtesting periods.
    training_period_length : int
        The number of data points (e.g., days) to include in each training period.
    backtest_period_length : int
        The number of data points to include in each backtesting period.

    Returns
    -------
    tuple
        A tuple containing:
        - training_keys (list of tuples): A list of (start_index, end_index) pairs for each training period.
        - backtest_keys (list of tuples): A list of (start_index, end_index) pairs for each backtesting period.
        - training_period_data_dict (dict): A dictionary where each key is a (start_index, end_index) pair 
          representing a training period, and each value is a DataFrame containing the data for that period.
        - backtest_period_data_dict (dict): A dictionary where each key is a (start_index, end_index) pair 
          representing a backtesting period, and each value is a DataFrame containing the data for that period.
    """
    training_keys = []
    backtest_keys = []
    training_period_data_dict = {}
    backtest_period_data_dict = {}

    current_end_index = len(df)

    while current_end_index - backtest_period_length - training_period_length >= 0:
        backtest_end_index = current_end_index
        backtest_start_index = backtest_end_index - backtest_period_length
        training_end_index = backtest_start_index
        training_start_index = training_end_index - training_period_length

        if training_start_index < 0:
            break

        training_df = df.iloc[training_start_index:training_end_index]
        backtest_df = df.iloc[backtest_start_index:backtest_end_index]

        training_keys.append((training_start_index, training_end_index))
        backtest_keys.append((backtest_start_index, backtest_end_index))

        training_period_data_dict[(training_start_index, training_end_index)] = training_df
        backtest_period_data_dict[(backtest_start_index, backtest_end_index)] = backtest_df

        current_end_index = backtest_start_index

    training_keys.reverse()
    backtest_keys.reverse()
    training_period_data_dict = {k: training_period_data_dict[k] for k in reversed(training_period_data_dict)}
    backtest_period_data_dict = {k: backtest_period_data_dict[k] for k in reversed(backtest_period_data_dict)}
    
    return training_keys, backtest_keys, training_period_data_dict, backtest_period_data_dict

In [None]:
def metalabel(data, training_periods, testing_periods, quant_feature_list, cat_feature_list):
    """
    Generate metalabels for a base strategy by training a CatBoost classification model in a 
    walk-forward manner.

    Parameters
    ----------
    data : pd.DataFrame
        The input dataset containing features and target labels, indexed by date.
    training_periods : int
        The number of periods to use for training the model in each iteration.
    testing_periods : int
        The number of periods to use for testing (making predictions) in each iteration.
    quant_feature_list : list of str
        List of names of quantitative features (numerical features) in the dataset to be scaled and used in the model.
    cat_feature_list : list of str
        List of names of categorical features in the dataset to be used in the model.

    Returns
    -------
    pd.DataFrame
        A DataFrame containing the original testing data augmented with metalabel predictions and confidence scores.
        The output DataFrame includes the following columns:
        - 'predicted_trade_action': The binary prediction (0 for no trade, or 1 for trade) of the trade action.
        - 'prediction_confidence': The confidence level of each prediction.
    """
    data = data[:-1].copy()
    
    keys, backtest_keys, period_data_dict, backtest_period_data_dict = group_by_period(
        data, training_periods, testing_periods
    )

    agg_backtest_df = pd.DataFrame()
    num_iterations = len(keys)

    for i in tqdm(range(num_iterations)):
        model_key = keys[i]
        train_df = period_data_dict[model_key].copy()
        
        scaler = StandardScaler()
        train_df[quant_feature_list] = scaler.fit_transform(train_df[quant_feature_list])

        all_features = quant_feature_list + cat_feature_list
        
        train_features = train_df[all_features]
        train_target = train_df['target'].values.flatten()

        model = CatBoostClassifier(
                                   loss_function='Logloss',
                                   eval_metric='Logloss',
                                   task_type='CPU',
                                   cat_features=[f for f in all_features if f in cat_feature_list],
                                   verbose=False)

        model.fit(train_features, train_target,
                  early_stopping_rounds=20,
                  plot=False)

        backtest_key = backtest_keys[i]
        backtest_df = backtest_period_data_dict[backtest_key].copy()
        
        backtest_df[quant_feature_list] = scaler.transform(backtest_df[quant_feature_list])
        
        test_features = backtest_df[all_features]

        probabilities = model.predict_proba(test_features)[:, 1] 
        predictions = (probabilities > 0.5).astype(int) 
        confidence = np.maximum(probabilities, 1 - probabilities)  

        prediction_df = pd.DataFrame({
            'predicted_trade_action': predictions,
            'prediction_confidence': confidence
        }, index=backtest_df.index)

        backtest_df = backtest_df.join(prediction_df)

        agg_backtest_df = pd.concat([agg_backtest_df, backtest_df], axis=0)

    return agg_backtest_df

metalabeled_backtest_df = metalabel(data=underlying_feature_df, training_periods=150, testing_periods=1, quant_feature_list=list(X.columns), cat_feature_list=[])

In [71]:
y_true = metalabeled_backtest_df['target']
y_pred = metalabeled_backtest_df['predicted_trade_action']

def evaluate_classification(y_true, y_pred):
    """
    Evaluate classification metrics and display results for a binary classification model.

    This function calculates the accuracy, precision, recall, and F1 score for a given set of 
    true and predicted labels. It also prints the confusion matrix and a detailed classification 
    report with precision, recall, F1 score, and support for each class.

    Parameters
    ----------
    y_true : array-like or pd.Series
        The ground truth (actual) labels.
    y_pred : array-like or pd.Series
        The predicted labels by the model.

    Returns
    -------
    dict
        A dictionary containing the computed classification metrics: accuracy, precision, recall, F1 score, 
        and confusion matrix. The detailed classification report is also printed.
    """

    # Calculate the classification metrics
    precision = precision_score(y_true, y_pred)
    recall = recall_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred)
    accuracy = accuracy_score(y_true, y_pred)

    # Print the metrics
    print("Classification Metrics:")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")

    # Display the confusion matrix
    conf_matrix = confusion_matrix(y_true, y_pred)
    print("\nConfusion Matrix:")
    print(conf_matrix)

    # Detailed classification report
    report = classification_report(y_true, y_pred, target_names=['No Trade', 'Trade'])
    print("\nClassification Report:")
    print(report)

    # Return metrics as a dictionary for further use
    metrics = {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1_score": f1,
        "confusion_matrix": conf_matrix
    }

    return metrics

classification_metrics = evaluate_classification(y_true, y_pred)

Classification Metrics:
Accuracy: 0.8834
Precision: 0.8894
Recall: 0.9779
F1 Score: 0.9316

Confusion Matrix:
[[ 20  22]
 [  4 177]]

Classification Report:
              precision    recall  f1-score   support

    No Trade       0.83      0.48      0.61        42
       Trade       0.89      0.98      0.93       181

    accuracy                           0.88       223
   macro avg       0.86      0.73      0.77       223
weighted avg       0.88      0.88      0.87       223



In [None]:
# Take the meta-model's trade/no trade reccomendations and add them to the base strategy backtest DataFrame to create a meta-labeled backtest DataFrame.
# We will now only calculate PnL for a given day if the meta-model recommends to trade, setting net PnL to 0, effectively skipping the day otherwise
metalabeled_strat_backtest_df = base_backtest_df.copy()
aligned_preds = metalabeled_backtest_df['predicted_trade_action'].reindex(metalabeled_strat_backtest_df.index)
metalabeled_strat_backtest_df['predicted_trade_action'] = aligned_preds
metalabeled_strat_backtest_df = metalabeled_strat_backtest_df.dropna()
metalabeled_strat_backtest_df['predicted_trade_action'] = metalabeled_strat_backtest_df['predicted_trade_action'].astype(int)

metalabeled_strat_backtest_df['gross_pnl'] = metalabeled_strat_backtest_df.apply(calculate_pnl, axis=1)
metalabeled_strat_backtest_df['net_pnl'] = np.where(metalabeled_strat_backtest_df['predicted_trade_action'] == 1, metalabeled_strat_backtest_df['gross_pnl'] * metalabeled_strat_backtest_df['contracts'] - metalabeled_strat_backtest_df['fees'], 0)

capital = 3000

metalabeled_strat_backtest_df['net_capital'] = capital + (metalabeled_strat_backtest_df['net_pnl']*100).cumsum()
metalabeled_strat_backtest_df['cumulative_pnl'] = metalabeled_strat_backtest_df['net_pnl'].cumsum()

In [72]:
px.line(metalabeled_strat_backtest_df['net_capital']).show()
print(f"Meta-labeled strategy Sharpe: {sharpe_ratio(returns=metalabeled_strat_backtest_df['pct_return'])}")

metalabeled_strat_win_rate = len(metalabeled_strat_backtest_df[metalabeled_strat_backtest_df['net_pnl'] > 0]) / len(metalabeled_strat_backtest_df)
metalabeled_strat_avg_win = metalabeled_strat_backtest_df[metalabeled_strat_backtest_df['net_pnl'] > 0]['net_pnl'].mean() * 100
metalabeled_strat_avg_loss = abs(metalabeled_strat_backtest_df[metalabeled_strat_backtest_df['net_pnl'] < 0]['net_pnl'].mean() * 100)

print(f"Meta-labeled strategy win rate: {round(metalabeled_strat_win_rate * 100, 2)}%")
print(f"Meta-labeled strategy average win: ${round(metalabeled_strat_avg_win, 2)}")
print(f"Meta-labeled strategy average loss: ${round(metalabeled_strat_avg_loss, 2)}")
print(f"Meta-labeled strategy expected value per trade: ${round((metalabeled_strat_avg_win * metalabeled_strat_win_rate) - (metalabeled_strat_avg_loss * (1 - metalabeled_strat_win_rate)), 2)}")

Meta-labeled strategy Sharpe: 2.322992416558094
Meta-labeled strategy win rate: 79.37%
Meta-labeled strategy average win: $107.78
Meta-labeled strategy average loss: $329.16
Meta-labeled strategy expected value per trade: $17.65
