In [1]:
import pandas as pd 
import numpy as np
import matplotlib.pyplot as plt
import yfinance as yf

In [2]:
# data acquisition
start = "2020-01-01"
end = "2025-12-31"
assets = ["ASML", "SPY", "SMH"]
df_raw = yf.download(assets, start = start, end = end, interval = "1d")
df_raw.columns

  df_raw = yf.download(assets, start = start, end = end, interval = "1d")
[*********************100%***********************]  3 of 3 completed


MultiIndex([( 'Close', 'ASML'),
            ( 'Close',  'SMH'),
            ( 'Close',  'SPY'),
            (  'High', 'ASML'),
            (  'High',  'SMH'),
            (  'High',  'SPY'),
            (   'Low', 'ASML'),
            (   'Low',  'SMH'),
            (   'Low',  'SPY'),
            (  'Open', 'ASML'),
            (  'Open',  'SMH'),
            (  'Open',  'SPY'),
            ('Volume', 'ASML'),
            ('Volume',  'SMH'),
            ('Volume',  'SPY')],
           names=['Price', 'Ticker'])

In [3]:
# data organization
close_cols = [
    ("Close", "ASML"),
    ("Close", "SMH"),
    ("Close", "SPY"),
]
df = pd.DataFrame()
df_close = df_raw[close_cols].copy()
df_close.columns = df_close.columns.droplevel(0)
df_close


Ticker,ASML,SMH,SPY
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2020-01-02,286.903198,69.703552,297.698975
2020-01-03,282.279877,68.561760,295.444763
2020-01-06,280.034485,67.829437,296.571838
2020-01-07,283.075806,68.961624,295.737976
2020-01-08,286.012634,69.077248,297.314117
...,...,...,...
2025-12-23,1061.839966,363.160004,687.960022
2025-12-24,1065.520020,364.170013,690.380005
2025-12-26,1072.750000,365.859985,690.309998
2025-12-29,1066.000000,364.209991,687.849976


In [20]:
# input feature 1: ASML vol adjusted momentum
L = 10
ASML_std = df_close["ASML"].rolling(window=L).std()
ASML_avg = df_close["ASML"].rolling(window=L).mean()
ASML_returns = df_close["ASML"].pct_change(L)
df_close["ASML_vol_adj_mom"] = (ASML_returns/ASML_std).dropna()

print(df_close["ASML_vol_adj_mom"].tail())

Date
2025-12-23   -0.001324
2025-12-24   -0.001647
2025-12-26   -0.002059
2025-12-29   -0.000666
2025-12-30   -0.000769
Name: ASML_vol_adj_mom, dtype: float64


In [5]:
# input features 2 and 3: SPY and SMH
df_close["SPY_returns"] = df_close["SPY"].pct_change(L).dropna()

df_close["SMH_returns"] = df_close["SMH"].pct_change(L).dropna()


In [6]:
#3 declare trade threshold y_trade
H = 5
threshold = 0.03

df_close["forward_return"] = (df_close["ASML"].shift(-H)/df_close["ASML"] -1)
df_close["y_trade"] = np.where(df_close["forward_return"].notna(),
    (df_close["forward_return"] > threshold).astype(int), 
    np.nan,
)


In [23]:
# define model_df
features = [
    "ASML_vol_adj_mom",
    "SPY_returns",
    "SMH_returns"
]

target = "y_trade"
model_cols = features + [target]
model_df = df_close[model_cols].replace([np.inf, -np.inf], np.nan).dropna()

print(model_df.head())

n_trades = (model_df["y_trade"] == 1).sum()
print(f"it produces {n_trades} trades")

len(model_df["ASML_vol_adj_mom"]) == len(model_df["y_trade"])
len(model_df["y_trade"]) == 0

