In [82]:
from pathlib import Path
import pandas as pd
import numpy as np


# Safety car generative model
Estimate SC/VSC start hazards (per circuit/year/phase) and continuation/transition probabilities by stint length; simulate laps with VSC→SC transitions grounded in data.


In [83]:

import numpy as np
import pandas as pd
from pathlib import Path


def _phase(progress):
    if progress < 0.33:
        return "early"
    if progress < 0.66:
        return "middle"
    return "late"


class SafetyCarGenerativeModel:
    """Data-driven SC/VSC hazard + transition model with a next_state entrypoint."""

    def __init__(
        self,
        df=None,
        include_year=True,
        min_green_laps_prior=500.0,
        start_prior=1.0,
        trans_prior=1.0,
        max_len_bucket=12,
        rng=None,
    ):
        self.include_year = include_year
        self.min_green_laps_prior = float(min_green_laps_prior)
        self.start_prior = float(start_prior)
        self.trans_prior = float(trans_prior)
        self.max_len_bucket = int(max_len_bucket)
        self.rng = rng or np.random.default_rng()

        if df is None:
            df = self._load_default_df()
        self.laps = self._prepare(df)

        (
            self.vsc_hazard,
            self.sc_hazard,
            self.global_vsc_hazard,
            self.global_sc_hazard,
        ) = self._fit_start_hazards()

        self.vsc_trans = self._fit_transitions(flag_col="virtual_sc_this_lap", to_sc=True)
        self.sc_trans = self._fit_transitions(flag_col="safety_car_this_lap", to_sc=False)
        self._vsc_max_len = max(self.vsc_trans.keys(), default=0)
        self._sc_max_len = max(self.sc_trans.keys(), default=0)

    def _load_default_df(self):
        candidates = [
            Path("fastf1_lap_dataset.csv"),
            Path("models/fastf1_lap_dataset.csv"),
            Path("driver_lap_dataset.csv"),
            Path("models/driver_lap_dataset.csv"),
        ]
        csv_path = next((p for p in candidates if p.exists()), None)
        if csv_path is None:
            raise FileNotFoundError("No lap dataset found (fastf1_lap_dataset.csv / driver_lap_dataset.csv).")
        df = pd.read_csv(csv_path)
        required = ["safety_car_this_lap", "virtual_sc_this_lap", "total_race_laps", "lap_number", "circuit_id", "year", "session_key"]
        missing = [c for c in required if c not in df.columns]
        if missing:
            raise ValueError(f"Missing required columns: {missing}")
        return df

    def _prepare(self, df: pd.DataFrame) -> pd.DataFrame:
        laps = df.copy()
        laps = laps.sort_values(['session_key', 'lap_number'])
        laps['lap_number'] = pd.to_numeric(laps['lap_number'], errors='coerce').astype('Int64')
        laps['total_race_laps'] = pd.to_numeric(laps['total_race_laps'], errors='coerce').fillna(1)
        laps['race_progress'] = laps['lap_number'] / laps['total_race_laps'].replace(0, np.nan).fillna(1.0)
        laps['phase'] = laps['race_progress'].apply(_phase)
        laps['safety_car_this_lap'] = laps['safety_car_this_lap'].astype(bool)
        laps['virtual_sc_this_lap'] = laps['virtual_sc_this_lap'].astype(bool)
        return laps

    def _hazard_per_group(self, starts: pd.Series, exposure: pd.Series):
        prior_laps = self.min_green_laps_prior
        prior_events = self.start_prior
        hazard = (starts + prior_events) / (exposure + prior_laps)
        return hazard

    def _fit_start_hazards(self):
        laps = self.laps
        green_mask = ~(laps['safety_car_this_lap'] | laps['virtual_sc_this_lap'])
        green_laps = laps[green_mask]

        next_vsc = laps.groupby('session_key')['virtual_sc_this_lap'].shift(-1).fillna(False)
        next_sc = laps.groupby('session_key')['safety_car_this_lap'].shift(-1).fillna(False)

        def group_hazard(next_series: pd.Series):
            starts = next_series.groupby([green_laps['circuit_id'], green_laps['phase']]).sum()
            exposure = green_laps.groupby(['circuit_id', 'phase'])['lap_number'].count()
            hazard = self._hazard_per_group(starts, exposure)
            hazard_map = hazard.to_dict()
            global_h = float((starts.sum() + self.start_prior) / (exposure.sum() + self.min_green_laps_prior)) if exposure.sum() > 0 else 0.0

            hazard_year = None
            if self.include_year:
                starts_y = next_series.groupby([green_laps['year'], green_laps['phase']]).sum()
                exposure_y = green_laps.groupby(['year', 'phase'])['lap_number'].count()
                hazard_y = self._hazard_per_group(starts_y, exposure_y)
                hazard_year = hazard_y.to_dict()
            return hazard_map, hazard_year, global_h

        vsc_hazard, vsc_hazard_year, global_vsc_h = group_hazard(next_vsc[green_mask])
        sc_hazard, sc_hazard_year, global_sc_h = group_hazard(next_sc[green_mask])

        return (
            {'by_circuit_phase': vsc_hazard, 'by_year_phase': vsc_hazard_year},
            {'by_circuit_phase': sc_hazard, 'by_year_phase': sc_hazard_year},
            global_vsc_h,
            global_sc_h,
        )

    def _fit_transitions(self, flag_col: str, to_sc: bool):
        laps = self.laps
        sc_flags = laps['safety_car_this_lap'].astype(bool)

        stints = []
        for sess, grp in laps.groupby('session_key'):
            mask = grp[flag_col].astype(bool).to_numpy()
            idxs = grp.index.to_list()
            i = 0
            while i < len(mask):
                if mask[i]:
                    j = i
                    while j < len(mask) and mask[j]:
                        j += 1
                    stints.append((sess, idxs[i], idxs[j - 1], j - i))
                    i = j
                else:
                    i += 1

        if not stints:
            return {}

        max_len = min(self.max_len_bucket, max(s[3] for s in stints))
        trans = {}
        for l in range(1, max_len + 1):
            eligible = [s for s in stints if s[3] >= l]
            if not eligible:
                trans[l] = {'p_continue': float('nan'), 'p_end_to_sc': float('nan'), 'p_end_to_green': float('nan')}
                continue
            cont_count = sum(1 for s in eligible if s[3] >= l + 1)
            p_cont = (cont_count + self.trans_prior) / (len(eligible) + 2 * self.trans_prior)
            end_count = len(eligible) - cont_count
            if to_sc:
                end_sc = 0
                for sess, start_idx, end_idx, length in eligible:
                    if length == l:
                        pos = laps.index.get_loc(end_idx) + 1
                        if pos < len(laps.index):
                            next_idx = laps.index[pos]
                            if laps.at[end_idx, 'session_key'] == laps.at[next_idx, 'session_key'] and bool(sc_flags.loc[next_idx]):
                                end_sc += 1
                ratio_end_sc = (end_sc + self.trans_prior) / (end_count + 2 * self.trans_prior) if end_count > 0 else 0.0
                p_end = max(0.0, 1.0 - p_cont)
                p_end_to_sc = p_end * ratio_end_sc
                p_end_to_green = p_end * (1.0 - ratio_end_sc)
            else:
                p_end_to_sc = 0.0
                p_end_to_green = max(0.0, 1.0 - p_cont)
            trans[l] = {
                'p_continue': float(p_cont),
                'p_end_to_sc': float(p_end_to_sc),
                'p_end_to_green': float(p_end_to_green),
            }
        return trans

    def _hazard_lookup(self, hazard_dict, circuit_id, year, phase, global_h):
        h = hazard_dict['by_circuit_phase'].get((circuit_id, phase))
        if h is None:
            h = global_h
        if year is not None and hazard_dict.get('by_year_phase') is not None:
            h_year = hazard_dict['by_year_phase'].get((year, phase))
            if h_year is not None:
                h = 0.5 * float(h) + 0.5 * float(h_year)
        return float(h)

    def start_hazards(self, circuit_id: str, year: int | None, progress: float):
        phase = _phase(progress)
        vsc_h = self._hazard_lookup(self.vsc_hazard, circuit_id, year, phase, self.global_vsc_hazard)
        sc_h = self._hazard_lookup(self.sc_hazard, circuit_id, year, phase, self.global_sc_hazard)
        return vsc_h, sc_h

    def sample_start(self, circuit_id: str, year: int | None, progress: float, rng=None):
        if rng is None:
            rng = self.rng or np.random.default_rng()
        vsc_h, sc_h = self.start_hazards(circuit_id, year, progress)
        start_vsc = rng.random() < vsc_h
        start_sc = (not start_vsc) and (rng.random() < sc_h)
        return start_vsc, start_sc

    def _trans_for(self, state: str, stint_len: int):
        if state == 'vsc' and self.vsc_trans:
            bucket = min(stint_len, self._vsc_max_len)
            return self.vsc_trans.get(bucket, self.vsc_trans.get(self._vsc_max_len, {}))
        if state == 'sc' and self.sc_trans:
            bucket = min(stint_len, self._sc_max_len)
            return self.sc_trans.get(bucket, self.sc_trans.get(self._sc_max_len, {}))
        return {}

    def next_state(self, state: str, stint_len: int, circuit_id: str, year: int | None, progress: float, rng=None):
        """
        Compute next state given current state, stint length, and context.
        state: 'green', 'vsc', or 'sc'
        stint_len: current stint length if in VSC/SC, else 0
        returns (next_state, next_stint_len)
        """
        if rng is None:
            rng = self.rng or np.random.default_rng()

        if state == 'green':
            start_vsc, start_sc = self.sample_start(circuit_id, year, progress, rng)
            if start_vsc:
                return 'vsc', 1
            if start_sc:
                return 'sc', 1
            return 'green', 0

        if state in ('vsc', 'sc'):
            trans = self._trans_for(state, stint_len)
            p_cont = trans.get('p_continue', 0.0)
            p_end_sc = trans.get('p_end_to_sc', 0.0)
            p_end_green = trans.get('p_end_to_green', 0.0)
            total = p_cont + p_end_sc + p_end_green
            if total <= 0:
                return 'green', 0
            p_cont /= total
            p_end_sc /= total
            r = rng.random()
            if r < p_cont:
                return state, stint_len + 1
            r -= p_cont
            if state == 'vsc' and r < p_end_sc:
                return 'sc', 1
            return 'green', 0

        return 'green', 0


