<a href="https://colab.research.google.com/github/evildead23151/F1-Prediction-Models/blob/main/Predicting_Dutch_GrandPrix_Using_DL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Step 0: Install and imports

Why PyTorch: fast to prototype, easy to build multi-head models with masking.

In [1]:
!nvidia-smi || echo "No GPU visible"
!pip -q install fastf1 pandas numpy scikit-learn xgboost torch torchvision torchaudio tqdm python-dateutil joblib

import os, math, random, numpy as np, pandas as pd, torch, torch.nn as nn, torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import fastf1
from tqdm import tqdm

# Directory where FastF1 stores downloaded session data between runs.
CACHE_DIR = "/content/f1_cache"
os.makedirs(CACHE_DIR, exist_ok=True)
fastf1.Cache.enable_cache(CACHE_DIR)
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# Bare expression: shows the chosen device as the notebook cell output.
DEVICE


Sat Aug 16 12:02:49 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  Tesla T4                       Off |   00000000:00:04.0 Off |                    0 |
| N/A   39C    P8             10W /   70W |       0MiB /  15360MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

'cuda'

Step 1: Build a lap-level dataset

Why lap-level: maximizes sequence signal and lets the model learn degradation, fuel load proxies, SC effects.

We’ll extract from FastF1 Laps: LapTime, Sector1/2/3Time, Compound, TyreLife, PitIn/Out, TrackStatus; plus driver/team identifiers.

We’ll join rolling form features computed from prior races.

We’ll pad sequences per driver-event and create masks.

In [None]:
from datetime import timedelta
import pandas as pd
import numpy as np
import fastf1

def load_event_sessions(season: int, rnd: int):
    """Return the loaded race ("R") session for the given season and round.

    Any exception raised by FastF1 while resolving or loading the session
    propagates to the caller.
    """
    race_session = fastf1.get_session(season, rnd, "R")
    race_session.load()
    return race_session

def laps_to_df(sesR):
    """Flatten a session's laps into a plain per-lap feature DataFrame.

    Args:
        sesR: Object exposing a ``laps`` DataFrame (a loaded FastF1 session).

    Returns:
        A new DataFrame holding whichever of the columns in ``keep_cols`` are
        present, with lap/sector times converted to float seconds, plus:

        * ``PitOutFlag`` / ``PitInFlag``: 1 when the matching pit timestamp is
          set, else 0 (0 everywhere when the timestamp column is absent).
        * ``Compound``: always present, as strings, with missing values
          replaced by ``"UNK"``.
    """
    laps = sesR.laps.reset_index(drop=True).copy()
    for c in ["LapTime", "Sector1Time", "Sector2Time", "Sector3Time"]:
        if c in laps.columns:
            laps[c] = pd.to_timedelta(laps[c], errors="coerce").dt.total_seconds()
    keep_cols = [
        "DriverNumber", "Driver", "Team", "LapNumber",
        "LapTime", "Sector1Time", "Sector2Time", "Sector3Time",
        "Compound", "TyreLife", "FreshTyre", "PitOutTime", "PitInTime",
        "TrackStatus", "IsAccurate"
    ]
    keep_cols = [c for c in keep_cols if c in laps.columns]
    df = laps[keep_cols].copy()
    # Flags
    df["PitOutFlag"] = df["PitOutTime"].notna().astype(int) if "PitOutTime" in df else 0
    df["PitInFlag"] = df["PitInTime"].notna().astype(int) if "PitInTime" in df else 0
    if "Compound" in df:
        # Replace missing values BEFORE the string cast: astype(str) turns NaN
        # into the literal "nan", after which fillna() has nothing to fill.
        compound = df["Compound"].astype(object)
        df["Compound"] = compound.where(compound.notna(), "UNK").astype(str)
    else:
        df["Compound"] = "UNK"
    return df

def event_labels_df(sesR):
    """Extract per-driver race outcome labels from a session's results table.

    Selects the identity, grid, finish, status and points columns from
    ``sesR.results``, renames them to the names used downstream
    (``finish_position``, ``grid_position``, ``Team``, ``Code``) and coerces
    both position columns to numbers (unparseable entries become NaN).
    The source results table is not modified.
    """
    wanted = [
        "DriverNumber", "Abbreviation", "TeamName",
        "TeamColor", "GridPosition", "Position", "Status", "Points",
    ]
    new_names = {
        "Position": "finish_position",
        "GridPosition": "grid_position",
        "TeamName": "Team",
        "Abbreviation": "Code",
    }
    labels = sesR.results[wanted].rename(columns=new_names).copy()
    for position_col in ("finish_position", "grid_position"):
        labels[position_col] = pd.to_numeric(labels[position_col], errors="coerce")
    return labels

def _lap_column(g, name, default):
    """Return column ``name`` of ``g`` as an array, or a constant array if absent."""
    return g[name].values if name in g else np.full(len(g), default)


