Testing Notebook

This notebook imports and runs the project's scripts so that all of their outputs are kept together in this single file.

0. Initial import of modules

In [None]:
# Standard-library and PyTorch imports used by the cells below.
import os
import torch
# Show the working directory: the relative paths used later in this notebook
# ('./data/...', 'models/best.pt') are resolved against it.
print(os.getcwd())
# Seed PyTorch's global random number generator so runs are repeatable.
torch.manual_seed(42)

In [None]:
from preprocessing.fetch_binance import fetch_binance
from preprocessing.prep_data import feature_creation, z_score_norm, window_creation, train_val_test_split, target_def_knext, make_label_k_epsilon

1. Obtaining the data

In [None]:
# Load BTC/USDT data at a 1h timeframe, starting 2017-01-01, through the project's
# fetch_binance helper.
# NOTE(review): cache_path / max_age_hrs presumably let the helper reuse a local CSV
# that is less than 3 hours old instead of downloading again — confirm in fetch_binance.
df = fetch_binance(
    exchange_ticker='BTC/USDT',
    start_date='2017-01-01T00:00:00Z',
    timeframe='1h',
    cache_path=('./data/usd_btc_binance.csv'),
    max_age_hrs=3
)
# Preview the first rows as the cell output.
df.head()

2. Normalisation and Window Creation

In [None]:
import numpy as np
# Alternative labelling helper, currently disabled:
#df = target_def_knext(df,4)
# Label the raw data first, then build the features on the labelled frame.
# NOTE(review): the positional arguments presumably map to k=12, eps_quantile=0.7,
# train_frac=0.7, use_log_returns=True (the keyword names used by the label sweep
# further down in this notebook) — confirm against make_label_k_epsilon's signature.
labeled_df = make_label_k_epsilon(df,12,0.7,0.7,True)
mod_df = feature_creation(labeled_df)

# Z-score normalisation.
# NOTE(review): train_frac=0.7 presumably restricts the statistics to the first 70%
# of rows so that later data does not leak into them — confirm in z_score_norm.
norm_df = z_score_norm(mod_df, train_frac= 0.7)

# Turn the normalised frame into model inputs x and targets y using window_size=48.
x, y = window_creation(norm_df, window_size=48)

# Show both shapes as the cell output.
x.shape, y.shape


3. Splitting data into test, validation and training sets

In [None]:
# Split the windowed data into training, validation and test parts.
x_train, y_train, x_val, y_val,x_test, y_test = train_val_test_split(x, y)

# Keep the sizes as lists: a bare map() is a one-shot iterator that is already
# exhausted after the first print, leaving unusable names in the notebook namespace.
x_lengths = [len(part) for part in (x_train, x_val, x_test)]
y_lengths = [len(part) for part in (y_train, y_val, y_test)]

# Inputs and targets must stay aligned inside every split.
assert x_lengths == y_lengths, f"x/y length mismatch per split: {x_lengths} vs {y_lengths}"

print(x_lengths)
print(y_lengths)


4. Wrap the data in custom datasets so torch dataloaders can be used

In [None]:
from torch.utils.data import DataLoader
from datasets.sequence_dataset import SequenceDataset

# Every loader uses the same batch size.
batch_size= 64
# Only the training loader shuffles; validation and test keep the order of their data.
train_loader= DataLoader(SequenceDataset(x_train,y_train),batch_size=batch_size, shuffle=True)
val_loader = DataLoader(SequenceDataset(x_val,y_val), batch_size=batch_size, shuffle=False)
test_loader = DataLoader(SequenceDataset(x_test,y_test),batch_size=batch_size, shuffle=False)


In [None]:
def inspect_split(loader):
    """Print and return the share of positive labels yielded by ``loader``.

    Parameters
    ----------
    loader : iterable
        Yields ``(inputs, labels)`` pairs. Each ``labels`` object must support
        ``.numel()`` and ``.sum().item()`` (e.g. a torch tensor). The labels are
        assumed to be 0/1 so that their sum counts the positives.

    Returns
    -------
    float
        ``positives / samples``, or ``nan`` when the loader yields no labels at
        all (this used to raise ``ZeroDivisionError``).
    """
    n = 0
    pos = 0
    for _, labels in loader:
        n += labels.numel()
        pos += labels.sum().item()
    # An empty split has no meaningful rate; report nan instead of dividing by zero.
    p = pos / n if n else float("nan")
    print(f"Samples={n}, Positives={pos} ({p:.3f})")
    return p

# Report the positive-label rate of the training and validation splits
# (the test split is not inspected in this cell).
print("Train split:"); p_train = inspect_split(train_loader)
print("Val split:");   p_val   = inspect_split(val_loader)


5. Model set up and Training