In [84]:
# Fit the model from the default on-disk dataset, including (year, phase) hazards.
sc_gen_model = SafetyCarGenerativeModel(include_year=True)
# Spot check: (VSC hazard, SC hazard) for circuit 'monza', year 2023, progress 0.5.
monza_mid_race_hazards = sc_gen_model.start_hazards('monza', 2023, 0.5)
print(monza_mid_race_hazards)


  next_vsc = laps.groupby('session_key')['virtual_sc_this_lap'].shift(-1).fillna(False)
  next_sc = laps.groupby('session_key')['safety_car_this_lap'].shift(-1).fillna(False)


(0.0032200087464748315, 0.007700104698823269)


In [85]:

import numpy as np
import pandas as pd

# Empirical stint statistics computed from the laps the model was fitted on.
laps = sc_gen_model.laps
sc_flags = laps['safety_car_this_lap'].astype(bool)
vsc_flags = laps['virtual_sc_this_lap'].astype(bool)


def _flag_runs(flags, labels, session):
    """Return (first_label, last_label, run_length, session) for each run of True flags."""
    runs = []
    run_start = None
    for position, active in enumerate(flags):
        if active:
            if run_start is None:
                run_start = position
        elif run_start is not None:
            runs.append((labels[run_start], labels[position - 1], position - run_start, session))
            run_start = None
    if run_start is not None:
        # The run extends to the last row of the session.
        runs.append((labels[run_start], labels[-1], len(flags) - run_start, session))
    return runs