def collect_lap_sequences(seasons=range(2018, 2025), max_rounds=25,
                          compounds=("SOFT", "MEDIUM", "HARD", "INTERMEDIATE", "WET"),
                          track_flags=("1", "2", "4", "5", "6", "7")):
    """Build one lap-sequence sample per driver per race.

    Args:
        seasons: Iterable of season years to load.
        max_rounds: Rounds 1..max_rounds are attempted for every season;
            rounds that fail to load are reported and skipped.
        compounds: Tyre compound names that get their own one-hot column
            (``comp_<NAME>``). Any other value, including "UNK", sets
            ``comp_OTHER``.
        track_flags: Single-character codes looked up inside the lap's
            ``TrackStatus`` string; each gets a ``ts_<code>`` column that is 1
            when the code occurs in the string.
            NOTE(review): assumes TrackStatus is a concatenation of
            single-digit status codes — confirm against the FastF1 docs.

    Returns:
        ``(samples, meta)``. ``samples`` is a list of dicts with keys
        ``season``, ``round``, ``DriverNumber``, ``X`` (float32 array of shape
        ``[n_laps, 8 + len(compounds) + 1 + len(track_flags)]``),
        ``y_next_lt`` (float32, next-lap time, NaN where unknown),
        ``finish_pos`` and ``dnf``. ``meta`` is a DataFrame with one row per
        sample (season, round, DriverNumber, n_laps).

    The feature width is identical for every sample, so the sequences can be
    stacked into one padded tensor.
    """
    samples = []
    meta = []
    compounds = [str(c).upper() for c in compounds]
    track_flags = [str(f) for f in track_flags]
    for season in seasons:
        for rnd in range(1, max_rounds + 1):
            try:
                sesR = load_event_sessions(season, rnd)
            except Exception as exc:
                # Best-effort: rounds that cannot be loaded are skipped, but
                # reported so a systematic failure does not go unnoticed.
                print(f"Skipping {season} round {rnd}: {type(exc).__name__}: {exc}")
                continue
            laps = laps_to_df(sesR)
            labels = event_labels_df(sesR)
            if laps.empty:
                continue
            for did, g in laps.groupby("DriverNumber"):
                # Reset the index so every column built below lines up row by
                # row; the group otherwise keeps its positions from the full
                # lap table and concatenation would align on those labels.
                g = g.sort_values("LapNumber").reset_index(drop=True)
                feats = pd.DataFrame({
                    "lap": g["LapNumber"].values,
                    "laptime": g["LapTime"].values,
                    "s1": _lap_column(g, "Sector1Time", np.nan),
                    "s2": _lap_column(g, "Sector2Time", np.nan),
                    "s3": _lap_column(g, "Sector3Time", np.nan),
                    "tyre_age": _lap_column(g, "TyreLife", np.nan),
                    "pitout": _lap_column(g, "PitOutFlag", 0),
                    "pitin": _lap_column(g, "PitInFlag", 0),
                })
                # Fixed vocabularies keep the feature width constant across
                # drivers and events (get_dummies only emits seen categories).
                comp_raw = g["Compound"].astype(str).str.upper()
                for c in compounds:
                    feats[f"comp_{c}"] = (comp_raw == c).astype(float).values
                feats["comp_OTHER"] = (~comp_raw.isin(compounds)).astype(float).values
                if "TrackStatus" in g:
                    ts_raw = g["TrackStatus"].astype(str)
                else:
                    ts_raw = pd.Series([""] * len(g))
                for flag in track_flags:
                    feats[f"ts_{flag}"] = ts_raw.str.contains(flag, regex=False).astype(float).values

                # Take the target from the raw lap times, before NaNs are
                # replaced by 0, so missing laps stay NaN and get masked out.
                raw_lt = feats["laptime"].to_numpy(dtype=np.float64)
                y_next_lt = np.roll(raw_lt, -1)
                y_next_lt[-1] = np.nan  # the last lap has no successor
                X = feats.astype(np.float32).fillna(0.0)

                lab = labels[labels["DriverNumber"] == did]
                finish_pos = float(lab["finish_position"].iloc[0]) if len(lab) else np.nan
                status = str(lab["Status"].iloc[0]) if len(lab) else ""
                # NOTE(review): only these four substrings count as a DNF;
                # other retirement reasons in Status are not flagged — verify.
                dnf = int(("Retired" in status) or ("DNF" in status) or
                          ("Accident" in status) or ("Collision" in status))
                samples.append({
                    "season": season, "round": rnd, "DriverNumber": did,
                    "X": X.values.astype(np.float32),
                    "y_next_lt": y_next_lt.astype(np.float32),
                    "finish_pos": finish_pos,
                    "dnf": dnf
                })
                meta.append({
                    "season": season, "round": rnd, "DriverNumber": did,
                    "n_laps": len(X)
                })
    return samples, pd.DataFrame(meta)

# Build the lap-level dataset with the default seasons and round limit.
samples, meta = collect_lap_sequences()
# Number of driver-event sequences collected.
print(len(samples))
print(meta.head())


core           INFO 	Loading data for Australian Grand Prix - Race [v3.6.0]
INFO:fastf1.fastf1.core:Loading data for Australian Grand Prix - Race [v3.6.0]
req            INFO 	No cached data found for session_info. Loading data...
INFO:fastf1.fastf1.req:No cached data found for session_info. Loading data...
_api           INFO 	Fetching session info data...
INFO:fastf1.api:Fetching session info data...
req            INFO 	Data has been written to cache!
INFO:fastf1.fastf1.req:Data has been written to cache!
req            INFO 	No cached data found for driver_info. Loading data...
INFO:fastf1.fastf1.req:No cached data found for driver_info. Loading data...
_api           INFO 	Fetching driver list...
INFO:fastf1.api:Fetching driver list...
req            INFO 	Data has been written to cache!
INFO:fastf1.fastf1.req:Data has been written to cache!
req            INFO 	No cached data found for session_status_data. Loading data...
INFO:fastf1.fastf1.req:No cached data found for session_st

Step 2.1: Build tensors with padding and masks

Why: sequences differ in length; the LSTM needs uniformly shaped tensors, plus a mask that marks which timesteps are real laps rather than padding.

In [None]:
def pad_sequences(arr_list, pad_value=0.0):
    """Stack variable-length 2-D sequences into one padded 3-D array.

    Args:
        arr_list: Non-empty list of arrays shaped ``[length_i, feat_dim]``;
            every array must have the same ``feat_dim``.
        pad_value: Value written into the padded positions.

    Returns:
        ``(X, mask)`` where ``X`` is float32 of shape
        ``[len(arr_list), max_len, feat_dim]`` and ``mask`` is float32 of
        shape ``[len(arr_list), max_len]`` with 1.0 on real timesteps and 0.0
        on padding.

    Raises:
        ValueError: If ``arr_list`` is empty or the feature widths differ.
    """
    if len(arr_list) == 0:
        raise ValueError("pad_sequences needs at least one sequence")
    feat_dim = arr_list[0].shape[1]
    if any(a.shape[1] != feat_dim for a in arr_list):
        raise ValueError("all sequences must share the same feature dimension")
    max_len = max(a.shape[0] for a in arr_list)
    X = np.full((len(arr_list), max_len, feat_dim), pad_value, dtype=np.float32)
    mask = np.zeros((len(arr_list), max_len), dtype=np.float32)
    for i, a in enumerate(arr_list):
        L = a.shape[0]  # number of real timesteps in this sequence
        X[i, :L, :] = a
        mask[i, :L] = 1.0
    return X, mask

def to_dataset(samples):
    """Convert per-driver samples into padded arrays ready for training.

    Args:
        samples: List of dicts with keys ``X``, ``y_next_lt``, ``finish_pos``,
            ``dnf``, ``season``, ``round`` and a driver identifier stored
            under ``DriverId`` or ``DriverNumber``.

    Returns:
        ``(X, mask, Y, finish, dnf, meta)``:
        padded features and mask from ``pad_sequences``; ``Y`` the next-lap
        targets padded with NaN to the same length; ``finish`` and ``dnf`` as
        float32 vectors; ``meta`` a list of ``(season, round, driver)`` tuples
        in sample order.
    """
    X_list = [s["X"] for s in samples]
    y_list = [s["y_next_lt"] for s in samples]
    X, mask = pad_sequences(X_list, pad_value=0.0)
    max_len = X.shape[1]  # padded sequence length (axis 1 of [N, T, F])
    Y = np.full((len(samples), max_len), np.nan, dtype=np.float32)
    for i, y in enumerate(y_list):
        Y[i, :len(y)] = y
    # Missing finish positions are already NaN and are kept as NaN.
    finish = np.array([s["finish_pos"] for s in samples], dtype=np.float32)
    dnf = np.array([s["dnf"] for s in samples], dtype=np.float32)
    # Samples built in this file store the driver under "DriverNumber";
    # accept "DriverId" too so either key works.
    meta = [
        (s["season"], s["round"], s.get("DriverId", s.get("DriverNumber")))
        for s in samples
    ]
    return X, mask, Y, finish, dnf, meta

