## Introduction

This notebook develops a machine learning model that predicts Formula 1 race finishing positions, for use by F1 enthusiasts and in strategic analysis.

Building on the data exploration and feature engineering in `f1_data_exploration.ipynb`, we consolidate race and qualifying data into a single pandas DataFrame. The dataset is then cleaned and prepared with features covering the season, round, event, driver, constructor, grid position, and qualifying times.

The core of this notebook is choosing the most suitable model and its hyperparameters. We compare `RandomForestRegressor` and `GradientBoostingRegressor`, tuning each with a cross-validated grid search from `sklearn.model_selection`. Tuning aims for accurate predictions while keeping training efficient and the model able to generalize. Performance is evaluated with Mean Absolute Error (MAE), Root Mean Squared Error (RMSE), and R-squared, with particular attention to how well the model predicts the actual finishing order.

Finally, the selected model, with its tuned parameters, is serialized with `joblib` and saved in the `models/` directory as `GradientBoosting_F1_Race_Predictor_model.joblib`, ready for integration into the backend prediction service.

## Imports and setup

In [5]:
import pandas as pd
import numpy as np
import joblib
import fastf1
import os

## Loading Race Data

Race and qualifying results for the 2015 to 2025 seasons are downloaded with FastF1 and cached in `notebooks\data_cache`.

In [6]:
import time  # Used to pause between requests to the FastF1 API

# --- 1. Configure FastF1 Caching ---
cache_dir = './data_cache'
if not os.path.exists(cache_dir):
    os.makedirs(cache_dir)
    print(f"Created cache directory: {cache_dir}")
else:
    print(f"Cache directory already exists: {cache_dir}")

fastf1.Cache.enable_cache(cache_dir)
print(f"FastF1 caching enabled to '{cache_dir}'")

# Suppress FastF1's default INFO messages for cleaner output during data loading
# You can comment this line out temporarily if you want to see FastF1's internal progress.
fastf1.set_log_level('ERROR')
print("\nFastF1 log level set to ERROR to suppress verbose messages during data loading.")


# --- 2. Define the Years You Want to Read ---
start_year = 2015
end_year = 2025 # Adjusted to prevent issues with incomplete/future season data

# Lists to hold DataFrames from each race
all_race_results = []
all_qualifying_results = []

# Dictionaries to track loading status and errors
year_loading_status = {}

print(f"\n--- Starting data collection for seasons {start_year} to {end_year} ---")

for year in range(start_year, end_year + 1):
    year_loading_status[year] = {'status': 'Pending', 'races_loaded': 0, 'qual_loaded': 0, 'errors': []}
    
    try:
        # Give a small pause before fetching schedule for a new year
        time.sleep(1) 
        schedule = fastf1.get_event_schedule(year)
        
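        # Keep every race weekend and exclude only pre-season testing.
        # FastF1 also labels sprint weekends 'sprint_shootout' or 'sprint_qualifying',
        # which a filter on ['conventional', 'sprint'] alone would skip.
        race_events = schedule.loc[schedule['EventFormat'] != 'testing']

        if race_events.empty:
            year_loading_status[year]['status'] = 'No Race Events'
            year_loading_status[year]['errors'].append(f"No race events found for {year}. Skipping season.")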
            continue

        for round_num in race_events['RoundNumber']:
            # Pause before attempting to load each round's data
            time.sleep(1) # Pause before race session
            
            race_loaded_for_round = False
            quali_loaded_for_round = False

            try:
                # --- Load Race Session Results ---
                race_session = fastf1.get_session(year, round_num, 'R')
                race_session.load(telemetry=False, laps=False, weather=False)

                if race_session.results.empty:
                    year_loading_status[year]['errors'].append(f"WARNING: No results for Race {year} R{round_num}. Data unavailable.")
                else:
                    results_df = race_session.results.copy()
                    results_df['Season'] = year
                    results_df['Round'] = round_num
                    results_df['EventName'] = race_session.event['EventName']
                    results_df['SessionType'] = 'Race'
                    all_race_results.append(results_df)
                    year_loading_status[year]['races_loaded'] += 1
                    race_loaded_for_round = True

                # Small pause between race and qualifying session loads for the same round
                time.sleep(0.5)

                # --- Optionally, load Qualifying Session Results ---
                try:
                    quali_session = fastf1.get_session(year, round_num, 'Q')
                    quali_session.load(telemetry=False, laps=False, weather=False)

                    if quali_session.results.empty:
                        year_loading_status[year]['errors'].append(f"WARNING: No results for Qualifying {year} R{round_num}. Data unavailable.")
                    else:
                        quali_results_df = quali_session.results.copy()
                        quali_results_df['Season'] = year
                        quali_results_df['Round'] = round_num
                        quali_results_df['EventName'] = quali_session.event['EventName']
                        quali_results_df['SessionType'] = 'Qualifying'
                        all_qualifying_results.append(quali_results_df)
                        year_loading_status[year]['qual_loaded'] += 1
                        quali_loaded_for_round = True

                except Exception as e:
                    year_loading_status[year]['errors'].append(f"Error loading Qualifying for {year} R{round_num}: {e}.")

            except Exception as e:
                year_loading_status[year]['errors'].append(f"CRITICAL ERROR loading session {year} R{round_num}: {e}.")
        
        # Determine overall status for the year
        if not year_loading_status[year]['errors']:
            year_loading_status[year]['status'] = 'SUCCESS'
        elif year_loading_status[year]['races_loaded'] > 0 or year_loading_status[year]['qual_loaded'] > 0:
            year_loading_status[year]['status'] = 'PARTIAL SUCCESS (with errors)'
        else:
            year_loading_status[year]['status'] = 'FAILED'

    except Exception as e:
        year_loading_status[year]['status'] = 'SCHEDULE ERROR'
        year_loading_status[year]['errors'].append(f"Error retrieving schedule for {year}: {e}.")

# --- Print Summary of Data Loading ---
print("\n--- Data Loading Summary by Year ---")
for year, status_info in year_loading_status.items():
    print(f"Season {year}: {status_info['status']}")
    if status_info['races_loaded'] > 0:
        print(f"  Races Loaded: {status_info['races_loaded']}")
    if status_info['qual_loaded'] > 0:
        print(f"  Qualifying Sessions Loaded: {status_info['qual_loaded']}")
    if status_info['errors']:
        print(f"  Issues Encountered ({len(status_info['errors'])}):")
        for error_msg in status_info['errors']:
            print(f"    - {error_msg}")
    print("-" * 30) # Separator

# --- 3. Concatenate All DataFrames ---
print("\n--- Concatenating collected data ---")
if all_race_results:
    full_race_results_df = pd.concat(all_race_results, ignore_index=True)
    print(f"Successfully collected {len(full_race_results_df)} race results entries.")
else:
    print("No race results data collected into a DataFrame.")

if all_qualifying_results:
    full_quali_results_df = pd.concat(all_qualifying_results, ignore_index=True)
    print(f"Successfully collected {len(full_quali_results_df)} qualifying results entries.")
else:
    print("No qualifying results data collected into a DataFrame.")

print("\n--- Data collection process finished ---")

Cache directory already exists: ./data_cache
FastF1 caching enabled to './data_cache'

FastF1 log level set to ERROR to suppress verbose messages during data loading.

--- Starting data collection for seasons 2015 to 2025 ---


KeyboardInterrupt: 

## Data Preprocessing
We now handle missing values in the DataFrame and put the data into a format suitable for training.
The code cell below lists the columns in `full_race_results_df`. We will drop the ones that are not needed to train the model.

In [7]:
full_race_results_df.columns

Index(['DriverNumber', 'BroadcastName', 'Abbreviation', 'DriverId', 'TeamName',
       'TeamColor', 'TeamId', 'FirstName', 'LastName', 'FullName',
       'HeadshotUrl', 'CountryCode', 'Position', 'ClassifiedPosition',
       'GridPosition', 'Q1', 'Q2', 'Q3', 'Time', 'Status', 'Points', 'Laps',
       'Season', 'Round', 'EventName', 'SessionType'],
      dtype='object')

The columns `'DriverNumber', 'BroadcastName', 'DriverId', 'TeamColor', 'TeamId', 'FirstName', 'LastName', 'FullName', 'HeadshotUrl', 'CountryCode', 'ClassifiedPosition', 'Q1', 'Q2', 'Q3', 'Time', 'Status', 'Points', 'Laps', 'SessionType'` are dropped from the original DataFrame, and the resulting DataFrame is stored in `race_data_for_merge`.

The model predicts each driver's raw `Position`, which is used as the target variable. `ClassifiedPosition` is removed because it also reflects race retirements, which are extremely situational.

In [8]:
race_data_for_merge = full_race_results_df.drop(['DriverNumber', 'BroadcastName', 'DriverId', 'TeamColor', 'TeamId', 'FirstName', 'LastName', 'FullName', 'HeadshotUrl', 'CountryCode', 'ClassifiedPosition', 'Q1', 'Q2', 'Q3', 'Time', 'Status', 'Points', 'Laps', 'SessionType'], axis=1)

`'Q1'`, `'Q2'`, and `'Q3'` hold `timedelta` values. They are converted to seconds and stored in the new columns `'Q1_s'`, `'Q2_s'`, and `'Q3_s'` so that the model can use them as numeric features. Missing times, such as those of a driver who did not take part in Q2 or Q3, are filled with the placeholder value `9999.0`.

In [9]:
# Create new columns holding the 'Q1', 'Q2', and 'Q3' times in seconds.
full_quali_results_df['Q1_s'] = full_quali_results_df['Q1'].dt.total_seconds().fillna(9999.0)
full_quali_results_df['Q2_s'] = full_quali_results_df['Q2'].dt.total_seconds().fillna(9999.0)
full_quali_results_df['Q3_s'] = full_quali_results_df['Q3'].dt.total_seconds().fillna(9999.0)
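
The conversion can be tried on a few made-up rows. This is a minimal sketch, not the notebook's data: the driver codes and times are invented, and `MISSING_TIME_S` is a hypothetical name for the `9999.0` placeholder used in the cell above.

```python
import pandas as pd

# Toy qualifying times (invented). NaN becomes NaT, i.e. "no time set".
quali = pd.DataFrame({
    "Abbreviation": ["AAA", "BBB", "CCC"],
    "Q1": pd.to_timedelta([88.586, 90.430, 91.376], unit="s"),
    "Q3": pd.to_timedelta([86.327, float("nan"), float("nan")], unit="s"),
})

MISSING_TIME_S = 9999.0  # hypothetical name for the notebook's placeholder value

for col in ["Q1", "Q3"]:
    # total_seconds() yields NaN for NaT, which fillna then replaces
    quali[f"{col}_s"] = quali[col].dt.total_seconds().fillna(MISSING_TIME_S)

print(quali[["Abbreviation", "Q1_s", "Q3_s"]])
```

A large placeholder keeps "no time set" clearly separated from real lap times, which suits tree-based models that split on thresholds.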

Because some of the qualifying data is useful for training, we merge subsets of the two DataFrames `full_race_results_df` and `full_quali_results_df`, keeping only the relevant columns.

In [11]:
quali_data_for_merge = full_quali_results_df[['Abbreviation', 'Season', 'Round', 'EventName', 'Q1_s', 'Q2_s', 'Q3_s']].copy()

merged_df = pd.merge(
    race_data_for_merge,
    quali_data_for_merge,
    on=['Abbreviation', 'Season', 'Round', 'EventName'],
    how='left'
)

# Organising columns for readability and easier management
merged_df = merged_df[[
    'Season', 
    'Round',
    'EventName',
    'Abbreviation', # Driver identifier, e.g. 'VER', 'PER', 'ALO'
    'TeamName',
    'GridPosition', # Starting position on the race grid, normally the qualifying result
    'Q1_s', # Q1 time in seconds
    'Q2_s', # Q2 time in seconds
    'Q3_s', # Q3 time in seconds
    'Position' # The target variable for race prediction
]]

merged_df.head(20) # DataFrame snippet of race 1

Unnamed: 0,Season,Round,EventName,Abbreviation,TeamName,GridPosition,Q1_s,Q2_s,Q3_s,Position
0,2015,1,Australian Grand Prix,HAM,Mercedes,1.0,88.586,86.894,86.327,1.0
1,2015,1,Australian Grand Prix,ROS,Mercedes,2.0,88.906,87.097,86.921,2.0
2,2015,1,Australian Grand Prix,VET,Ferrari,4.0,89.307,87.742,87.757,3.0
3,2015,1,Australian Grand Prix,MAS,Williams,3.0,89.246,87.895,87.718,4.0
4,2015,1,Australian Grand Prix,NAS,Sauber,10.0,90.43,88.8,9999.0,5.0
5,2015,1,Australian Grand Prix,RIC,Red Bull,6.0,89.788,88.679,88.329,6.0
6,2015,1,Australian Grand Prix,HUL,Force India,13.0,89.651,89.208,9999.0,7.0
7,2015,1,Australian Grand Prix,ERI,Sauber,15.0,91.376,9999.0,9999.0,8.0
8,2015,1,Australian Grand Prix,SAI,Toro Rosso,7.0,89.597,88.601,88.51,9.0
9,2015,1,Australian Grand Prix,PER,Force India,14.0,89.99,89.209,9999.0,10.0
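
Because the merge uses `how='left'`, every race row is kept, and a driver with no matching qualifying row ends up with `NaN` in the qualifying columns. The sketch below shows one way to spot such rows using the `indicator` and `validate` options of `DataFrame.merge`. The data and driver codes are invented, and this check is an optional addition rather than part of the original pipeline.

```python
import pandas as pd

# Toy race and qualifying tables (invented); "CCC" has no qualifying row.
race = pd.DataFrame({
    "Abbreviation": ["AAA", "BBB", "CCC"],
    "Season": [2015, 2015, 2015],
    "Round": [1, 1, 1],
    "Position": [1.0, 2.0, 3.0],
})
quali = pd.DataFrame({
    "Abbreviation": ["AAA", "BBB"],
    "Season": [2015, 2015],
    "Round": [1, 1],
    "Q1_s": [88.586, 88.906],
})

keys = ["Abbreviation", "Season", "Round"]

# indicator=True adds a '_merge' column; validate raises if the keys are duplicated.
checked = race.merge(quali, on=keys, how="left", indicator=True, validate="one_to_one")
unmatched = checked[checked["_merge"] == "left_only"]

print(unmatched[keys])
```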


At the 2023 Singapore Grand Prix, Lance Stroll was withdrawn from Sunday's race following a heavy crash during Saturday's qualifying session.

As a result, Stroll's `'GridPosition'` and `'Position'` are empty for that event.

In other rows (197, 241, 278, 633), drivers who started from the pit lane have `'GridPosition'` and `'Position'` recorded as 0.0, so we treat the empty values as an anomaly on the API side.

In [None]:
# Expected result: 1, the single anomaly described above
print("Number of rows with missing values for 'GridPosition' before cleaning:", merged_df['GridPosition'].isna().sum())

# Simple fix: replace missing values with 0.0, matching the pit-lane rows
merged_df.loc[merged_df['GridPosition'].isna(), 'GridPosition'] = 0.0
merged_df.loc[merged_df['Position'].isna(), 'Position'] = 0.0
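# Race rows without a qualifying match get the same 9999.0 placeholder used for
# missing Q times earlier; 0.0 would read as an impossibly fast lap.
merged_df.loc[merged_df['Q1_s'].isna(), 'Q1_s'] = 9999.0
merged_df.loc[merged_df['Q2_s'].isna(), 'Q2_s'] = 9999.0
merged_df.loc[merged_df['Q3_s'].isna(), 'Q3_s'] = 9999.0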

# Result after cleaning
print("Number of rows with missing values for 'GridPosition' after cleaning:", merged_df['GridPosition'].isna().sum())

#merged_df.to_csv(os.path.join(os.getcwd(), 'merged_df.csv'), index=False) # This optional line creates a .csv file for better visualisation

### One-Hot Encoding

Now that `merged_df` is prepared, we have to convert the categorical (non-numerical) data into a numerical format that machine learning algorithms can process.

Take the `'TeamName'` column. If we simply assigned each team a number, e.g. Red Bull = 1, Ferrari = 2, Mercedes = 3, a model would interpret these numbers as having an inherent order or magnitude. It might treat Mercedes (3) as "better", "more important", or "further away" from Red Bull (1) than Ferrari (2) is. This is false for nominal categories like team names, which have no numerical relationship. The false ordinality can mislead the model and reduce its performance.

With one-hot encoding, we instead create a binary (0 or 1) column for each team. A row for a Red Bull Racing driver such as Max Verstappen then holds a 1 in that team's column (for example `TeamName_Red Bull Racing`) and a 0 in the column of every other team.

In [None]:
from sklearn.preprocessing import OneHotEncoder

# Define categorical columns for one-hot encoding
categorical_cols_to_encode = ['Abbreviation', 'TeamName', 'EventName']

print(f"\nOriginal DataFrame shape: {merged_df.shape}")

# --- Initialize and Fit One-Hot Encoder ---
ohe = OneHotEncoder(drop='first', sparse_output=False, handle_unknown='ignore')

# Fit the encoder on the categorical columns of your training data
ohe.fit(merged_df[categorical_cols_to_encode])

# --- Transform the DataFrame ---
# Apply the fitted encoder to transform the categorical columns
encoded_features = ohe.transform(merged_df[categorical_cols_to_encode])

# Create a DataFrame from the encoded features with proper column names
encoded_df = pd.DataFrame(
    encoded_features.astype(int), 
    columns=ohe.get_feature_names_out(categorical_cols_to_encode),
    index=merged_df.index
)

# Drop the original categorical columns from merged_df
numerical_and_other_features_df = merged_df.drop(columns=categorical_cols_to_encode)

# Concatenate the original numerical/other features with the new encoded features
merged_df_encoded = pd.concat([numerical_and_other_features_df, encoded_df], axis=1)

# Note the increase in number of columns after encoding
print(f"Encoded DataFrame shape: {merged_df_encoded.shape}")


# --- IMPORTANT: Save the fitted OneHotEncoder ---
# This is crucial for preprocessing.py to work correctly for new data.
models_dir = os.path.join('..', 'models')
os.makedirs(models_dir, exist_ok=True) # Ensure the 'models' directory exists

joblib.dump(ohe, os.path.join(models_dir, 'one_hot_encoder.joblib'))
print(f"Fitted OneHotEncoder saved successfully to: {os.path.join(models_dir, 'one_hot_encoder.joblib')}")

# Also save the column names of the encoded DataFrame so that new data can be aligned to them.
# Note: this list still includes the target column 'Position'.
joblib.dump(merged_df_encoded.columns.tolist(), os.path.join(models_dir, 'training_feature_names.joblib'))

#merged_df_encoded.to_csv(os.path.join(os.getcwd(), 'merged_df_encoded.csv'), index=False) # This optional line creates a .csv file for better visualisation

In [None]:
merged_df_encoded.columns
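
A saved list of column names is typically used to give new data the same columns, in the same order, as the training data. The sketch below shows one way to do that with `DataFrame.reindex`. The column names and values are invented, and the actual logic in `preprocessing.py` is not shown in this notebook, so treat this as an assumption about how the list might be used.

```python
import pandas as pd

# Hypothetical subset of the saved training columns
training_feature_names = ["GridPosition", "Q1_s", "TeamName_Ferrari", "TeamName_Mercedes"]

# New data with columns in a different order and one encoded column missing
new_rows = pd.DataFrame({"Q1_s": [88.9], "GridPosition": [2.0], "TeamName_Mercedes": [1]})

# Reorder to the training layout and fill absent columns with 0
aligned = new_rows.reindex(columns=training_feature_names, fill_value=0)

print(aligned)
```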

## Model Selection

Splitting data into features (X) and the target variable (y).

In [None]:
# Define the target variable
y = merged_df_encoded['Position']

# Define the features (X) by dropping the target and any other non-feature columns
columns_to_exclude_from_X = ['Position'] # Only exclude the target variable

X = merged_df_encoded.drop(columns=columns_to_exclude_from_X, axis=1)

print(f"\nX shape: {X.shape}")
print(f"y shape: {y.shape}")

Dividing the data into training and testing sets.

In [None]:
from sklearn.model_selection import train_test_split

# Split the data with 80% for training and 20% for testing
# random_state ensures reproducibility of your split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

print(f"\nTrain set shape (X_train, y_train): {X_train.shape}, {y_train.shape}")
print(f"Test set shape (X_test, y_test): {X_test.shape}, {y_test.shape}")
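
`train_test_split` shuffles individual rows, so drivers from the same race can land in both the training and the test set. An alternative is to hold out whole races. The sketch below shows that idea on an invented toy frame. It is not the split used in this notebook, and the names `toy`, `test_races`, and `is_test` are illustrative.

```python
import numpy as np
import pandas as pd

# Toy frame: 10 races (5 rounds in each of 2 seasons) with 4 drivers each
toy = pd.DataFrame({
    "Season": np.repeat([2015] * 5 + [2016] * 5, 4),
    "Round": np.repeat(list(range(1, 6)) * 2, 4),
    "GridPosition": np.tile([1.0, 2.0, 3.0, 4.0], 10),
})

# Sample 20% of the races, not 20% of the rows
races = toy[["Season", "Round"]].drop_duplicates()
test_races = races.sample(frac=0.2, random_state=42)

# Mark every row that belongs to a held-out race
row_keys = pd.MultiIndex.from_frame(toy[["Season", "Round"]])
is_test = row_keys.isin(pd.MultiIndex.from_frame(test_races))

train, test = toy[~is_test], toy[is_test]
print(len(train), len(test))
```

Holding out complete races means the test score measures how the model handles a race it has not seen at all.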

Choose __Candidate Models__ and define __Hyperparameter Grids__

In [None]:
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
from sklearn.metrics import mean_absolute_error, make_scorer

# Define the scoring metric for optimization.
# GridSearchCV maximizes its score, but for MAE lower is better, so
# greater_is_better=False makes the scorer return the negated MAE
# (equivalent to the built-in 'neg_mean_absolute_error' scorer).
mae_scorer = make_scorer(mean_absolute_error, greater_is_better=False)

# --- Model 1: RandomForestRegressor ---
rf_model = RandomForestRegressor(random_state=42)
rf_param_grid = {
    'n_estimators': [100, 200, 300],
    'max_features': [0.6, 0.8, 1.0],
    'min_samples_leaf': [1, 2, 4],
}

# --- Model 2: GradientBoostingRegressor ---
gb_model = GradientBoostingRegressor(random_state=42)
gb_param_grid = {
    'n_estimators': [100, 200, 300, 700],
    'learning_rate': [0.01, 0.05, 0.1],
    'max_depth': [3, 4, 5],
}

# You could add more models here, e.g.,
# from xgboost import XGBRegressor
# xgb_model = XGBRegressor(random_state=42)
# xgb_param_grid = { ... }
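
The sign convention of the MAE scorer can be checked on a toy problem. The sketch below uses a `DummyRegressor` and invented targets, and compares the custom scorer with scikit-learn's built-in `'neg_mean_absolute_error'` scorer.

```python
import numpy as np
from sklearn.dummy import DummyRegressor
from sklearn.metrics import get_scorer, make_scorer, mean_absolute_error

# Toy data: the dummy model always predicts the mean of y, which is 3.0
X = np.zeros((4, 1))
y = np.array([1.0, 2.0, 3.0, 6.0])
model = DummyRegressor(strategy="mean").fit(X, y)

custom = make_scorer(mean_absolute_error, greater_is_better=False)
builtin = get_scorer("neg_mean_absolute_error")

# Absolute errors are 2, 1, 0, 3, so the MAE is 1.5 and both scorers return -1.5
print(custom(model, X, y), builtin(model, X, y))
```

This is why the search results are negated again before being reported as an MAE.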

Perform __Hyperparameter Tuning__ with Cross-Validation (GridSearchCV)

In [None]:
# --- GridSearchCV for RandomForestRegressor ---
print("\n--- Starting GridSearchCV for RandomForestRegressor ---")
rf_grid_search = GridSearchCV(
    estimator=rf_model,
    param_grid=rf_param_grid,
    scoring=mae_scorer,
    cv=5,            # 5-fold cross-validation
    n_jobs=-1,       # Use all available CPU cores
    verbose=1        # Show progress
)
rf_grid_search.fit(X_train, y_train)

print("\nBest parameters for RandomForestRegressor:", rf_grid_search.best_params_)
print("Best cross-validated MAE for RandomForestRegressor:", -rf_grid_search.best_score_)


# --- GridSearchCV for GradientBoostingRegressor ---
print("\n--- Starting GridSearchCV for GradientBoostingRegressor ---")
gb_grid_search = GridSearchCV(
    estimator=gb_model,
    param_grid=gb_param_grid,
    scoring=mae_scorer,
    cv=5,
    n_jobs=-1,
    verbose=1
)
gb_grid_search.fit(X_train, y_train)

print("\nBest parameters for GradientBoostingRegressor:", gb_grid_search.best_params_)
print("Best cross-validated MAE for GradientBoostingRegressor:", -gb_grid_search.best_score_)
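
An exhaustive grid search fits one model per parameter combination per fold, so its cost grows quickly. The arithmetic for the two grids defined earlier is sketched below. `count_fits` is a hypothetical helper written for this illustration.

```python
from math import prod

rf_param_grid = {
    'n_estimators': [100, 200, 300],
    'max_features': [0.6, 0.8, 1.0],
    'min_samples_leaf': [1, 2, 4],
}
gb_param_grid = {
    'n_estimators': [100, 200, 300, 700],
    'learning_rate': [0.01, 0.05, 0.1],
    'max_depth': [3, 4, 5],
}
CV_FOLDS = 5


def count_fits(param_grid, folds):
    """Return (number of parameter combinations, number of cross-validation fits)."""
    n_candidates = prod(len(values) for values in param_grid.values())
    return n_candidates, n_candidates * folds


print(count_fits(rf_param_grid, CV_FOLDS))  # (27, 135)
print(count_fits(gb_param_grid, CV_FOLDS))  # (36, 180)
```

With the default `refit=True`, each search also performs one extra fit of the best combination on the full training set.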

Evaluate the Best Models on the Test Set


In [None]:
from sklearn.metrics import mean_absolute_error, r2_score, mean_squared_error

print("--- Evaluating the Best Models on the Test Set ---")

# Retrieve the best models from GridSearchCV
best_rf_model = rf_grid_search.best_estimator_
best_gb_model = gb_grid_search.best_estimator_

# Make predictions on the test set
rf_predictions = best_rf_model.predict(X_test)
gb_predictions = best_gb_model.predict(X_test)

print("\n--- RandomForestRegressor Evaluation ---")
rf_mae = mean_absolute_error(y_test, rf_predictions)
rf_r2 = r2_score(y_test, rf_predictions)
rf_rmse = np.sqrt(mean_squared_error(y_test, rf_predictions))

print(f"RandomForestRegressor MAE on Test Set: {rf_mae:.4f}")
print(f"RandomForestRegressor R2 on Test Set: {rf_r2:.4f}")
print(f"RandomForestRegressor RMSE on Test Set: {rf_rmse:.4f}")

print("\n--- GradientBoostingRegressor Evaluation ---")
gb_mae = mean_absolute_error(y_test, gb_predictions)
gb_r2 = r2_score(y_test, gb_predictions)
gb_rmse = np.sqrt(mean_squared_error(y_test, gb_predictions))

print(f"GradientBoostingRegressor MAE on Test Set: {gb_mae:.4f}")
print(f"GradientBoostingRegressor R2 on Test Set: {gb_r2:.4f}")
print(f"GradientBoostingRegressor RMSE on Test Set: {gb_rmse:.4f}")

# You can also compare them:
print("\n--- Model Comparison ---")
if rf_mae < gb_mae:
    print("RandomForestRegressor performed better in terms of MAE.")
elif gb_mae < rf_mae:
    print("GradientBoostingRegressor performed better in terms of MAE.")
else:
    print("Both models have the same MAE.")

# Optional: Store evaluation metrics for later analysis if needed
evaluation_results = {
    'RandomForestRegressor': {'MAE': rf_mae, 'R2': rf_r2, 'RMSE': rf_rmse},
    'GradientBoostingRegressor': {'MAE': gb_mae, 'R2': gb_r2, 'RMSE': gb_rmse}
}
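
Both regressors output continuous values such as 2.4, while a finishing order consists of whole-number ranks. One way to obtain an order is to rank the predictions within each race. The sketch below assumes that predictions are available for every driver in a race. The data is invented and the column names `predicted_position` and `predicted_rank` are illustrative.

```python
import pandas as pd

# Toy predictions for two races with three drivers each (invented values)
preds = pd.DataFrame({
    "Season": [2015] * 6,
    "Round": [1, 1, 1, 2, 2, 2],
    "Abbreviation": ["AAA", "BBB", "CCC", "AAA", "BBB", "CCC"],
    "predicted_position": [2.4, 1.9, 7.3, 5.1, 5.0, 3.2],
})

# Rank within each race: the lowest predicted value becomes rank 1
preds["predicted_rank"] = (
    preds.groupby(["Season", "Round"])["predicted_position"]
    .rank(method="first")
    .astype(int)
)

print(preds)
```

`method="first"` breaks ties by row order, so every driver in a race receives a distinct rank.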

Select the Final Model and Model Persistence

In [None]:
from sklearn.ensemble import GradientBoostingRegressor

print("--- Model Selection ---")

# Based on the evaluation results, GradientBoostingRegressor is the chosen model.
# gb_grid_search comes from the hyperparameter tuning cell above.
best_model_to_save = gb_grid_search.best_estimator_ # Best estimator found by GridSearchCV
model_name = "GradientBoosting"

print(f"\nSelected final model: {model_name}")
print("Best parameters for selected model:")
print(best_model_to_save.get_params())


# Model Persistence (Saving the Model)
# Define the relative path to the 'models' directory
# This assumes the notebook is run from within the 'notebooks' directory
# and 'models' is a sibling directory to 'notebooks'
model_dir = os.path.join('..', 'models')

# Create the directory if it doesn't exist
os.makedirs(model_dir, exist_ok=True)

# Construct the full path for the model file
model_filename = os.path.join(model_dir, f'{model_name}_F1_Race_Predictor_model.joblib')

joblib.dump(best_model_to_save, model_filename, compress=3)

print(f"\nModel saved successfully to: {model_filename}")

# Loading the Model to Verify
print("\n--- Verifying Model Loading (Optional) ---")
loaded_model = joblib.load(model_filename)
print(f"Model loaded successfully: {loaded_model}")