In [None]:
from models.lstm import LSTMClassifier
from train.train_model import fit , positive_weight, select_threshold_constrained, threshold_free_metrics

# Size of the last dimension of the training inputs (presumably the number of
# features per time step — confirm against window_creation).
input_size = x_train.size(-1)
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = LSTMClassifier(input_size=input_size,hidden_size=128,num_layers= 2, dropout= 0.1)
# NOTE(review): pos_weight is computed here but never used — fit() below is called
# with pos_weight=None. Either pass this value through or drop the computation.
pos_weight = positive_weight(y_train,device)
# fit() returns the model, a threshold t_star (used by the test-evaluation cell
# below) and a history object.
# NOTE(review): the model is not moved to `device` in this cell; presumably fit()
# does that — confirm.
model,t_star,history = fit(
        model,
        train_loader=train_loader,
        val_loader=val_loader,
        epochs=40,
        lr=1e-3,              
        device=device,
        save_path="models/best.pt",
        patience= 7,            # optional early stopping
        pos_weight= None
    )

In [None]:
import numpy as np
from sklearn.metrics import f1_score, precision_score, recall_score, roc_auc_score, average_precision_score, confusion_matrix

# Put the model in evaluation mode before scoring the test set.
model.eval()
probs_test, ys_test = [], []

# Collect per-batch probabilities and labels without tracking gradients.
with torch.no_grad():
    for xb, yb in test_loader:
        # probs in [0,1]
        probs_test.append(model(xb.to(device)).sigmoid().cpu().numpy())
        ys_test.append(yb.numpy())

# Merge the per-batch arrays and flatten each to one dimension.
probs_test = np.concatenate(probs_test).ravel()
ys_test   = np.concatenate(ys_test).ravel()

# Lock the val-chosen threshold for test
yhat_test = (probs_test >= t_star).astype(float)

# ROC-AUC and PR-AUC are computed from the probabilities; the remaining metrics use
# the hard predictions obtained with t_star.
print(f"t* (chosen on validation): {t_star:.3f}")
print("TEST ROC-AUC:", roc_auc_score(ys_test, probs_test))
print("TEST PR-AUC:",  average_precision_score(ys_test, probs_test))
print("TEST F1:",      f1_score(ys_test, yhat_test))
print("TEST Precision:", precision_score(ys_test, yhat_test))
print("TEST Recall:",    recall_score(ys_test, yhat_test))
print("TEST Confusion Matrix:\n", confusion_matrix(ys_test, yhat_test))


In [None]:
import numpy as np
from sklearn.metrics import f1_score, precision_score, recall_score, roc_auc_score, average_precision_score

# Trivial reference classifier: mark every test sample as positive.
yhat_allpos = np.ones_like(ys_test)

print("BASELINE (all positive)")
# Threshold-based metrics of the constant prediction, printed in a fixed order.
for caption, scorer in (
    ("F1:", f1_score),
    ("Precision:", precision_score),
    ("Recall:", recall_score),
):
    print(caption, scorer(ys_test, yhat_allpos))
# A constant score cannot rank samples, so the ranking metrics are stated directly:
# 0.5 for ROC-AUC and the positive base rate for PR-AUC.
print("ROC-AUC:", 0.5)
print("PR-AUC (≈ base rate):", ys_test.mean())


Beginning of hyperparameter tuning: testing whether the model can learn at all, and whether the changes need to be made elsewhere (data / features / splitting / optimization)

In [None]:
def tiny_overfit(model, train_ds, steps=500, k=64, lr=3e-3, device=None):
    """Sanity check: try to memorise a handful of random training samples.

    Draws up to ``k`` random items from ``train_ds``, trains ``model`` on that
    fixed mini-set with AdamW and ``BCEWithLogitsLoss``, and prints loss and
    accuracy every 50 updates. A model and pipeline that work should drive the
    loss towards 0 and the accuracy towards 1.

    Parameters
    ----------
    model : torch.nn.Module
        Maps a batch of inputs to one logit per sample (a trailing singleton
        dimension is squeezed away). It is updated IN PLACE; pass a copy to
        keep the original weights.
    train_ds : sequence
        Supports ``len()`` and integer indexing, returning ``(X, y)`` where
        ``X`` is a tensor and ``y`` is a single-element tensor or a number.
    steps : int
        Number of optimiser updates (not epochs).
    k : int
        Number of samples to memorise; capped at ``len(train_ds)``.
    lr : float
        AdamW learning rate.
    device : torch.device, optional
        Where to place the mini-set. Defaults to the device of the model's
        parameters, so the function no longer depends on a notebook-level
        ``device`` variable.

    Returns
    -------
    None
    """
    import torch
    from torch.nn.utils import clip_grad_norm_
    from torch.optim import AdamW

    if device is None:
        # Follow the model: inputs must live where its parameters live.
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    idx = torch.randperm(len(train_ds))[:k]
    samples = [train_ds[int(i)] for i in idx]
    # The mini-set never changes, so it is moved to the device once.
    X_small = torch.stack([X for X, _ in samples]).to(device)
    y_small = torch.as_tensor(
        [float(y) for _, y in samples], dtype=torch.float32
    ).to(device)

    was_training = model.training
    model.train()
    opt = AdamW(model.parameters(), lr=lr)
    crit = torch.nn.BCEWithLogitsLoss()
    try:
        for t in range(steps):
            opt.zero_grad(set_to_none=True)
            logits = model(X_small).squeeze(-1)
            loss = crit(logits, y_small)
            loss.backward()
            # Clip gradients so the comparatively high learning rate stays stable.
            clip_grad_norm_(model.parameters(), 1.0)
            opt.step()
            if (t + 1) % 50 == 0:
                # Accuracy is measured on the logits computed before this update.
                with torch.no_grad():
                    pred = (logits.sigmoid() > 0.5).float()
                    acc = (pred == y_small).float().mean().item()
                print(f'{t+1:04d} | loss {loss.item():.4f} | acc {acc:.3f}')
    finally:
        # Restore the caller's train/eval mode.
        model.train(was_training)