# Materialise the padded arrays for all collected samples.
X_seq, M_seq, Y_next, Y_finish, Y_dnf, META = to_dataset(samples)
# Bare expression: shows the three array shapes as the cell output.
X_seq.shape, M_seq.shape, Y_next.shape


Step 3: Train/val split by event groups

Why: no leakage across the same race. Use grouped split on (season, round).

In [None]:
from sklearn.model_selection import GroupKFold
# One group label per sample, "<season>_<round>", so every driver of a race
# lands on the same side of the split.
groups = np.array([f"{m[0]}_{m[1]}" for m in META])
gkf = GroupKFold(n_splits=5)

idx = np.arange(len(META))
splits = list(gkf.split(idx, groups=groups))
# GroupKFold yields one (train, validation) pair per fold; use the first fold
# as the single hold-out split for this step.
train_idx, val_idx = splits[0]

def subset(X, M, Y, F, D, idxs):
    """Index all five parallel arrays with ``idxs`` and return them as a tuple."""
    return tuple(arr[idxs] for arr in (X, M, Y, F, D))

# Training and validation views of every array, selected by the split indices.
Xtr, Mtr, Ytr, Ftr, Dtr = subset(X_seq, M_seq, Y_next, Y_finish, Y_dnf, train_idx)
Xva, Mva, Yva, Fva, Dva = subset(X_seq, M_seq, Y_next, Y_finish, Y_dnf, val_idx)


Step 4: LSTM multi-head model

Heads:

Next-lap time (masked MSE).

Final finish position (sequence-to-one using final hidden state).

DNF probability (sequence-to-one BCE).

Why multi-head: auxiliary tasks can improve shared representation for race outcome.

In [None]:
class LSTMMultiHead(nn.Module):
    """LSTM encoder with three linear heads sharing one representation.

    Heads:
        * next-lap time: one value per timestep,
        * finish position: one value per sequence,
        * DNF logit: one value per sequence.

    The two per-sequence heads read the LSTM output at each sequence's last
    valid timestep, as given by the mask.
    """

    def __init__(self, in_dim, hidden=128, layers=2, dropout=0.2):
        super().__init__()
        # nn.LSTM applies dropout only between stacked layers and warns when
        # it is non-zero with a single layer, so disable it in that case
        # (same guard as SessLSTM).
        lstm_dropout = dropout if layers > 1 else 0.0
        self.lstm = nn.LSTM(input_size=in_dim, hidden_size=hidden, num_layers=layers, batch_first=True, dropout=lstm_dropout)
        self.head_nextlap = nn.Linear(hidden, 1)
        self.head_finish = nn.Linear(hidden, 1)
        self.head_dnf = nn.Linear(hidden, 1)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        """Run the model.

        Args:
            x: Batch-first input, indexed as [batch, time, feature].
            mask: [batch, time] tensor whose row sums give the number of
                valid timesteps per sequence.

        Returns:
            ``(nextlap, finish, dnf)`` with shapes [batch, time], [batch],
            [batch]; ``dnf`` is a raw logit.
        """
        out, _ = self.lstm(x)
        out = self.dropout(out)
        nextlap = self.head_nextlap(out).squeeze(-1)
        # Index of the last real timestep; clamp keeps all-padding rows at 0.
        lengths = mask.sum(dim=1).long()
        idx = (lengths - 1).clamp(min=0)
        last_h = out[torch.arange(out.size(0), device=out.device), idx]
        finish = self.head_finish(last_h).squeeze(-1)
        dnf = self.head_dnf(last_h).squeeze(-1)
        return nextlap, finish, dnf

def masked_mse(pred, target, mask):
    """Mean squared error over positions that are unmasked and have a target.

    A position counts when ``mask > 0.5`` and ``target`` is not NaN. When no
    position qualifies, a constant zero tensor on ``pred``'s device is
    returned.
    """
    has_target = torch.logical_not(torch.isnan(target))
    keep = torch.logical_and(has_target, mask > 0.5)
    if not bool(keep.any()):
        return torch.tensor(0.0, device=pred.device)
    return F.mse_loss(pred[keep], target[keep])

def train_epoch(model, opt, X, M, Y, F, D, batch=16):
    """Train ``model`` for one pass over the data and return the mean batch loss.

    Args:
        model: Module called as ``model(x, mask)`` returning
            ``(next_lap, finish, dnf_logit)``.
        opt: Optimizer over ``model``'s parameters.
        X, M, Y, F, D: Parallel arrays of features, mask, next-lap targets,
            finish positions (NaN where unknown) and DNF labels.
        batch: Mini-batch size.

    Returns:
        Sum of the per-batch total losses divided by the number of batches.

    Note:
        The parameter ``F`` (finish targets) hides the module alias
        ``torch.nn.functional as F`` inside this function, so the loss
        functions are reached through ``nn.functional`` instead.
    """
    model.train()
    idx = np.random.permutation(len(X))
    total = 0.0
    for i in range(0, len(X), batch):
        b = idx[i:i+batch]
        xb = torch.tensor(X[b], device=DEVICE)
        mb = torch.tensor(M[b], device=DEVICE)
        yb = torch.tensor(Y[b], device=DEVICE)
        fb = torch.tensor(F[b], device=DEVICE)
        db = torch.tensor(D[b], device=DEVICE)
        opt.zero_grad()
        p_next, p_finish, p_dnf = model(xb, mb)
        loss_next = masked_mse(p_next, yb, mb)
        # Compare predictions and targets on the same rows: drivers without a
        # known finish position are dropped from both sides.
        has_finish = ~torch.isnan(fb)
        if has_finish.any():
            loss_finish = nn.functional.l1_loss(p_finish[has_finish], fb[has_finish])
        else:
            loss_finish = torch.tensor(0.0, device=DEVICE)
        loss_dnf = nn.functional.binary_cross_entropy_with_logits(p_dnf, db)

        loss = loss_next + loss_finish + loss_dnf
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        opt.step()
        total += loss.item()
    return total * 1.0 / max(1, math.ceil(len(X)/batch))

