In [1]:
# Feature engineering and model selection
from sklearn.pipeline import Pipeline

# Models and metrics
import xgboost as xgb
from sklearn.calibration import CalibrationDisplay, calibration_curve
from sklearn.metrics import brier_score_loss

# Classification metrics
from sklearn.metrics import classification_report, roc_curve, roc_auc_score, precision_recall_curve, auc

# Suppress all warnings
import warnings
warnings.filterwarnings('ignore')

# Plotting
import matplotlib.pyplot as plt
import seaborn as sns

# Analytical helper functions to declutter the notebook
from analysis.ml.feature_engineering.custom_transformers import *
from analysis.ml.data import calculate_label_uniquness
from analysis.stats import *
from analysis.ml.data import get_ml_dataset, get_ml_features

# Other imports
import os
import pandas as pd
import numpy as np
from numba import njit, prange
import joblib

In [2]:
# Specify window sizes for rolling min-max and z-score scaling
window_sizes_scaling = [7, 30, 90, 180, 365]

# Specify window sizes for returns-based features
window_sizes_returns = [1, 7, 30, 90, 180, 365]

# Specify lookback window for correlation features
lookback_windows_correlation = [7, 30, 90, 180, 365]

# Specify maximum holding times for triple barrier labeling
max_holding_times_triple_barrier_label = [7]

# Specify lookback window for horizontal barriers for triple barrier labeling
std_lookback_windows = [7]

# Pipeline for feature engineering and modeling
feature_engineering_pipeline = Pipeline([

    ('ta_features', TAFeatures(windows = window_sizes_scaling)),

    ('time_features', TimeFeatures()),

    # ('returns_features', ReturnsFeatures(
    #     window_sizes = window_sizes_returns,
    #     lookback_windows = lookback_windows_correlation,
    # )),

    ('order_book_features', OrderBookFeatures()),

    ('correlation_features', CorrelationFeatures(
        window_sizes = window_sizes_returns, 
        lookback_windows = lookback_windows_correlation,
        period = '1d'
    )),

    ('rolling_z_score', RollingZScoreScaler(window_sizes = window_sizes_scaling)),

    ('fill_na', FillNaTransformer()),

    # ('lag_features', LagFeatures([1, 2, 3, 4, 5, 6, 7])),

    ('triple_barrier_labels', TripleBarrierLabelFeatures(
        max_holding_times = max_holding_times_triple_barrier_label, 
        std_lookback_windows = std_lookback_windows
    )),

])

In [None]:
# Load the engineered feature matrix and preview it
X = get_ml_features()
print(f'X shape: {X.shape}')
X.head()

In [None]:
# Columns that must be removed from X before it is handed to the model.

# Every column whose name contains the triple-barrier label marker
triple_barrier_label_cols = [col for col in X.columns if 'triple_barrier_label_h' in col]

# Every column whose name contains realised trade returns
trade_returns_cols = [col for col in X.columns if 'trade_returns' in col]

# Identifier columns listed as non-numeric
non_numeric_cols = ['asset_id_base', 'asset_id_quote', 'exchange_id', 'Unnamed: 0']

# Raw prices, label start/end dates, sample-uniqueness weights and the timestamp
forward_returns_cols = [
    'open',
    'high',
    'low',
    'close',
    'start_date_triple_barrier_label_h24',
    'end_date_triple_barrier_label_h24',
    'avg_uniqueness',
    'time_period_end',
]

# Final drop list: concatenation of the four groups, in the order above
cols_to_drop = [
    *triple_barrier_label_cols,
    *trade_returns_cols,
    *non_numeric_cols,
    *forward_returns_cols,
]

In [None]:
# Chronological train/test split.
#
# The cutoff is the 70th percentile of `time_period_end`: rows at or before it
# form the training set, rows after it (up to `test_date_cutoff`) the test set.
# NOTE(review): this assumes `time_period_end` is already a datetime column
# when it arrives from get_ml_features() -- confirm.
train_date_cutoff = X['time_period_end'].quantile(0.7)

# Inclusive upper bound of the evaluation window
test_date_cutoff = '2023-12-31'

# Split the dataset into training and testing set
in_train = X['time_period_end'] <= train_date_cutoff
in_test = (X['time_period_end'] > train_date_cutoff) & (X['time_period_end'] <= test_date_cutoff)

X_train = X[in_train]
X_test = X[in_test]

# An empty split would make the min/max below NaT, and any comparison with NaT
# is False, so the leakage assert further down would pass without checking anything.
assert len(X_train) > 0 and len(X_test) > 0, 'Train/test split produced an empty set!'

train_min_date = X_train['time_period_end'].min()
train_max_date = X_train['time_period_end'].max()