Ticker      ASML_vol_adj_mom  SPY_returns  SMH_returns  y_trade
Date                                                           
2020-01-16         -0.002488     0.018623     0.005391      0.0
2020-01-17          0.003118     0.029589     0.026351      0.0
2020-01-21          0.005217     0.023668     0.038284      0.0
2020-01-22         -0.003951     0.026678     0.029621      0.0
2020-01-23         -0.009606     0.022407     0.034314      0.0
it produces 457 trades


False

In [8]:
# declare split
X = model_df[features]
y = model_df[target]

split = int(0.7* len(model_df))

X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]

assert X_train.shape[0] == y_train.shape[0]
assert X.index.equals(y.index)


In [None]:
# declare pipeline
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

clf = Pipeline([
    ("scaler", StandardScaler()),
    ("logit", LogisticRegression(
        # regularization  
        C = 1e6,
        penalty = "l2",
        # Hessian
        solver = "lbfgs",
        max_iter =  5000,
        random_state = 0,
    ))
])


In [29]:
# C Sweep/Cross-Validation
from sklearn.metrics import log_loss, brier_score_loss
from sklearn.model_selection import TimeSeriesSplit

Cs = np.logspace(-4, 4, 13)
tscv = TimeSeriesSplit(n_splits=5)

rows = []

for C in Cs:
    clf.set_params(logit__C=float(C))

    fold_log_losses = []
    fold_briers = []

    for tr_idx, va_idx in tscv.split(X_train):
        X_tr, X_va = X_train.iloc[tr_idx], X_train.iloc[va_idx]
        y_tr, y_va = y_train.iloc[tr_idx], y_train.iloc[va_idx]

        clf.fit(X_tr, y_tr)
        p_va = clf.predict_proba(X_va)[:, 1]

        fold_log_losses.append(log_loss(y_va, p_va, labels=[0, 1]))
        fold_briers.append(brier_score_loss(y_va, p_va))

    rows.append((float(C), float(np.mean(fold_log_losses)), float(np.mean(fold_briers))))

C_sweep = (
    pd.DataFrame(rows, columns=["C", "cv_log_loss", "cv_brier"])
      .sort_values("cv_log_loss")
      .reset_index(drop=True)
)

best_c = float(C_sweep.loc[0, "C"])
print("best_c:", best_c)

C_sweep.head()

best_c: 0.046415888336127774


Unnamed: 0,C,cv_log_loss,cv_brier
0,0.046416,0.61554,0.212158
1,0.215443,0.616294,0.212303
2,0.01,0.61668,0.212745
3,0.002154,0.61816,0.213414
4,0.000464,0.619016,0.213795


In [26]:
# compute best fit/probability
clf.set_params(logit__C=best_c)
clf.fit(X_train, y_train)

# output p_train as a diagnostic, not as a used output because it's fitted on X_train
p_train = pd.Series(clf.predict_proba(X_train)[:, 1])
p_test = pd.Series(clf.predict_proba(X_test)[:, 1], index=y_test.index, name="p test")
print(p_test.tail())
assert p_test.between(0,1).all()

Date
2025-12-16    0.322122
2025-12-17    0.354162
2025-12-18    0.338988
2025-12-19    0.327541
2025-12-22    0.318027
Name: p test, dtype: float64


In [12]:
# Access coefficients from the nested LogisticRegression estimator in the Pipeline
clf.named_steps['logit'].coef_

array([[ 0.21302843, -0.06247569, -0.27686836]])

In [13]:
# output inspection
logit = clf.named_steps["logit"]

coefs = logit.coef_[0]
intercept = logit.intercept_[0]

coef_df = pd.DataFrame({
    "feature": features,
    "coef": coefs
}).sort_values("coef", key=np.abs, ascending=False)

print("Intercept:", round(intercept, 4))
print("\nCoefficients (sorted by absolute size):")
print(coef_df.round(4))

Intercept: -0.7893

Coefficients (sorted by absolute size):
            feature    coef
2       SMH_returns -0.2769
0  ASML_vol_adj_mom  0.2130
1       SPY_returns -0.0625