@torch.no_grad()
def eval_epoch(model, X, M, Y, F, D, batch=16):
    """Evaluate ``model`` and return ``(rmse_next, mae_finish, bce_dnf)``.

    Args:
        model: Module called as ``model(x, mask)`` returning
            ``(next_lap, finish, dnf_logit)``.
        X, M, Y, F, D: Parallel arrays of features, mask, next-lap targets,
            finish positions (NaN where unknown) and DNF labels.
        batch: Mini-batch size.

    Returns:
        * ``rmse_next``: root of the mean squared next-lap error over all
          valid timesteps of the whole set (0.0 when there are none).
        * ``mae_finish``: mean absolute finish-position error over samples
          with a known finish position.
        * ``bce_dnf``: mean binary cross-entropy of the DNF logits.

    Note:
        The parameter ``F`` hides the ``torch.nn.functional`` alias inside
        this function, so losses are reached through ``nn.functional``.
    """
    model.eval()
    total_sq, total_finish, total_dnf = 0.0, 0.0, 0.0
    nsteps, nfinish, ndnf = 0, 0, 0
    for i in range(0, len(X), batch):
        xb = torch.tensor(X[i:i+batch], device=DEVICE)
        mb = torch.tensor(M[i:i+batch], device=DEVICE)
        yb = torch.tensor(Y[i:i+batch], device=DEVICE)
        fb = torch.tensor(F[i:i+batch], device=DEVICE)
        db = torch.tensor(D[i:i+batch], device=DEVICE)
        p_next, p_finish, p_dnf = model(xb, mb)
        valid = (~torch.isnan(yb)) & (mb>0.5)
        n_valid = int(valid.sum().item())
        if n_valid > 0:
            # Accumulate squared error and take one square root at the end;
            # averaging per-batch RMSE values would not give the RMSE.
            sq_err = nn.functional.mse_loss(p_next[valid], yb[valid], reduction="sum")
            total_sq += sq_err.item()
            nsteps += n_valid
        vfin = ~torch.isnan(fb)
        if vfin.any():
            mae = nn.functional.l1_loss(p_finish[vfin], fb[vfin])
            total_finish += mae.item() * vfin.sum().item()
            nfinish += vfin.sum().item()
        bce = nn.functional.binary_cross_entropy_with_logits(p_dnf, db)
        total_dnf += bce.item() * xb.size(0)
        ndnf += xb.size(0)
    rmse_next = math.sqrt(total_sq / nsteps) if nsteps else 0.0
    mae_finish = total_finish / max(1, nfinish)
    bce_dnf = total_dnf / max(1, ndnf)
    return rmse_next, mae_finish, bce_dnf

# Feature width is the last axis of the padded training array.
in_dim = Xtr.shape[-1]
model = LSTMMultiHead(in_dim=in_dim, hidden=128, layers=2, dropout=0.2).to(DEVICE)
opt = torch.optim.AdamW(model.parameters(), lr=2e-3, weight_decay=1e-4)

# Train for 20 epochs, reporting validation metrics after each one.
for epoch in range(20):
    tr_loss = train_epoch(model, opt, Xtr, Mtr, Ytr, Ftr, Dtr, batch=16)
    rmse_next, mae_finish, bce_dnf = eval_epoch(model, Xva, Mva, Yva, Fva, Dva, batch=16)
    print(f"Epoch {epoch:02d} | TrainLoss {tr_loss:.3f} | Val nextRMSE {rmse_next:.3f} | Val finishMAE {mae_finish:.3f} | Val dnfBCE {bce_dnf:.3f}")


Step 5: Turn sequence predictions into race order

For inference, use the sequence-to-one head (finish) as the driver’s expected finish position; order drivers by predicted position.

Optionally combine with DNF: increase predicted position for high DNF probability to reflect expected loss.

In [None]:
@torch.no_grad()
def predict_finish_positions(model, X, M, META):
    """Predict finish position and DNF probability for every sequence.

    The model is switched to eval mode and run on all of ``X`` in a single
    forward pass. Returns a DataFrame with one row per entry of ``META``
    (``season``, ``round``, ``DriverId``) carrying ``pred_finish`` and
    ``pred_dnf`` (sigmoid of the DNF logit).
    """
    model.eval()
    features = torch.tensor(X, device=DEVICE)
    valid_mask = torch.tensor(M, device=DEVICE)
    _, finish_out, dnf_logit = model(features, valid_mask)
    finish_np = finish_out.detach().cpu().numpy()
    dnf_np = torch.sigmoid(dnf_logit).detach().cpu().numpy()
    records = [
        {
            "season": season,
            "round": rnd,
            "DriverId": did,
            "pred_finish": float(finish_np[i]),
            "pred_dnf": float(dnf_np[i]),
        }
        for i, (season, rnd, did) in enumerate(META)
    ]
    return pd.DataFrame(records)

# Predict on the validation fold, passing the META entries in the same order as val_idx.
val_pred = predict_finish_positions(model, Xva, Mva, [META[i] for i in val_idx])
val_pred.head()


Why: A paper-level project needs robust, transparent evaluation beyond a single fold.

Actions:

Per-event ordering: within each validation event, sort drivers by pred_finish to get the predicted order.

Metrics:

Position MAE per event (rounded predictions vs actual).

Spearman rank correlation per event (order quality).

Top-3 and Top-10 hit rates.

DNF AUC and Brier score for calibration.

Report distribution of metrics (mean, std, min, max) across events, not just one number.

In [None]:
from scipy.stats import spearmanr

def eval_event_metrics(val_pred_df, race_results_df):
    """Score predicted finishing orders event by event.

    Predictions are inner-joined with the true results on
    (season, round, DriverId). Within each event the rounded predictions and
    the actual finish positions are converted to ranks (ties broken by row
    order) and compared.

    Returns:
        ``(per_event, summary)``: a DataFrame with one row per event
        (rank MAE, Spearman correlation with NaN mapped to 0.0, top-3 and
        top-10 overlap counts) and a dict of aggregates over events.
    """
    keys = ["season", "round", "DriverId"]
    truth = race_results_df[keys + ["finish_position"]]
    merged = val_pred_df.merge(truth, on=keys, how="inner")

    def _top(frame, k, rank_col):
        # Driver ids of the k best-ranked rows according to rank_col.
        return set(frame.nsmallest(k, rank_col)["DriverId"])

    records = []
    for (season, rnd), event in merged.groupby(["season", "round"]):
        event = event.copy()
        event["pred_rank"] = event["pred_finish"].round().rank(method="first")
        event["act_rank"] = event["finish_position"].rank(method="first")
        rank_error = (event["pred_rank"] - event["act_rank"]).abs().mean()
        rho, _ = spearmanr(event["pred_rank"], event["act_rank"])
        spearman = 0.0 if np.isnan(rho) else float(rho)
        records.append({
            "season": season, "round": rnd,
            "event_mae": float(rank_error),
            "event_spearman": spearman,
            "top3_overlap": len(_top(event, 3, "pred_rank") & _top(event, 3, "act_rank")),
            "top10_overlap": len(_top(event, 10, "pred_rank") & _top(event, 10, "act_rank")),
        })
    per_event = pd.DataFrame(records)
    summary = {
        "MAE_mean": per_event["event_mae"].mean(),
        "MAE_std": per_event["event_mae"].std(),
        "Spearman_mean": per_event["event_spearman"].mean(),
        "Top3_overlap_mean": per_event["top3_overlap"].mean(),
        "Top10_overlap_mean": per_event["top10_overlap"].mean(),
    }
    return per_event, summary


Ensembling “multiple different predictions”
Why: Improves robustness and is a core selling point in a paper/post.