test_min_date = X_test['time_period_end'].min()
test_max_date = X_test['time_period_end'].max()

print('Training set date range:', train_min_date, train_max_date)
print('Testing set date range:', test_min_date, test_max_date)

# Ensure no data leakage
data_leakage_indicator = (
    # Ensure that the training set doesn't overlap with the testing set
    (train_max_date >= test_min_date)
)
assert not data_leakage_indicator, 'Data leakage detected!'
# NOTE(review): the h7 labels of the last training rows are presumably resolved
# using prices that fall inside the test window; consider purging/embargoing
# the rows closest to the cutoff.


def make_binary_label(frame):
    """Build the binary classification target for one split.

    A row is positive (1) when `triple_barrier_label_h7` equals 1, or when it
    equals 0 and `trade_returns_h7` is strictly positive; otherwise it is 0.
    """
    barrier_label = frame['triple_barrier_label_h7']
    label_is_one = barrier_label == 1
    label_zero_with_gain = (barrier_label == 0) & (frame['trade_returns_h7'] > 0)
    return (label_is_one | label_zero_with_gain).astype(int)


y_train = make_binary_label(X_train)
y_test = make_binary_label(X_test)

# Regression target: absolute value of the next row's `returns_1`.
# The shift is done per symbol so the last row of one symbol never receives the
# first return of another symbol (it gets NaN instead).
# NOTE(review): assumes rows are in chronological order within each symbol -- confirm.
y_train_reg = X_train.groupby('symbol_id')['returns_1'].shift(-1).abs()
y_test_reg = X_test.groupby('symbol_id')['returns_1'].shift(-1).abs()

# Filters for the training and testing set
# train_filter = (
#     X_train['returns_1_rz_24'].abs() >= 1
# )

# test_filter = (
#     X_test['returns_1_rz_24'] >= 1
# )

# Apply the filters (Optional)
# X_train = X_train[train_filter]
# X_test = X_test[test_filter]

# y_train = y_train[train_filter]
# y_test = y_test[test_filter]

# Make these columns categorical for XGBoost.
# `symbol_id` uses ONE categorical dtype derived from the full dataset so that
# a given symbol maps to the same category code in both splits; inferring the
# categories separately per split can assign different codes to the same symbol.
# `astype` returns new frames, so nothing is written back into a slice of X.
symbol_dtype = X['symbol_id'].astype('category').dtype
categorical_dtypes = {'symbol_id': symbol_dtype, 'time_period_end': 'category'}

X_train = X_train.astype(categorical_dtypes)
X_test = X_test.astype(categorical_dtypes)

In [None]:
# Report split sizes followed by the class balance of each label vector
print(f'X_train shape: {X_train.shape}')
print(f'X_test shape: {X_test.shape}')

for split_name, split_labels in (('train', y_train), ('test', y_test)):
    print()
    print(f'Y {split_name} distribution:')
    print(split_labels.value_counts(normalize = True))

In [None]:
# Load pretrained model
#
# The location can be overridden with the XGB_MODEL_PATH environment variable
# so the notebook also runs on machines other than the original author's; when
# the variable is unset the original absolute path is used.
default_model_path = '/Users/louisspencer/Desktop/Trading-Bot/data/pretrained_models/classification/xgboost_model_2023_july_to_sept.pkl'
path = os.environ.get('XGB_MODEL_PATH', default_model_path)

# NOTE(security): joblib.load unpickles the file, which can execute arbitrary
# code. Only point this at model files from a trusted source.
model = joblib.load(path)

# Allow the model to consume pandas categorical columns (e.g. `symbol_id`)
model.enable_categorical = True
model

In [None]:
# Calibration curve for calibrated model on test set
fig, ax = plt.subplots(1, 1, figsize = (12, 6))

# Score the same feature matrix that the performance-metrics cell uses.
# Dropping only `time_period_end` would hand label, price and non-numeric
# identifier columns to the model.
X_test_features = X_test.drop(columns = cols_to_drop + ['time_period_end'], errors = 'ignore')

# Predict once and reuse the probabilities for both the Brier score and the plot
calibration_probs = model.predict_proba(X_test_features)[:, 1]
brier_calibrated = brier_score_loss(y_test, calibration_probs)

# `from_predictions` draws onto `ax` itself, so no extra `.plot()` call is
# needed (a second call would draw the curve and legend entry twice).
# The result is not stored under the name `display`, which would shadow
# IPython's `display` function for the rest of the notebook.
calibration_display = CalibrationDisplay.from_predictions(
    y_test,
    calibration_probs,
    name = type(model).__name__,
    ax = ax
)
ax.set_title(f'Calibration Curve (Calibrated Model, Brier Score: {brier_calibrated:.4f})')
ax.set_xlabel('Mean Predicted Probability')
ax.set_ylabel('Fraction of Positives')