In [14]:
# inspection
train_base_rate = y_train.mean()
test_base_rate = y_test.mean()
test_avg_p = p_test.mean()

print(f"training base rate: {train_base_rate}")
print(f"testing base rate: {test_base_rate}")
print(f"test average probability: {test_avg_p}")


training base rate: 0.31417624521072796
testing base rate: 0.28794642857142855
test average probability: 0.30725167088418376


In [15]:
# Generalization test
ll_train = log_loss(y_train, p_train, labels=[0,1])
ll_test = log_loss(y_test,  p_test,  labels=[0,1])
br_train = brier_score_loss(y_train, p_train)
br_test = brier_score_loss(y_test,  p_test)

gap_ll = ll_test - ll_train
gap_bs = br_test - br_train
gap_bs


np.float64(-0.014532922065462806)

In [16]:
# Frequency Calibration
p0 = float(train_base_rate)

p0_train = np.full(len(y_train), p0)
p0_test  = np.full(len(y_test),  p0)

ll_train_base = log_loss(y_train, p0_train, labels=[0, 1])
ll_test_base  = log_loss(y_test,  p0_test,  labels=[0, 1])

br_train_base = brier_score_loss(y_train, p0_train)
br_test_base  = brier_score_loss(y_test,  p0_test)

improve_ll_test = (ll_test_base - ll_test) / ll_test_base
improve_bs_test = (br_test_base - br_test) / br_test_base

print(f"Base-rate p0 (train win rate): {p0:.3f}")
print(f"log_loss  train  model={ll_train:.4f}  base={ll_train_base:.4f}")
print(f"log_loss  test   model={ll_test:.4f}  base={ll_test_base:.4f}")
print(f"Brier     train  model={br_train:.4f}  base={br_train_base:.4f}")
print(f"Brier     test   model={br_test:.4f}  base={br_test_base:.4f}")
print(f"Test improvement vs base (log_loss): {improve_ll_test:.2%}")
print(f"Test improvement vs base (Brier):    {improve_bs_test:.2%}")


Base-rate p0 (train win rate): 0.314
log_loss  train  model=0.6147  base=0.6224
log_loss  test   model=0.5828  base=0.6019
Brier     train  model=0.2121  base=0.2155
Brier     test   model=0.1975  base=0.2057
Test improvement vs base (log_loss): 3.18%
Test improvement vs base (Brier):    3.98%


Test loss < train loss, unexpected gap. Warrants further inspection

In [17]:
# Leakage test
y_train_shuffled = y_train.sample(frac=1, random_state=0).reset_index(drop=True)
y_train_shuffled

shuffled_params = clf.set_params(logit__C=best_c)
shuffled_model = clf.fit(X_train, y_train_shuffled)

p_train_shuffled = shuffled_model.predict_proba(X_train)[:, 1]
p_test_shuffled = shuffled_model.predict_proba(X_test)[:, 1]

ll_train_shuffled = log_loss(y_train, p_train_shuffled, labels=[0,1])
ll_test_shuffled = log_loss(y_test, p_test_shuffled, labels=[0,1])

br_train_shuffled = brier_score_loss(y_train, p_train_shuffled)
br_test_shuffled = brier_score_loss(y_test, p_test_shuffled)

improve_ll_test_shuffled = (ll_test_base - ll_test_shuffled)/ll_test_base
improve_br_test_shuffled = (br_test_base - br_test_shuffled)/br_test_base

print("\n=== Model vs Baseline (SHUFFLED labels) ===")
print(f"log_loss test  model={ll_test_shuffled:.4f}  base={ll_test_base:.4f}")
print(f"Brier    test  model={br_test_shuffled:.4f}  base={br_test_base:.4f}")
print(f"Test improvement vs base (log_loss): {improve_ll_test_shuffled*100:6.2f}%")
print(f"Test improvement vs base (Brier):    {improve_br_test_shuffled*100:6.2f}%")