Options to ensemble:

Across seeds: train the same LSTM with 3–5 seeds and average the outputs.

Across architectures: LSTM and GRU; optionally a small Transformer encoder.

Across model families: blend deep model predictions with the XGBoost GPU baseline.

With/without DNF adjustment: apply a penalty for high DNF probability.

In [None]:
def blend_predictions(pred_list, weights=None):
    """Weighted blend of several prediction frames.

    Each frame must carry ``season``, ``round``, ``DriverId``,
    ``pred_finish`` and ``pred_dnf``. The rows of the first frame form the
    base; every frame is inner-merged onto it by key and its predictions are
    added with the matching weight. Without ``weights`` all frames are
    weighted equally.
    """
    keys = ["season", "round", "DriverId"]
    targets = ["pred_finish", "pred_dnf"]
    if weights is None:
        n_models = len(pred_list)
        weights = [1.0 / n_models] * n_models
    blended = pred_list[0][keys].copy()
    blended["pred_finish"] = 0.0
    blended["pred_dnf"] = 0.0
    for weight, preds in zip(weights, pred_list):
        # The incoming columns collide with the accumulators and therefore
        # arrive with the "_x" suffix; fold them in and drop them again.
        blended = blended.merge(preds[keys + targets], on=keys, suffixes=("", "_x"))
        for col in targets:
            blended[col] = blended[col] + weight * blended.pop(col + "_x")
    return blended
def adjust_for_dnf(pred_df, alpha=4.0):
    """Return a copy of ``pred_df`` with ``pred_finish`` raised by ``alpha * pred_dnf``."""
    result = pred_df.copy()
    dnf_penalty = alpha * result["pred_dnf"]
    result["pred_finish"] = result["pred_finish"] + dnf_penalty
    return result


Add a session-sequence model for pre-qual predictions
Why: The lap LSTM is great for sequence learning but uses race laps; a session-level LSTM (FP1→FP2→FP3→Q) supports pre-race and post-qual modes cleanly.

Design:

For each driver-event, build a sequence length 3–4 with features per session:

FP median long-run pace per compound, stint counts, clean laps, variability.

Qualifying best, average, and delta to pole; session pace ranks.

Weather summaries if available.

Sequence-to-one LSTM predicting finish position (and DNF).

In [None]:
def session_aggregate_features(session):
    """Summarise a session's lap times into a flat feature dict.

    Over all laps of the session (every driver together):
    ``med_laptime`` and ``p75_laptime`` when a ``LapTime`` column exists, and
    one ``med_<compound>`` entry per tyre compound when a ``Compound`` column
    exists. NaN lap times are ignored by the statistics.
    """
    def _seconds(frame):
        # Lap times as float seconds; unparseable entries become NaN.
        return pd.to_timedelta(frame["LapTime"], errors="coerce").dt.total_seconds()

    laps = session.laps
    features = {}
    if "LapTime" in laps.columns:
        all_laps = _seconds(laps)
        features["med_laptime"] = np.nanmedian(all_laps)
        features["p75_laptime"] = np.nanpercentile(all_laps, 75)
    if "Compound" in laps.columns:
        for compound, stint_laps in laps.groupby("Compound"):
            key = f"med_{str(compound).lower()}"
            features[key] = np.nanmedian(_seconds(stint_laps))
    return features


# Full validation sweep and final report tables/plots
This section:

Runs grouped cross-validation across all events (by season-round)

Computes event-level metrics: MAE, Spearman, Top-3/Top-10 overlaps

Computes DNF AUC and Brier score

Produces plots: error histogram, per-event scatter, DNF calibration curve

Supports ablation toggles: remove quali, FP long-run, DNF head, ensemble

First, utilities for metrics and plotting.

In [None]:
import numpy as np, pandas as pd, matplotlib.pyplot as plt, seaborn as sns
from sklearn.model_selection import GroupKFold
from sklearn.metrics import roc_auc_score, brier_score_loss
from scipy.stats import spearmanr
import torch

plt.style.use("seaborn-v0_8")

def join_truth(pred_df, truth_df):
    """Attach true finish position and a derived DNF flag to predictions.

    ``dnf`` is 1 when ``Status`` contains Retired, DNF, Accident or Collision
    (case-insensitive; missing status counts as 0). Rows are inner-joined on
    (season, round, DriverId); ``Status`` itself is not kept.
    """
    keys = ["season", "round", "DriverId"]
    truth = truth_df[keys + ["finish_position", "Status"]].copy()
    status = truth.pop("Status").fillna("")
    is_dnf = status.str.contains("Retired|DNF|Accident|Collision", case=False)
    truth["dnf"] = is_dnf.astype(int)
    return pred_df.merge(truth, on=keys, how="inner")

def per_event_metrics(df_joined):
    """Compute ordering metrics for every (season, round) in ``df_joined``.

    Rounded predictions and actual finish positions are each turned into
    ranks (ties broken by row order). Per event the result holds the mean
    absolute rank error, the Spearman correlation (NaN mapped to 0.0) and the
    number of drivers shared by the predicted and actual top 3 / top 10.
    """
    def _best(frame, k, rank_col):
        # Driver ids of the k best-ranked rows according to rank_col.
        return set(frame.nsmallest(k, rank_col)["DriverId"])

    records = []
    for (season, rnd), event in df_joined.groupby(["season", "round"]):
        event = event.copy()
        event["pred_rank"] = event["pred_finish"].round().rank(method="first")
        event["act_rank"] = event["finish_position"].rank(method="first")
        rank_error = (event["pred_rank"] - event["act_rank"]).abs().mean()
        rho, _ = spearmanr(event["pred_rank"], event["act_rank"])
        spearman = 0.0 if np.isnan(rho) else float(rho)
        shared_top3 = _best(event, 3, "pred_rank") & _best(event, 3, "act_rank")
        shared_top10 = _best(event, 10, "pred_rank") & _best(event, 10, "act_rank")
        records.append({
            "season": season, "round": rnd, "event_mae": float(rank_error),
            "event_spearman": spearman,
            "top3_overlap": len(shared_top3), "top10_overlap": len(shared_top10),
        })
    return pd.DataFrame(records)

def aggregate_table(ev):
    """Collapse per-event metrics into a one-row summary table.

    Columns: mean and standard deviation of the event MAE, and the means of
    the Spearman correlation and of the top-3 / top-10 overlaps.
    """
    summary_row = {
        "MAE_mean": ev["event_mae"].mean(),
        "MAE_std": ev["event_mae"].std(),
        "Spearman_mean": ev["event_spearman"].mean(),
        "Top3_overlap_mean": ev["top3_overlap"].mean(),
        "Top10_overlap_mean": ev["top10_overlap"].mean(),
    }
    return pd.DataFrame([summary_row])