plt.tight_layout()
plt.show()

In [None]:
# Performance Metrics

# Build the model's feature matrix once and reuse it for both predictions
X_test_features = X_test.drop(columns = cols_to_drop + ['time_period_end'], errors = 'ignore')

pred_prob_test = model.predict_proba(X_test_features)[:, 1]
y_pred_test = model.predict(X_test_features)

print('XGBoost Test Performance:')
print()
print(classification_report(y_test, y_pred_test))
print()
print('Baseline Model Performance (Random):')
print()
# Seeded generator so the coin-flip baseline is identical on every run
baseline_rng = np.random.default_rng(42)
print(classification_report(y_test, baseline_rng.integers(0, 2, len(y_test))))

In [None]:
# Plot the ROC curve and the Precision-Recall curve side by side (2 subplots)
fig, ax = plt.subplots(1, 2, figsize = (18, 5))

# ROC Curve
fpr, tpr, roc_thresholds = roc_curve(y_test, pred_prob_test)
roc_auc = auc(fpr, tpr)

# Draw with matplotlib directly: sns.lineplot sorts by x and, by default,
# averages y over repeated x values (adding a confidence band). ROC and PR
# curves contain many repeated x values, so that would distort their shape.
ax[0].plot(fpr, tpr)
ax[0].plot([0, 1], [0, 1], color = 'black', linestyle = '--')  # chance line
ax[0].set_title(f'ROC Curve (AUC: {roc_auc:.3f})')
ax[0].set_xlabel('False Positive Rate')
ax[0].set_ylabel('True Positive Rate')

# Precision-Recall Curve
precision, recall, thresholds = precision_recall_curve(y_test, pred_prob_test)
pr_auc = auc(recall, precision)

ax[1].plot(recall, precision)
ax[1].set_title(f'Precision-Recall Curve (PR_AUC: {pr_auc:.3f})')
ax[1].set_xlabel('Recall')
ax[1].set_ylabel('Precision')

plt.show()

In [None]:
# Plot top N most important features as horizontal bar plot
top_n = 40

# The importances line up with the columns the model was actually scored on,
# i.e. X_test with the non-feature columns removed -- not with every column
# of X_test.
feature_names = X_test.drop(columns = cols_to_drop + ['time_period_end'], errors = 'ignore').columns

if len(feature_names) != len(model.feature_importances_):
    raise ValueError(
        f'Model reports {len(model.feature_importances_)} feature importances '
        f'but the feature matrix has {len(feature_names)} columns'
    )

feature_importances = pd.Series(model.feature_importances_, index = feature_names)
# Ascending sort + tail keeps the top N with the largest bar drawn at the top
feature_importances = feature_importances.sort_values().tail(top_n)

fig, ax = plt.subplots(figsize = (10, 7))
feature_importances.plot(kind = 'barh', ax = ax)
ax.set_title(f'Top {top_n} Most Important Features')
plt.show()

In [None]:
# Calculate the sample statistic on the test set:
# mean trade return of predicted-1 rows minus mean trade return of predicted-0 rows
X_test['y_pred'] = y_pred_test
sample_means = X_test[['y_pred', 'trade_returns_h7']].groupby('y_pred').mean()
sample_statistic = round(sample_means.loc[1, 'trade_returns_h7'] - sample_means.loc[0, 'trade_returns_h7'], 3)

# Calculate the information coefficient of the model's predictions
# (correlation between the hard 0/1 predictions and the realised trade returns)
information_coefficient = round(X_test[['y_pred', 'trade_returns_h7']].corr().iloc[0, 1], 3)
print('Information Coefficient:', information_coefficient)

# Calculate the sample statistic for a random baseline.
# A seeded generator keeps the baseline identical on every run.
baseline_rng = np.random.default_rng(42)
X_test['y_pred_random'] = baseline_rng.integers(0, 2, len(X_test))
sample_means_random = X_test[['y_pred_random', 'trade_returns_h7']].groupby('y_pred_random').mean()
sample_statistic_random = round(sample_means_random.loc[1, 'trade_returns_h7'] - sample_means_random.loc[0, 'trade_returns_h7'], 3)

print(f'Sample statistic: {sample_statistic}')
print(f'Random baseline statistic: {sample_statistic_random}')
print()
print(sample_means)
print()
print(sample_means_random)

In [None]:
# Mean realised trade return for each cell of the confusion matrix.
predicted_one = X_test['y_pred'] == 1
predicted_zero = X_test['y_pred'] == 0
actual_one = y_test == 1
actual_zero = y_test == 0

realised_returns = X_test['trade_returns_h7']