def _mean_or_nan(values):
    """Mean of ``values`` as a float, or NaN when ``values`` is empty."""
    if not values:
        return float('nan')
    return float(np.mean(values))


sc_stints = []
vsc_stints = []
for sess, grp in laps.groupby('session_key'):
    idxs = grp.index.to_list()
    sc_stints.extend(_flag_runs(grp['safety_car_this_lap'].astype(bool).to_numpy(), idxs, sess))
    vsc_stints.extend(_flag_runs(grp['virtual_sc_this_lap'].astype(bool).to_numpy(), idxs, sess))

num_sc = len(sc_stints)
num_vsc = len(vsc_stints)
avg_sc = _mean_or_nan([stint[2] for stint in sc_stints])
avg_vsc = _mean_or_nan([stint[2] for stint in vsc_stints])
sc_stint_pct = num_sc / len(laps) * 100
vsc_stint_pct = num_vsc / len(laps) * 100

row_labels = laps.index
row_count = len(row_labels)

# VSC stints whose next row (same session) is an SC lap.
sc_set = set(sc_flags[sc_flags].index)
vsc_to_sc = 0
vsc_to_sc_lengths = []
for first_label, last_label, run_length, _ in vsc_stints:
    following_pos = row_labels.get_loc(last_label) + 1
    if following_pos >= row_count:
        continue
    following_label = row_labels[following_pos]
    if laps.at[first_label, 'session_key'] != laps.at[following_label, 'session_key']:
        continue
    if following_label in sc_set:
        vsc_to_sc += 1
        vsc_to_sc_lengths.append(run_length)