def dnf_metrics(df_joined):
    """Return ROC AUC and Brier score of the DNF predictions.

    Rows without a ``pred_dnf`` value are ignored. Both metrics are NaN when
    no rows remain; the AUC alone is NaN when only one class is present.
    """
    scored = df_joined.dropna(subset=["pred_dnf"])
    if not len(scored):
        return {"DNF_AUC": np.nan, "DNF_Brier": np.nan}
    labels = scored["dnf"]
    probs = scored["pred_dnf"]
    if len(labels.unique()) > 1:
        auc = roc_auc_score(labels, probs)
    else:
        auc = np.nan
    brier = brier_score_loss(labels, probs)
    return {"DNF_AUC": auc, "DNF_Brier": brier}

def plot_error_hist(ev, title="Event MAE distribution"):
    """Plot a histogram (with KDE) of the per-event MAE values in ``ev``.

    Args:
        ev: DataFrame with an ``event_mae`` column.
        title: Figure title.
    """
    # figsize takes one (width, height) tuple; writing `figsize=6,4` passes a
    # positional argument after a keyword argument, which is a SyntaxError.
    plt.figure(figsize=(6, 4))
    sns.histplot(ev["event_mae"], bins=15, kde=True)
    plt.xlabel("Event-level MAE (positions)")
    plt.title(title)
    plt.show()

def plot_pred_vs_actual_scatter(df_joined, season=None, rnd=None, title="Pred vs Actual (per event)"):
    """Scatter rounded predicted finish positions against actual ones for one event.

    Args:
        df_joined: Predictions joined with truth; needs ``season``, ``round``,
            ``finish_position`` and ``pred_finish``.
        season, rnd: Event to plot. When either is None the first
            (season, round) group of ``df_joined`` is used.
        title: Title prefix; the event is appended.
    """
    if season is None or rnd is None:
        event_groups = list(df_joined.groupby(["season","round"]))
        if not event_groups:
            # Nothing to plot; report it the same way plot_dnf_calibration does.
            print("No events to plot.")
            return
        (season, rnd), _ = event_groups[0]
    g = df_joined[(df_joined["season"]==season)&(df_joined["round"]==rnd)].copy()
    g = g.sort_values("finish_position")
    # figsize takes one (width, height) tuple; `figsize=5,5` is a SyntaxError.
    plt.figure(figsize=(5, 5))
    plt.scatter(g["finish_position"], g["pred_finish"].round(), c="tab:blue")
    # Reference diagonal: points on it are predicted exactly.
    plt.plot([1,20],[1,20], "--", c="gray")
    plt.xlabel("Actual finish position")
    plt.ylabel("Predicted finish position (rounded)")
    plt.title(f"{title}: {season} R{rnd}")
    plt.gca().invert_xaxis(); plt.gca().invert_yaxis()  # P1 at top-left
    plt.show()

def plot_dnf_calibration(df_joined, bins=10, title="DNF calibration"):
    """Plot observed DNF frequency against predicted DNF probability.

    Predictions are split into up to ``bins`` quantile bins (duplicate bin
    edges are dropped); each point is one bin's mean prediction against its
    observed DNF rate. Rows without ``pred_dnf`` are ignored; if none remain,
    a message is printed and nothing is drawn.
    """
    valid = df_joined.dropna(subset=["pred_dnf"]).copy()
    if valid.empty:
        print("No DNF predictions to calibrate.")
        return
    valid["bin"] = pd.qcut(valid["pred_dnf"].clip(1e-6,1-1e-6), q=bins, duplicates="drop")
    calib = valid.groupby("bin").agg(
        mean_pred=("pred_dnf","mean"),
        frac_actual=("dnf","mean"),
        count=("dnf","size")
    ).reset_index()
    # figsize takes one (width, height) tuple; `figsize=5,5` is a SyntaxError.
    plt.figure(figsize=(5, 5))
    plt.plot(calib["mean_pred"], calib["frac_actual"], "o-", label="Observed")
    # Diagonal = perfectly calibrated probabilities.
    xs = np.linspace(0,1,100)
    plt.plot(xs, xs, "--", c="gray", label="Perfect")
    plt.xlabel("Predicted DNF probability")
    plt.ylabel("Observed DNF frequency")
    plt.title(title)
    plt.legend()
    plt.show()


In [None]:
def ablate_predictions(base_pred_df, remove_quali=False, remove_dnf=False, dnf_alpha=None):
    """Return a copy of the predictions with the requested ablations applied.

    * ``remove_quali``: shrink ``pred_finish`` 10% of the way towards 10.5.
    * ``remove_dnf``: blank out ``pred_dnf`` (set to NaN) when present.
    * ``dnf_alpha``: when given and ``pred_dnf`` exists, add
      ``dnf_alpha * pred_dnf`` to ``pred_finish`` (NaN counted as 0).

    The steps run in that order; the input frame is left untouched.
    """
    midfield = 10.5
    out = base_pred_df.copy()
    has_dnf = "pred_dnf" in out.columns
    if remove_quali:
        out["pred_finish"] = 0.9*out["pred_finish"] + 0.1*midfield
    if remove_dnf and has_dnf:
        out["pred_dnf"] = np.nan
    if dnf_alpha is not None and has_dnf:
        penalty = dnf_alpha * out["pred_dnf"].fillna(0)
        out["pred_finish"] = out["pred_finish"] + penalty
    return out


In [None]:
from sklearn.model_selection import GroupKFold
# One group label per sample, "<season>_<round>", so a race is never split
# between training and validation.
groups = np.array([f"{m[0]}_{m[1]}" for m in META])
gkf = GroupKFold(n_splits=5)

# Out-of-fold predictions and matching truth rows, one entry per fold.
all_preds = []
all_truth_rows = []

fold_id = 0
for tr_idx, va_idx in gkf.split(np.arange(len(META)), groups=groups):
    fold_id += 1
    print(f"Fold {fold_id}")
    # Fresh model and optimizer for every fold.
    model = LSTMMultiHead(in_dim=X_seq.shape[-1], hidden=128, layers=2, dropout=0.2).to(DEVICE)
    opt = torch.optim.AdamW(model.parameters(), lr=2e-3, weight_decay=1e-4)

    Xtr, Mtr = X_seq[tr_idx], M_seq[tr_idx]
    Ytr, Ftr, Dtr = Y_next[tr_idx], Y_finish[tr_idx], Y_dnf[tr_idx]
    Xva, Mva = X_seq[va_idx], M_seq[va_idx]
    Yva, Fva, Dva = Y_next[va_idx], Y_finish[va_idx], Y_dnf[va_idx]
    META_va = [META[i] for i in va_idx]

    # Fixed 10 epochs per fold; the returned training loss is discarded.
    for epoch in range(10):
        _ = train_epoch(model, opt, Xtr, Mtr, Ytr, Ftr, Dtr, batch=16)

    val_pred = predict_finish_positions(model, Xva, Mva, META_va)
    all_preds.append(val_pred)
    truth_rows = pd.DataFrame(META_va, columns=["season","round","DriverId"])
    # NOTE(review): df_hist is not defined in the code visible here; it is
    # presumably a historical results table keyed by season/round/DriverId
    # with finish_position and Status columns — confirm where it is built.
    truth_rows = truth_rows.merge(
        df_hist[["season","round","DriverId","finish_position","Status"]],
        on=["season","round","DriverId"], how="left"
    )
    all_truth_rows.append(truth_rows)