# Correct 1 predictions
mean_true_positive_returns = realised_returns[predicted_one & actual_one].mean()
# Incorrect 1 predictions
mean_false_positive_returns = realised_returns[predicted_one & actual_zero].mean()
# Correct 0 predictions
mean_true_negative_returns = realised_returns[predicted_zero & actual_zero].mean()
# Incorrect 0 predictions
mean_false_negative_returns = realised_returns[predicted_zero & actual_one].mean()

# Average true-positive return net of the magnitude of the average false-positive return
tp_minus_fp = round(mean_true_positive_returns - abs(mean_false_positive_returns), 3)

bucket_report = (
    ('Mean Trade Returns for True Positives:', mean_true_positive_returns),
    ('Mean Trade Returns for False Positives:', mean_false_positive_returns),
    ('Mean Trade Returns for True Negatives:', mean_true_negative_returns),
    ('Mean Trade Returns for False Negatives:', mean_false_negative_returns),
)
for bucket_label, bucket_mean in bucket_report:
    print(bucket_label, bucket_mean)
print()
print('(TP - FP):', tp_minus_fp)

In [None]:
# Calculate Expectancy (expected return per trade taken)
#
# avg_win and avg_loss are both measured on rows where the model predicted 1,
# so the win rate must be measured on the same rows: the share of predicted
# positives that were actual positives. Overall accuracy would also count true
# negatives (rows where no trade is taken) as wins.
took_trade = np.asarray(y_pred_test) == 1
trade_won = np.asarray(y_test) == 1

# No positive predictions means no trades, so the win rate is undefined
win_rate = trade_won[took_trade].mean() if took_trade.any() else np.nan
avg_win = mean_true_positive_returns
avg_loss = abs(mean_false_positive_returns)

expectancy = (win_rate * avg_win) - ((1 - win_rate) * avg_loss)
print('Expectancy:', round(expectancy, 3))

In [None]:
# Mean out-of-sample trade return per (symbol, predicted class)
oos_avg_pred_returns = X_test.groupby(['symbol_id', 'y_pred'])['trade_returns_h7'].mean()
test_symbols = X_test['symbol_id'].unique()

# One series per predicted class, indexed by symbol; symbols missing from a
# class are added with 0, then each series is sorted by value
positive = (
    oos_avg_pred_returns.loc[(slice(None), 1)]
    .reindex(test_symbols, fill_value = 0)
    .sort_values()
)
negative = (
    oos_avg_pred_returns.loc[(slice(None), 0)]
    .reindex(test_symbols, fill_value = 0)
    .sort_values()
)

# Per-symbol spread between the two classes, largest spread first
diff = pd.DataFrame({'positive': positive, 'negative': negative})
diff = diff.assign(diff = diff['positive'] - diff['negative'])
diff = diff.sort_values('diff', ascending = False)

# Three bar plots (Seaborn) sharing one layout: positive-prediction returns,
# negative-prediction returns, and their difference
bar_charts = [
    (
        positive.index.get_level_values(0),
        positive.values,
        'Average Positive Prediction Returns Across Different Symbols',
    ),
    (
        negative.index.get_level_values(0),
        negative.values,
        'Average Negative Prediction Returns Across Different Symbols',
    ),
    (
        diff.index.get_level_values(0),
        diff['diff'],
        'Difference in Average Returns Across Different Symbols',
    ),
]

for chart_symbols, chart_heights, chart_title in bar_charts:
    plt.figure(figsize = (30, 7))
    sns.barplot(x = chart_symbols, y = chart_heights)
    plt.title(chart_title)
    plt.xticks(rotation = 90)
    plt.show()

In [None]:
# Run the OOS statistical tests to check model robustness.
# Inputs: the hard test-set predictions, the realised h7 trade returns as a
# plain array, and the observed sample statistic computed earlier.
oos_trade_returns = X_test['trade_returns_h7'].values

run_oos_statistical_tests(
    y_pred_test,
    oos_trade_returns,
    n_simulations = 1_000_000,
    sample_statistic = sample_statistic,
)

In [None]:
# Remove the helper columns added during evaluation, plus any forward-looking
# columns, so X_test can be fed to the model again.
# `y_pred_random` is added alongside `y_pred` by the sample-statistic cell and
# has to be removed with it.
drop_cols = [
    'y_pred',
    'y_pred_random',
    'forward_returns_1',
    'forward_returns_7',
    'forward_returns_30',
    'forward_avg_returns_1_7d',
    'forward_pos_neg_volatility_ratio_7',
]

# errors='ignore' tolerates columns that are absent; reassigning instead of
# dropping in place keeps the cell safe to re-run
X_test = X_test.drop(columns = drop_cols, errors = 'ignore')