avg_vsc_len_to_sc = _mean_or_nan(vsc_to_sc_lengths)

vsc_to_sc_pct = (vsc_to_sc * 100 / num_vsc) if num_vsc else float('nan')

# SC stints whose previous row (same session) is a VSC lap.
vsc_set = set(vsc_flags[vsc_flags].index)
sc_after_vsc_lengths = []
for first_label, last_label, run_length, _ in sc_stints:
    preceding_pos = row_labels.get_loc(first_label) - 1
    if preceding_pos < 0:
        continue
    preceding_label = row_labels[preceding_pos]
    if laps.at[first_label, 'session_key'] != laps.at[preceding_label, 'session_key']:
        continue
    if preceding_label in vsc_set:
        sc_after_vsc_lengths.append(run_length)

avg_sc_after_vsc = _mean_or_nan(sc_after_vsc_lengths)

summary_lines = [
    f"Number of SC stints: {num_sc} - {sc_stint_pct:.3f}%",
    f"Number of VSC stints: {num_vsc} - {vsc_stint_pct:.3f}%",
    f"Average SC stint length (laps): {avg_sc:.2f}",
    f"Average VSC stint length (laps): {avg_vsc:.2f}",
    f"VSC stints transitioning to SC next lap: {vsc_to_sc}",
    f"VSC stints transitioning to SC next lap (% of VSC stints): {vsc_to_sc_pct:.3f}",
    f"Average VSC stint length when followed by SC: {avg_vsc_len_to_sc:.2f}",
    f"Average SC stint length when preceded by VSC: {avg_sc_after_vsc:.2f}",
]
for summary_line in summary_lines:
    print(summary_line)


Number of SC stints: 1297 - 0.653%
Number of VSC stints: 774 - 0.390%
Average SC stint length (laps): 4.33
Average VSC stint length (laps): 3.80
VSC stints transitioning to SC next lap: 34
VSC stints transitioning to SC next lap (% of VSC stints): 4.393
Average VSC stint length when followed by SC: 4.88
Average SC stint length when preceded by VSC: nan


