In [27]:
import os
import json
import time
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import root_mean_squared_error, mean_absolute_error, r2_score
import lightgbm as lgb
import xgboost as xgb
from catboost import CatBoostRegressor

sns.set(style="whitegrid", context="notebook")

In [14]:
DATA_PATH = "../data/raw/training_data_phoenix_memory.npz" 
MODEL_DIR = "../src/models"
ARTIFACT_DIR = "../src/results/phoenix_training"
USE_LOG_TARGET = True 
RANDOM_STATE = 42
TEST_SIZE = 0.2

os.makedirs(MODEL_DIR, exist_ok=True)
os.makedirs(ARTIFACT_DIR, exist_ok=True)

In [28]:
def evaluate_model(y_true, y_pred):
    """Compute RMSE, MAE, and R²."""
    return {
        "rmse": root_mean_squared_error(y_true, y_pred),
        "mae": mean_absolute_error(y_true, y_pred),
        "r2": r2_score(y_true, y_pred),
    }

def save_json(obj, path):
    with open(path, "w") as f:
        json.dump(obj, f, indent=2)

def invert_target(y):
    return np.expm1(y) if USE_LOG_TARGET else y

In [16]:
print("Loading dataset from:", DATA_PATH)
data = np.load(DATA_PATH)
X = data["X"]
y = data["y"]
print(f"Loaded X shape: {X.shape}, y shape: {y.shape}")

Loading dataset from: ../data/raw/training_data_phoenix_memory.npz
Loaded X shape: (1000, 8), y shape: (1000,)


In [17]:
FEATURE_NAMES = [
    "S0",                 # Initial underlying level
    "autocall_barrier",   # Autocall barrier (fraction of S0)
    "coupon_barrier",     # Coupon barrier (fraction of S0)
    "knock_in_barrier",   # Knock-in barrier (fraction of S0)
    "coupon_rate",        # Coupon rate per observation period
    "r",                  # Risk-free rate
    "sigma",              # Volatility
    "T"                   # Maturity
]

In [18]:
df = pd.DataFrame(X, columns=FEATURE_NAMES)
df["y"] = y
print("\nData sample:")
display(df.head())

print("\nTarget summary:")
display(df["y"].describe())


Data sample:


Unnamed: 0,S0,autocall_barrier,coupon_barrier,knock_in_barrier,coupon_rate,r,sigma,T,y
0,100.0,0.034118,0.116146,1.0509,1.009219,0.676386,0.115401,0.681209,1.026903
1,100.0,0.013829,0.345926,2.724732,1.025649,0.686745,0.072826,0.682424,0.948888
2,100.0,0.037073,0.288982,2.818518,1.011595,0.769869,0.073893,0.651817,0.964858
3,100.0,0.008295,0.249337,1.956812,1.009217,0.652234,0.103695,0.647113,1.015962
4,100.0,0.04593,0.28766,2.792806,1.043235,0.682721,0.103845,0.686613,0.97994



Target summary:


count    1000.000000
mean        1.001878
std         0.028391
min         0.927037
25%         0.981723
50%         1.003808
75%         1.020567
max         1.103538
Name: y, dtype: float64

In [19]:
y_trans = np.log1p(y) if USE_LOG_TARGET else y

X_train, X_test, y_train, y_test = train_test_split(
    X, y_trans, test_size=TEST_SIZE, random_state=RANDOM_STATE
)
print(f"Train shape: {X_train.shape}, Test shape: {X_test.shape}")

Train shape: (800, 8), Test shape: (200, 8)


In [20]:
LGB_PARAMS = dict(
    n_estimators=1000,
    learning_rate=0.05,
    num_leaves=64,
    subsample=0.8,
    colsample_bytree=0.8,
    random_state=RANDOM_STATE,
    n_jobs=-1,
)

XGB_PARAMS = dict(
    n_estimators=1000,
    learning_rate=0.05,
    max_depth=6,
    subsample=0.8,
    colsample_bytree=0.8,
    random_state=RANDOM_STATE,
    tree_method="hist",
    n_jobs=-1,
)

CAT_PARAMS = dict(
    iterations=1000,
    learning_rate=0.05,
    depth=6,
    random_seed=RANDOM_STATE,
    verbose=100,
)

EARLY_STOPPING_ROUNDS = 50

In [22]:
t0 = time.time()
lgb_model = lgb.LGBMRegressor(**LGB_PARAMS)
try:
    lgb_model.fit(
        X_train, y_train,
        eval_set=[(X_test, y_test)],
        eval_metric="rmse",
        early_stopping_rounds=EARLY_STOPPING_ROUNDS,
        verbose=False,
    )