# Pool the folds so every sample has exactly one out-of-fold prediction.
pred_df = pd.concat(all_preds, ignore_index=True)
truth_df = pd.concat(all_truth_rows, ignore_index=True)
joined = join_truth(pred_df, truth_df)

ev = per_event_metrics(joined)
agg = aggregate_table(ev)
dnf_stats = dnf_metrics(joined)

# Report tables and plots.
display(agg)
display(pd.DataFrame([dnf_stats]))
plot_error_hist(ev)
plot_pred_vs_actual_scatter(joined, title="Pred vs Actual")
plot_dnf_calibration(joined)


In [None]:
def blend_simple(df_a, df_b, w_a=0.5):
    """Blend two prediction frames with weights ``w_a`` and ``1 - w_a``.

    The rows of ``df_a`` define the output; ``df_b`` is left-joined on
    (season, round, DriverId). ``pred_finish`` is the weighted sum and is NaN
    where either side is missing; ``pred_dnf`` treats a missing side as 0.
    """
    keys = ["season", "round", "DriverId"]
    value_cols = ["pred_finish", "pred_dnf"]

    def _tagged(frame, tag):
        # Keys plus prediction columns, with the predictions renamed "<col>_<tag>".
        renamed = {col: f"{col}_{tag}" for col in value_cols}
        return frame[keys + value_cols].rename(columns=renamed)

    combined = df_a[keys].copy()
    combined = combined.merge(_tagged(df_a, "a"), on=keys, how="left")
    combined = combined.merge(_tagged(df_b, "b"), on=keys, how="left")
    w_b = 1 - w_a
    combined["pred_finish"] = w_a*combined["pred_finish_a"] + w_b*combined["pred_finish_b"]
    combined["pred_dnf"] = w_a*combined["pred_dnf_a"].fillna(0) + w_b*combined["pred_dnf_b"].fillna(0)
    return combined[keys + value_cols]

# Inference scripts for Dutch GP 2025
We’ll provide three modes:

Pre-qual (session-sequence model) – uses FP1–FP3 summaries; if FP3 not available, use FP1–FP2

Post-qual – adds qualifying features

Nowcast – during race, use first N laps with lap LSTM to project final order

Pre-qual/session-sequence scaffold

If you haven’t built the session-sequence model yet, here’s a minimal data prep and a small LSTM head.

In [None]:
def session_summary(session):
    """Summarise one session's laps into a flat dict of pace features.

    Over all laps of the session (every driver together):
    ``med_lt`` / ``p75_lt`` (median and 75th percentile lap time in seconds),
    one ``med_<compound>`` per tyre compound when a ``Compound`` column
    exists, and ``clean_ratio``: among laps without a pit-in or pit-out
    timestamp, the share that have a lap time.
    """
    laps = session.laps.reset_index(drop=True).copy()
    laps["LapTime_s"] = pd.to_timedelta(laps["LapTime"], errors="coerce").dt.total_seconds()
    all_seconds = laps["LapTime_s"].values
    summary = {
        "med_lt": np.nanmedian(all_seconds),
        "p75_lt": np.nanpercentile(all_seconds, 75),
    }
    if "Compound" in laps.columns:
        for compound, stint_laps in laps.groupby("Compound"):
            summary[f"med_{str(compound).lower()}"] = np.nanmedian(stint_laps["LapTime_s"].values)
    if "PitInTime" in laps and "PitOutTime" in laps:
        touched_pits = laps["PitInTime"].notna() | laps["PitOutTime"].notna()
    else:
        # Without both pit columns no lap is treated as a pit lap.
        touched_pits = pd.Series(False, index=laps.index)
    timed_share = laps.loc[~touched_pits, "LapTime_s"].notna().mean()
    summary["clean_ratio"] = float(timed_share)
    return summary

def build_session_sequence(season:int, rnd:int):
    """Build per-driver feature sequences over the weekend's sessions.

    Tries to load FP1, FP2, FP3 and Q for the event; sessions that fail to
    load are reported and left out. Every loaded session contributes one
    timestep whose features come from ``session_summary``.

    NOTE(review): ``session_summary`` aggregates over all laps of a session,
    so within one session every driver gets the same feature values — confirm
    whether per-driver summaries were intended.

    Returns:
        ``(X_all, meta_df, feat_cols, order)``:
        ``X_all`` float32 array ``[n_drivers, n_sessions, n_features]``;
        ``meta_df`` with ``DriverId``, ``season``, ``round`` in the same row
        order as ``X_all``; ``feat_cols`` the feature names; ``order`` the
        session names in timestep order.
        When no session could be loaded: ``(None, None, [], [])``, so callers
        can always unpack four values.
    """
    sessions = []
    for name in ["FP1","FP2","FP3","Q"]:
        try:
            s = fastf1.get_session(season, rnd, name)
            s.load()
        except Exception as exc:
            # Best-effort: not every session can be loaded for every event.
            # Report the skip instead of hiding it.
            print(f"Skipping {name} for {season} round {rnd}: {type(exc).__name__}: {exc}")
            continue
        sessions.append((name, s))
    per_sess = []
    for name, s in sessions:
        summ = session_summary(s)
        res = s.results[["DriverId","Abbreviation","TeamName"]].copy()
        for k,v in summ.items():
            res[k] = v
        res["sess"] = name
        per_sess.append(res)
    if not per_sess:
        return None, None, [], []
    big = pd.concat(per_sess, ignore_index=True)
    order = [n for n,_ in sessions]
    feat_cols = [c for c in big.columns if c not in ["DriverId","Abbreviation","TeamName","sess"]]
    driver_ids = []
    driver_seqs = []
    for did, g in big.groupby("DriverId"):
        # One row per session in chronological order; sessions the driver
        # did not appear in become all-NaN rows.
        g = g.set_index("sess").reindex(order)
        feats = g[feat_cols].astype(float)
        # Fill gaps with the driver's own median across sessions; a feature
        # missing in every session has no median, so fall back to 0 to keep
        # NaN out of the model input.
        feats = feats.fillna(feats.median()).fillna(0.0)
        driver_ids.append(did)
        driver_seqs.append(feats.values.astype(np.float32))  # [T, F]
    n_steps = len(order)
    n_feats = len(feat_cols)
    X_all = np.zeros((len(driver_seqs), n_steps, n_feats), dtype=np.float32)
    for i, Xi in enumerate(driver_seqs):
        X_all[i, :Xi.shape[0], :] = Xi
    meta_df = pd.DataFrame({"DriverId": driver_ids})
    meta_df["season"] = season
    meta_df["round"] = rnd
    return X_all, meta_df, feat_cols, order