In [87]:

import numpy as np
import pandas as pd

# Monte-Carlo check: simulate races with the fitted model and report the same
# stint statistics as the empirical cell, for side-by-side comparison.
rng = np.random.default_rng(42)

laps = sc_gen_model.laps
circuits = sorted(laps['circuit_id'].dropna().unique().tolist())
years = list(range(2018, 2026))

num_races = 20000
race_length = 100
sim_flags = []  # one list of per-lap states for every simulated race

for _ in range(num_races):
    # Draw order (year, then circuit) is kept so the seeded stream is unchanged.
    year = rng.choice(years)
    circuit_id = rng.choice(circuits)
    state, stint_len = 'green', 0
    race_flags = []
    for lap in range(1, race_length + 1):
        # Record the state of this lap, then sample the state of the next one.
        race_flags.append(state)
        progress = lap / race_length
        state, stint_len = sc_gen_model.next_state(state, stint_len, circuit_id, year, progress, rng)
    sim_flags.append(race_flags)

# Collapse per-lap states into stints and collect the transition metrics.
sc_stints = []
vsc_stints = []
vsc_to_sc = 0
vsc_len_to_sc = []
sc_after_vsc_lengths = []

for race_flags in sim_flags:
    n = len(race_flags)
    cursor = 0
    while cursor < n:
        current = race_flags[cursor]
        if current != 'sc' and current != 'vsc':
            cursor += 1
            continue
        # Advance to the first lap that no longer has the same state.
        stop = cursor
        while stop < n and race_flags[stop] == current:
            stop += 1
        length = stop - cursor
        if current == 'sc':
            sc_stints.append(length)
            if cursor > 0 and race_flags[cursor - 1] == 'vsc':
                sc_after_vsc_lengths.append(length)
        else:
            vsc_stints.append(length)
            if stop < n and race_flags[stop] == 'sc':
                vsc_to_sc += 1
                vsc_len_to_sc.append(length)
        cursor = stop

num_sc = len(sc_stints)
num_vsc = len(vsc_stints)

total_laps = num_races * race_length
sc_stint_pct = num_sc / total_laps * 100
vsc_stint_pct = num_vsc / total_laps * 100

nan = float('nan')
avg_sc = float(np.mean(sc_stints)) if sc_stints else nan
avg_vsc = float(np.mean(vsc_stints)) if vsc_stints else nan
vsc_to_sc_pct = (vsc_to_sc / num_vsc) * 100 if num_vsc else nan
avg_vsc_len_to_sc = float(np.mean(vsc_len_to_sc)) if vsc_len_to_sc else nan
avg_sc_after_vsc = float(np.mean(sc_after_vsc_lengths)) if sc_after_vsc_lengths else nan

report = (
    f"Number of SC stints: {num_sc} - {sc_stint_pct:.3f}%",
    f"Number of VSC stints: {num_vsc} - {vsc_stint_pct:.3f}%",
    f"Average SC stint length (laps): {avg_sc:.2f}",
    f"Average VSC stint length (laps): {avg_vsc:.2f}",
    f"VSC stints transitioning to SC next lap: {vsc_to_sc}",
    f"VSC stints transitioning to SC next lap (% of VSC stints): {vsc_to_sc_pct:.3f}",
    f"Average VSC stint length when followed by SC: {avg_vsc_len_to_sc:.2f}",
    f"Average SC stint length when preceded by VSC: {avg_sc_after_vsc:.2f}",
)
for report_line in report:
    print(report_line)


Number of SC stints: 11600 - 0.580%
Number of VSC stints: 6892 - 0.345%
Average SC stint length (laps): 4.58
Average VSC stint length (laps): 4.59
VSC stints transitioning to SC next lap: 352
VSC stints transitioning to SC next lap (% of VSC stints): 5.107
Average VSC stint length when followed by SC: 2.99
Average SC stint length when preceded by VSC: 4.84