except TypeError:
    lgb_model.fit(X_train, y_train)
lgb_time = time.time() - t0
print(f"LightGBM trained in {lgb_time:.2f}s")

lgb_out = os.path.join(MODEL_DIR, "lightgbm_phoenix.txt")
try:
    lgb_model.booster_.save_model(lgb_out)
except Exception:
    import joblib
    joblib.dump(lgb_model, lgb_out.replace(".txt", ".pkl"))

[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.000122 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 1785
[LightGBM] [Info] Number of data points in the train set: 800, number of used features: 7
[LightGBM] [Info] Start training from score 0.693421
LightGBM trained in 0.53s


In [24]:
t0 = time.time()
xgb_model = xgb.XGBRegressor(**XGB_PARAMS)
try:
    xgb_model.fit(
        X_train, y_train,
        eval_set=[(X_test, y_test)],
        early_stopping_rounds=EARLY_STOPPING_ROUNDS,
        verbose=False,
    )
except TypeError:
    xgb_model.fit(X_train, y_train)
xgb_time = time.time() - t0
print(f"XGBoost trained in {xgb_time:.2f}s")

xgb_out = os.path.join(MODEL_DIR, "xgboost_phoenix.json")
xgb_model.save_model(xgb_out)

XGBoost trained in 0.17s


In [25]:
t0 = time.time()
cat_model = CatBoostRegressor(**CAT_PARAMS)
try:
    cat_model.fit(
        X_train, y_train,
        eval_set=(X_test, y_test),
        use_best_model=True,
        early_stopping_rounds=EARLY_STOPPING_ROUNDS,
        verbose=100,
    )
except Exception:
    cat_model.fit(X_train, y_train, verbose=False)
cat_time = time.time() - t0
print(f"CatBoost trained in {cat_time:.2f}s")

cat_out = os.path.join(MODEL_DIR, "catboost_phoenix.cbm")
cat_model.save_model(cat_out)

0:	learn: 0.0138454	test: 0.0129623	best: 0.0129623 (0)	total: 134ms	remaining: 2m 14s
100:	learn: 0.0017851	test: 0.0024327	best: 0.0024327 (100)	total: 215ms	remaining: 1.91s
200:	learn: 0.0009382	test: 0.0018397	best: 0.0018397 (200)	total: 288ms	remaining: 1.14s
300:	learn: 0.0006597	test: 0.0016650	best: 0.0016650 (300)	total: 363ms	remaining: 843ms
400:	learn: 0.0005324	test: 0.0016107	best: 0.0016101 (397)	total: 434ms	remaining: 648ms
500:	learn: 0.0004412	test: 0.0015804	best: 0.0015801 (499)	total: 500ms	remaining: 498ms
600:	learn: 0.0003766	test: 0.0015599	best: 0.0015596 (599)	total: 606ms	remaining: 402ms
700:	learn: 0.0003249	test: 0.0015487	best: 0.0015487 (700)	total: 673ms	remaining: 287ms
800:	learn: 0.0002836	test: 0.0015423	best: 0.0015423 (800)	total: 742ms	remaining: 184ms
900:	learn: 0.0002501	test: 0.0015386	best: 0.0015385 (876)	total: 807ms	remaining: 88.7ms
999:	learn: 0.0002233	test: 0.0015338	best: 0.0015337 (996)	total: 875ms	remaining: 0us

bestTest = 0.

In [29]:
y_test_inv = invert_target(y_test)

results = {}
for name, model in {
    "LightGBM": lgb_model,
    "XGBoost": xgb_model,
    "CatBoost": cat_model,
}.items():
    preds = invert_target(model.predict(X_test))
    results[name] = evaluate_model(y_test_inv, preds)
    results[name]["train_time_s"] = round(
        lgb_time if name == "LightGBM" else xgb_time if name == "XGBoost" else cat_time, 3
    )

print("\n=== Model Performance ===")
for k, v in results.items():
    print(f"{k:10s} → RMSE={v['rmse']:.4f}, MAE={v['mae']:.4f}, R²={v['r2']:.4f}")

save_json(results, os.path.join(ARTIFACT_DIR, "phoenix_model_results.json"))


=== Model Performance ===
LightGBM   → RMSE=0.0039, MAE=0.0030, R²=0.9777
XGBoost    → RMSE=0.0058, MAE=0.0039, R²=0.9525
CatBoost   → RMSE=0.0031, MAE=0.0020, R²=0.9861