class SessLSTM(nn.Module):
    """Sequence-to-one LSTM over session-level features.

    Two linear heads read the LSTM output at the final timestep: a finish
    position estimate and a DNF logit.
    """

    def __init__(self, in_dim, hidden=64, layers=1, dropout=0.1):
        super().__init__()
        # Dropout is only passed to the LSTM when there is more than one layer.
        lstm_dropout = dropout if layers>1 else 0.0
        self.lstm = nn.LSTM(in_dim, hidden, num_layers=layers, batch_first=True, dropout=lstm_dropout)
        self.head_finish = nn.Linear(hidden, 1)
        self.head_dnf = nn.Linear(hidden, 1)

    def forward(self, x):
        """Return ``(finish, dnf_logit)``, each with one value per sequence."""
        states, _ = self.lstm(x)
        final_state = states[:, -1, :]
        finish = self.head_finish(final_state).squeeze(-1)
        dnf_logit = self.head_dnf(final_state).squeeze(-1)
        return finish, dnf_logit

@torch.no_grad()
def sess_predict(model, X, meta):
    """Run the session model and attach its predictions to a copy of ``meta``.

    Args:
        model: Module returning ``(finish, dnf_logit)`` for a batch.
        X: Array of session sequences, one per row of ``meta``.
        meta: DataFrame describing each sequence; not modified.

    Returns:
        Copy of ``meta`` with ``pred_finish`` and ``pred_dnf`` (sigmoid of
        the DNF logit) added.
    """
    # Inference mode, matching predict_finish_positions.
    model.eval()
    xb = torch.tensor(X, device=DEVICE)
    # Named p_finish/p_dnf so the pandas alias `pd` is not shadowed here.
    p_finish, p_dnf = model(xb)
    df = meta.copy()
    df["pred_finish"] = p_finish.detach().cpu().numpy()
    df["pred_dnf"] = torch.sigmoid(p_dnf).detach().cpu().numpy()
    return df


In [None]:
# Round number used for the 2025 Dutch GP throughout this notebook.
DUTCH_2025_ROUND = 15
X_sess, meta_sess, feat_cols, sess_order = build_session_sequence(2025, DUTCH_2025_ROUND)
sess_model = SessLSTM(in_dim=X_sess.shape[-1]).to(DEVICE)

# NOTE(review): no training or weight-loading step for sess_model is visible
# here, so these predictions appear to come from freshly initialised weights —
# confirm the model is trained or loaded before relying on them.
prequal_pred = sess_predict(sess_model, X_sess, meta_sess)
# Lowest predicted finish position first.
prequal_pred = prequal_pred.sort_values("pred_finish")
prequal_pred.head(10)


In [None]:
def truncate_sequences_for_nowcast(samples, N):
    """Return copies of ``samples`` cut down to their first ``N`` laps.

    Both the feature sequence ``X`` and, when present, the per-lap target
    ``y_next_lt`` are truncated, so each sample stays internally consistent
    and can still be fed to ``to_dataset``. All other keys are carried over
    unchanged and the input dicts are not modified.

    Args:
        samples: List of sample dicts holding an ``X`` array indexed
            [lap, feature].
        N: Number of leading laps to keep.
    """
    trunc = []
    for s in samples:
        s2 = dict(s)
        s2["X"] = s["X"][:N, :]
        if "y_next_lt" in s2:
            # Keep the targets the same length as X; otherwise padding the
            # targets to the truncated length fails.
            s2["y_next_lt"] = s["y_next_lt"][:N]
        trunc.append(s2)
    return trunc

# 7) Project packaging for release

In [None]:
import os, textwrap, json

# Root folder of the generated repository scaffold.
repo_root = "/content/f1-race-prediction"
# Sub-folders created under the root.
dirs = [
    "classic_ml", "deep_seq", "data", "configs", "notebooks", "scripts", "docs", "artifacts"
]
for d in dirs:
    os.makedirs(os.path.join(repo_root, d), exist_ok=True)

# Unpinned package list, one per line; reused below for the pip section of
# the environment file.
req = """fastf1
pandas
numpy
scikit-learn
xgboost
torch
torchvision
torchaudio
tqdm
python-dateutil
joblib
seaborn
matplotlib
"""
with open(os.path.join(repo_root, "requirements.txt"), "w") as f:
    f.write(req)

# Conda environment description; the pip entries mirror requirements.txt.
env = {
  "name": "f1-race-prediction",
  "channels": ["conda-forge","pytorch","nvidia","defaults"],
  "dependencies": [
    "python=3.10",
    "pip",
    {"pip": req.strip().splitlines()}
  ]
}
# NOTE(review): the file is written as JSON text under a .yml name —
# confirm the consuming tool accepts that.
with open(os.path.join(repo_root, "environment.yml"), "w") as f:
    f.write(json.dumps(env, indent=2))

# README text for the scaffold; dedented before it is written.
readme = """
# F1 Race Prediction: Classic ML + Deep Sequence Models

This repo contains two parallel pipelines:
- classic_ml/: XGBoost GPU baseline for finishing position prediction
- deep_seq/: LSTM-based sequence models for lap-level and session-level prediction

Key features:
- Grouped cross-validation by event
- Event-level metrics (MAE, Spearman, Top-3/Top-10)
- DNF probability modeling and calibration
- Ablations and ensembling
- Inference scripts for Dutch GP 2025 (pre-qual, post-qual, nowcast)

Quickstart (Colab):
1. Enable GPU runtime
2. pip install -r requirements.txt
3. Run notebooks in notebooks/ to build datasets, train models, and reproduce results

Data:
- Uses FastF1 API (timing, sessions, telemetry) with local cache in data/cache or configurable path.

Reproducibility:
- Fixed seeds in configs/
- Saved model artifacts in artifacts/
- Exact training/inference commands in scripts/
"""
with open(os.path.join(repo_root, "README.md"), "w") as f:
    f.write(textwrap.dedent(readme))

# Placeholder scripts: each holds only a shebang and comments, no executable code.
train_xgb = """#!/usr/bin/env python
# Train XGBoost GPU baseline; expects driver_event_dataset.csv in data/
# Fill with your training code from the notebook and save to artifacts/baseline_xgb_gpu.json
"""
train_lstm = """#!/usr/bin/env python
# Train LSTM lap-seq model; save to artifacts/lstm_lapseq.pt
"""
infer_dutch = """#!/usr/bin/env python
# Inference for Dutch GP 2025 (Round 15). Supports pre-qual (session model) and post-qual.
"""
# Write every placeholder into scripts/.
for name, content in [("train_xgb.py", train_xgb), ("train_lstm.py", train_lstm), ("infer_dutch_2025.py", infer_dutch)]:
    with open(os.path.join(repo_root, "scripts", name), "w") as f:
        f.write(content)

print("Repo scaffold created at:", repo_root)