=== Model vs Baseline (SHUFFLED labels) ===
log_loss test  model=0.6045  base=0.6019
Brier    test  model=0.2068  base=0.2057
Test improvement vs base (log_loss):  -0.43%
Test improvement vs base (Brier):     -0.55%


Given large standard vs shuffled gap 35-45% varying from brier vs log_loss, I determined it is unnecessary to further define a quantifiable threshold, although it's usually best practice to do so. 

In [18]:
# As-of-Freeze Test
T = df_raw.index[int(len(df_raw)*0.90)]  # pick an interior cutoff

def freeze(df):
    c = df[[("Close","ASML"),("Close","SMH"),("Close","SPY")]].copy()
    c.columns = c.columns.droplevel(0)
    c["ASML_vol_adj_mom"] = c["ASML"].pct_change(L) / c["ASML"].rolling(L).std()
    c["SPY_returns"] = c["SPY"].pct_change(L)
    c["SMH_returns"] = c["SMH"].pct_change(L)
    return c[["ASML_vol_adj_mom","SPY_returns","SMH_returns"]].replace([np.inf,-np.inf], np.nan)

xf, xc = freeze(df_raw), freeze(df_raw.loc[:T])
d = (xf.loc[T] - xc.loc[T]).abs()

print("T:", T)
print("PASS:", (d.max(skipna=True) <= 1e-10) and not (xf.loc[T].isna() ^ xc.loc[T].isna()).any())
print("diff:\n", d)


T: 2025-05-27 00:00:00
PASS: True
diff:
 Ticker
ASML_vol_adj_mom    0.0
SPY_returns         0.0
SMH_returns         0.0
Name: 2025-05-27 00:00:00, dtype: float64


It's just an examination of whether information at t is exposed to additional information at t+1 due to lookback window. We're declare T then model with it to produce segmented data. From there, we compare the segmented data xc to the full history data xf, in the ways we derived the input features. If no future data (t+1) has contaminated data at t, then the gap between a set of randomly selected column from start -> T should be 0. 

In [19]:
# Probability rank/calibration
n_buckets = 10

calib = pd.DataFrame({
    "p": p_test.values,
    "y": y_test.values
}).sort_values("p")

calib["bucket"] = pd.qcut(calib["p"], q=n_buckets, labels=False, duplicates="drop")

stats = calib.groupby("bucket").agg(
    n=("y", "size"),
    avg_p=("p", "mean"),
    hit_rate=("y", "mean"),
).reset_index()

stats["bucket"] += 1
stats["error"] = stats["avg_p"] - stats["hit_rate"]

rank_check = stats["avg_p"].is_monotonic_increasing
mae = stats["error"].abs().mean()

print("Rank check:", "PASS" if rank_check else "FAIL")
print("Mean abs calibration error:", round(mae, 4))
print("\nBucket stats:")
print(stats[["bucket", "n", "avg_p", "hit_rate", "error"]].round(4))


Rank check: PASS
Mean abs calibration error: 0.0677

Bucket stats:
   bucket   n   avg_p  hit_rate   error
0       1  45  0.2298    0.1333  0.0965
1       2  45  0.2588    0.1778  0.0810
2       3  45  0.2721    0.1556  0.1165
3       4  44  0.2839    0.3182 -0.0342
4       5  45  0.2951    0.4000 -0.1049
5       6  45  0.3056    0.3111 -0.0055
6       7  44  0.3181    0.2273  0.0908
7       8  45  0.3356    0.2889  0.0467
8       9  45  0.3591    0.3556  0.0035
9      10  45  0.4143    0.5111 -0.0968


Use graphs to inspect... 
3D graph: each axis is an input feature. Color data points using y_trade classification. Would make it interesting. 
Gotta compute the actual outputs and derive an interpretation of them
Any important interpretation should be followed by a graph