import copy

# Run the memorisation check on a deep copy: 500 extra updates on 64 samples would
# otherwise overwrite the weights of the trained `model`, and any cell that uses
# `model` afterwards would silently evaluate the over-fitted network.
tiny_overfit(
    model=copy.deepcopy(model),
    train_ds=SequenceDataset(x_train,y_train),
    steps=500,   # updates, not epochs
    k=64,        # number of samples to memorize
    lr=3e-3      # slightly higher LR for speed
)

# Observed from this cell's output: accuracy consistently reaches 1.0 and the loss
# drops to about 0, so the model can definitely learn (it memorises a small subset).

In [None]:
from sklearn.linear_model import LogisticRegression

# Test a logistic-regression baseline model on the data

def make_tabular_last(norm_df, window_size=48):
    """Build a tabular dataset from the last row of every sliding window.

    For each window of ``window_size`` consecutive rows, only the final row's
    features and label are kept. That is the same as dropping the first
    ``window_size - 1`` rows, so the result is produced with one slice instead
    of a Python loop over ``iloc``.

    Parameters
    ----------
    norm_df : pandas.DataFrame
        Must contain a ``'label'`` column; every other column is a feature.
    window_size : int
        Window length in rows; must be at least 1.

    Returns
    -------
    X : numpy.ndarray
        Shape ``(n_windows, n_features)``.
    y : numpy.ndarray
        Integer labels, shape ``(n_windows,)``.
        Both arrays are empty when the frame has fewer than ``window_size``
        rows (this used to fail inside ``np.vstack``).

    Raises
    ------
    ValueError
        If ``window_size`` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError(f"window_size must be >= 1, got {window_size}")
    start = window_size - 1
    # copy=True keeps X independent of the frame's underlying data.
    X = norm_df.drop(columns=['label']).iloc[start:].to_numpy(copy=True)
    y = norm_df['label'].to_numpy().astype(int)[start:]
    return X, y

def chrono_split(X, y, train_frac=0.7, val_frac=0.15):
    """Cut ``X`` and ``y`` into consecutive train / validation / test parts.

    The order of the data is preserved: the first ``train_frac`` of the samples
    form the training part, the next ``val_frac`` the validation part, and the
    remainder the test part. Cut positions are truncated to whole indices.

    Returns three ``(X_part, y_part)`` tuples in train, validation, test order.
    """
    total = len(y)
    train_end = int(total * train_frac)
    val_end = int(total * (train_frac + val_frac))
    # A None bound leaves that side of the slice open.
    bounds = ((None, train_end), (train_end, val_end), (val_end, None))
    train, val, test = ((X[lo:hi], y[lo:hi]) for lo, hi in bounds)
    return train, val, test

def eval_with_constraints(probs, y, min_pos_rate=0.05, max_pos_rate=0.95, min_precision=None):
    """Score ``probs`` against ``y`` using a constrained decision threshold.

    The threshold-free metrics come from ``threshold_free_metrics`` and the
    threshold from ``select_threshold_constrained``. The result always holds the
    keys ``roc``, ``pr``, ``t``, ``f1``, ``prec``, ``rec`` and ``pos_rate``;
    all threshold-dependent entries are ``None`` when the selection reports a
    negative F1.
    """
    roc, pr = threshold_free_metrics(probs, y)
    chosen = select_threshold_constrained(probs, y, min_pos_rate, max_pos_rate, min_precision)

    result = {"roc": roc, "pr": pr}
    if chosen["f1"] < 0:
        # NOTE(review): a negative F1 presumably signals that no threshold met the
        # constraints — confirm in select_threshold_constrained.
        result.update(dict.fromkeys(("t", "f1", "prec", "rec", "pos_rate")))
        return result

    preds = probs >= chosen["t"]
    result["t"] = chosen["t"]
    result["f1"] = chosen["f1"]
    result["prec"] = precision_score(y, preds, zero_division=0)
    result["rec"] = recall_score(y, preds, zero_division=0)
    result["pos_rate"] = float(preds.mean())
    return result

def logistic_baseline(norm_df, window_size=48, C=1.0, max_iter=2000):
    """Fit a logistic-regression baseline on last-row-of-window features.

    The tabular data is split chronologically 70% / 15% / 15%. The classifier
    is trained on the first part, a decision threshold is chosen on the
    validation part (subject to the constraints passed to
    ``eval_with_constraints``), and that threshold is then applied unchanged to
    the test part.

    Returns ``(val_metrics, test_metrics)``; the threshold-dependent test
    entries are ``None`` when no validation threshold was selected.
    """
    features, labels = make_tabular_last(norm_df, window_size)
    fit_part, val_part, test_part = chrono_split(features, labels, 0.7, 0.15)
    X_fit, y_fit = fit_part
    X_val, y_val = val_part
    X_hold, y_hold = test_part

    # Re-weight the classes only when the training labels are clearly imbalanced.
    pos_share = y_fit.mean()
    imbalanced = pos_share < 0.35 or pos_share > 0.65
    clf = LogisticRegression(
        C=C,
        max_iter=max_iter,
        solver="lbfgs",
        class_weight="balanced" if imbalanced else None,
    )
    clf.fit(X_fit, y_fit)

    # Validation: require a precision of at least 0.55, or the base rate if higher.
    val_probs = clf.predict_proba(X_val)[:, 1]
    precision_floor = max(0.55, float(y_val.mean()))
    val_metrics = eval_with_constraints(val_probs, y_val, 0.05, 0.95, precision_floor)

    # Test: the ranking metrics are always available.
    hold_probs = clf.predict_proba(X_hold)[:, 1]
    threshold = val_metrics["t"]
    test_metrics = {
        "roc": roc_auc_score(y_hold, hold_probs),
        "pr": average_precision_score(y_hold, hold_probs),
    }
    if threshold is None:
        test_metrics.update(
            {"f1": None, "prec": None, "rec": None, "pos_rate": None, "t": None}
        )
    else:
        hold_preds = hold_probs >= threshold
        test_metrics.update({
            "f1": f1_score(y_hold, hold_preds, zero_division=0),
            "prec": precision_score(y_hold, hold_preds, zero_division=0),
            "rec": recall_score(y_hold, hold_preds, zero_division=0),
            "pos_rate": float(hold_preds.mean()),
            "t": threshold,
        })
    return val_metrics, test_metrics

# Run the logistic-regression baseline on the normalised frame and print the
# validation and test metric dictionaries.
val_m, test_m = logistic_baseline(norm_df, window_size=48)
print("LogReg VAL:", val_m)
print("LogReg TEST:", test_m)


In [None]:
# Build the features once on the raw frame; every label variant below reuses them.
# NOTE(review): here features are created before labelling, whereas the main pipeline
# cell labels first and then creates features — confirm that both orders are equivalent.
feat_df = feature_creation(df) 

# Try a few labels: k in {6, 8, 12}, epsilon quantile = 60% or 70%
tests = []
for k in (6, 8, 12):
    for q in (0.60, 0.70):
        labeled = make_label_k_epsilon(feat_df, k=k, eps_quantile=q, train_frac=0.7, use_log_returns=True)
        norm = z_score_norm(labeled, train_frac=0.7)

        # Get the logistic-regression baseline results for comparing the label variants
        val_m, test_m = logistic_baseline(norm, window_size=48)  
        # One row per variant: (k, eps_q, VAL_PR, VAL_ROC, TEST_PR, TEST_ROC)
        tests.append((k, q, val_m["pr"], val_m["roc"], test_m["pr"], test_m["roc"]))


# Tabulate the sweep results.
print("k  eps_q   VAL_PR   VAL_ROC   TEST_PR  TEST_ROC")
for k, q, vpr, vroc, tpr, troc in tests:
    print(f"{k:<2} {q:<5}  {vpr:.3f}    {vroc:.3f}     {tpr:.3f}    {troc:.3f}")


# Pick the variant with the highest VAL_PR (tuple index 2).
best = max(tests, key=lambda x: x[2])
print("\nBEST (by VAL_PR): k=%d, eps_q=%.2f  VAL_PR=%.3f  VAL_ROC=%.3f" % (best[0], best[1], best[2], best[3]))

6. Testing the model on test data set and comparing outputs