# End-to-End F1 Race Position Prediction Pipeline

## Introduction

This notebook presents a comprehensive workflow for predicting Formula 1 race outcomes using Deep Learning. By analyzing driver performance data from Practice and Qualifying sessions, we aim to forecast the final finishing position of each driver before the race begins.

The project walks through a complete machine learning workflow, from raw data ingestion to evaluation of the trained model on held-out data.

### Project Architecture

The pipeline is structured into five distinct stages:

1.  **Dataset Creation:** We start by aggregating raw session files (lap times, weather conditions, tire stints) from the OpenF1 API into a unified master dataset.
2.  **Feature Engineering:** Raw data is transformed into 22 meaningful performance metrics, such as practice pace, tire degradation proxies, and qualifying improvements.
3.  **Exploratory Data Analysis (EDA):** We visualize correlations and distributions to understand the factors that drive race results.
4.  **Model Development:** We design and train a Deep Feedforward Neural Network (DNN) using PyTorch, optimized for regression tasks.
5.  **Evaluation & Testing:** The trained model is scored on a held-out test set to quantify its predictive accuracy in terms of position error.

## 1. Environment Setup

We begin by initializing our development environment. We rely on a robust stack of data science libraries:

*   **PyTorch:** The core deep learning framework used for building and training our neural network.
*   **Pandas & NumPy:** Essential for high-performance data manipulation and numerical operations.
*   **Seaborn & Matplotlib:** Used to create insightful visualizations that help us interpret our data and model results.
*   **Scikit-Learn:** Provides utility functions for data splitting and feature scaling.

In [None]:
import pandas as pd
import numpy as np
import os
import ast
import time
from glob import glob
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Configure plotting aesthetics for clear, professional visuals
sns.set_style('whitegrid')
plt.rcParams['figure.figsize'] = (12, 6)
plt.rcParams['font.size'] = 12

print("Libraries imported successfully. Environment is ready.")

## 2. Master Dataset Construction

The foundation of any machine learning model is high-quality data. In this step, we consolidate fragmented session files into a single, coherent dataset.

The `build_master_dataset` function is designed to iterate through our repository of raw session files. For each race weekend, it links Practice, Qualifying, and Race sessions, ensuring that every driver's weekend journey is captured in a single record.

> **Note:** This block assumes the raw data is present in the `all_session_data/` directory; the function below also checks for a `sessions/` directory before doing any work. If `master_dataset.csv` already exists, this step can be skipped.

In [None]:
# Configuration Paths
DATA_DIR = "all_session_data"
SESSIONS_DIR = "sessions"
OUTPUT_FILE = "master_dataset.csv"

def build_master_dataset():
    if not os.path.exists(SESSIONS_DIR):
        print(f"Directory {SESSIONS_DIR} not found. Skipping dataset build.")
        return
        
    print("Initiating Master Dataset Construction...")
    # This section would contain the logic to parse CSVs, match session keys,
    # and aggregate lap times. For this notebook, we assume the builder script
    # has already been run or the logic is imported.
    print("Dataset builder logic placeholder.")
    print("Please run 'python master_dataset_builder.py' to rebuild from raw files.")

# build_master_dataset()

## 3. Feature Engineering

Raw lap times alone are insufficient for predictive modeling. We need to extract signals that indicate a driver's true potential.

In this phase, we transform the raw data into 22 distinct features. Key engineered features include:

*   **Practice Pace (`practice_best_lap`):** The single fastest lap recorded during practice, serving as a baseline for raw speed.
*   **Qualifying Improvement (`practice_vs_quali_improvement`):** Computed as the best practice lap minus the best qualifying lap, so a positive value means the driver went faster in qualifying. A large improvement may indicate pace that was not shown in practice.
*   **Tire Usage (`practice_total_tire_laps`):** The total number of practice laps on soft, medium, and hard compounds. It serves as a rough proxy for how much tire data the team gathered before the race.
*   **Grid Position (`grid_position`):** Historically the most significant predictor, but our model aims to look beyond just where they start.
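
The two derived features above are simple column arithmetic. The sketch below reproduces them on a toy table to make the sign convention concrete; the driver names and lap times are made-up illustrative values, not real session data.

```python
import pandas as pd

# Toy values (seconds and laps); illustrative numbers only, not real session data.
toy = pd.DataFrame({
    "driver_name": ["Driver A", "Driver B"],
    "practice_best_lap": [91.2, 90.8],
    "quali_best_lap": [89.9, 90.9],
    "practice_soft_laps": [12, 8],
    "practice_medium_laps": [20, 15],
    "practice_hard_laps": [0, 10],
})

# Positive improvement = the driver was faster in qualifying than in practice.
toy["practice_vs_quali_improvement"] = toy["practice_best_lap"] - toy["quali_best_lap"]

# Total practice laps across all three dry compounds.
toy["practice_total_tire_laps"] = (
    toy["practice_soft_laps"] + toy["practice_medium_laps"] + toy["practice_hard_laps"]
)

print(toy[["driver_name", "practice_vs_quali_improvement", "practice_total_tire_laps"]])
```

Here Driver A gains about 1.3 seconds in qualifying (positive value), while Driver B is slightly slower than in practice (negative value).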

In [None]:
def run_feature_engineering():
    if not os.path.exists('master_dataset.csv'):
        print("Error: master_dataset.csv not found. Please ensure it exists.")
        return

    print("Starting Feature Engineering process...")
    df = pd.read_csv('master_dataset.csv')
    
    # Separate data by session type for targeted aggregation
    practice = df[df['session_type'] == 'Practice'].copy()
    qualifying = df[df['session_type'] == 'Qualifying'].copy()
    race = df[df['session_type'] == 'Race'].copy()

    # 1. Aggregate Practice Data
    # We combine data from FP1, FP2, and FP3 to get a holistic view of practice performance
    practice_agg = practice.groupby(['meeting_key', 'driver_name']).agg({
        'best_lap_time': ['min', 'mean'],
        'total_laps': 'sum',
        'best_sector_1': 'min', 'best_sector_2': 'min', 'best_sector_3': 'min',
        'avg_i1_speed': 'mean', 'avg_i2_speed': 'mean', 'avg_st_speed': 'mean',
        'purple_sectors_count': 'sum', 'green_sectors_count': 'sum',
        'total_soft_laps': 'sum', 'total_medium_laps': 'sum', 'total_hard_laps': 'sum',
    }).reset_index()
    
    # Flatten hierarchical column names for easier access
    practice_agg.columns = ['_'.join(col).strip('_') if col[1] else col[0] for col in practice_agg.columns.values]
    practice_agg = practice_agg.rename(columns={
        'best_lap_time_min': 'practice_best_lap',
        'best_lap_time_mean': 'practice_avg_best_lap',
        'total_laps_sum': 'practice_total_laps',
        'best_sector_1_min': 'practice_best_s1',
        'best_sector_2_min': 'practice_best_s2',
        'best_sector_3_min': 'practice_best_s3',
        'avg_i1_speed_mean': 'practice_avg_i1_speed',
        'avg_i2_speed_mean': 'practice_avg_i2_speed',
        'avg_st_speed_mean': 'practice_avg_st_speed',
        'purple_sectors_count_sum': 'practice_purple_sectors',
        'green_sectors_count_sum': 'practice_green_sectors',
        'total_soft_laps_sum': 'practice_soft_laps',
        'total_medium_laps_sum': 'practice_medium_laps',
        'total_hard_laps_sum': 'practice_hard_laps',
    })

    # 2. Prepare Qualifying Data
    # We extract the absolute best lap time achieved in qualifying
    quali_agg = qualifying.groupby(['meeting_key', 'driver_name']).agg({
        'best_lap_time': 'min',
        'best_sector_1': 'min', 'best_sector_2': 'min', 'best_sector_3': 'min',
    }).reset_index().rename(columns={
        'best_lap_time': 'quali_best_lap',
        'best_sector_1': 'quali_best_s1',
        'best_sector_2': 'quali_best_s2',
        'best_sector_3': 'quali_best_s3',
    })

    # 3. Prepare Race Results (Target Variable)
    # This contains the ground truth we want to predict
    race_agg = race.groupby(['meeting_key', 'driver_name']).agg({
        'starting_position': 'first',
        'finishing_position': 'first',
        'points': 'first',
    }).reset_index().rename(columns={'starting_position': 'grid_position'})

    # 4. Merge Datasets
    # We use inner joins to ensure we only keep records where we have data for all three session types
    dataset = practice_agg.merge(quali_agg, on=['meeting_key', 'driver_name'], how='inner')
    dataset = dataset.merge(race_agg, on=['meeting_key', 'driver_name'], how='inner')

    # 5. Data Cleaning & Derived Features
    dataset = dataset.dropna(subset=['practice_best_lap', 'grid_position', 'finishing_position'])
    dataset['practice_total_sectors'] = dataset['practice_best_s1'] + dataset['practice_best_s2'] + dataset['practice_best_s3']
    dataset['practice_vs_quali_improvement'] = dataset['practice_best_lap'] - dataset['quali_best_lap']
    dataset['practice_total_tire_laps'] = dataset['practice_soft_laps'] + dataset['practice_medium_laps'] + dataset['practice_hard_laps']

    dataset.to_csv('training_dataset.csv', index=False)
    print(f"Feature Engineering Complete. Training dataset saved with {len(dataset)} examples.")
    return dataset

df = run_feature_engineering()
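
The column-flattening step in `run_feature_engineering` can be hard to follow without seeing the intermediate column names. The sketch below applies the same list comprehension to a small toy frame (the values are invented for illustration) so the resulting names, such as `best_lap_time_min`, can be inspected directly.

```python
import pandas as pd

# Toy practice data: two sessions per driver (illustrative values only).
toy = pd.DataFrame({
    "meeting_key": [1, 1, 1, 1],
    "driver_name": ["A", "A", "B", "B"],
    "best_lap_time": [90.0, 89.0, 91.0, 92.0],
    "total_laps": [20, 25, 18, 22],
})

# Because one column uses a list of aggregations, pandas returns
# two-level (MultiIndex) column names such as ('best_lap_time', 'min').
agg = toy.groupby(["meeting_key", "driver_name"]).agg({
    "best_lap_time": ["min", "mean"],
    "total_laps": "sum",
}).reset_index()

# Join the two levels with '_'; the group keys have an empty second level
# and keep their original names.
agg.columns = ["_".join(col).strip("_") if col[1] else col[0] for col in agg.columns.values]

print(agg.columns.tolist())
print(agg)
```

After flattening, the subsequent `rename` call only has to map names like `best_lap_time_min` to their `practice_` equivalents.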

## 4. Exploratory Data Analysis (EDA)

Before diving into model training, it is crucial to understand the statistical properties of our data. We will use visualization to uncover patterns and potential biases.

We will examine:
1.  **Distributions:** Are the finishing positions evenly distributed, or are there anomalies?
2.  **Correlations:** Which features have the strongest relationship with the finishing position? This helps us verify our feature engineering assumptions.
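
Alongside a heatmap, it can help to rank features by the strength of their correlation with the target. The following is a minimal sketch on synthetic data; the helper `rank_by_target_correlation` is a hypothetical name, and the toy target is deliberately constructed as grid position plus noise, so the ranking it prints says nothing about real races.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
n = 200
grid = rng.integers(1, 21, size=n).astype(float)

# Synthetic data (assumption for illustration): the target is grid position
# plus Gaussian noise, and practice_total_laps is unrelated random noise.
toy = pd.DataFrame({
    "grid_position": grid,
    "finishing_position": grid + rng.normal(0, 3, size=n),
    "practice_total_laps": rng.integers(40, 90, size=n).astype(float),
})

def rank_by_target_correlation(frame, target="finishing_position"):
    """Return Pearson correlations with the target, strongest (by magnitude) first."""
    corr = frame.select_dtypes("number").corr()[target].drop(target)
    order = corr.abs().sort_values(ascending=False).index
    return corr.reindex(order)

ranking = rank_by_target_correlation(toy)
print(ranking)
```

On the real `training_dataset.csv`, the same helper could be called on `df` to list all numeric features rather than the hand-picked subset used in the heatmap.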

In [None]:
if os.path.exists('training_dataset.csv'):
    df = pd.read_csv('training_dataset.csv')
    
    # 1. Position Distributions
    plt.figure(figsize=(14, 5))
    
    plt.subplot(1, 2, 1)
    sns.histplot(df['grid_position'], bins=20, kde=False, color='#3498db', edgecolor='black')
    plt.title('Distribution of Starting Grid Positions')
    plt.xlabel('Grid Position')
    plt.ylabel('Frequency')
    
    plt.subplot(1, 2, 2)
    sns.histplot(df['finishing_position'], bins=20, kde=False, color='#e74c3c', edgecolor='black')
    plt.title('Distribution of Finishing Positions')
    plt.xlabel('Finishing Position')
    plt.ylabel('Frequency')
    
    plt.tight_layout()
    plt.show()
    
    # 2. Correlation Heatmap
    # We select a subset of key features to keep the heatmap readable
    plt.figure(figsize=(12, 10))
    cols_to_plot = ['finishing_position', 'grid_position', 'quali_best_lap', 
                    'practice_best_lap', 'practice_avg_st_speed', 'practice_total_laps']
    
    corr_matrix = df[cols_to_plot].corr()
    
    sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', fmt='.2f', linewidths=0.5, square=True)
    plt.title('Feature Correlation Matrix')
    plt.show()

## 5. Neural Network Architecture Design

We have chosen a **Multi-Layer Perceptron (MLP)** for this regression task. The architecture is designed to capture non-linear relationships between the input features and the target variable.

**Key Architectural Decisions:**

*   **Tapering Layer Sizes (128 -> 64 -> 32):** This structure forces the network to learn increasingly abstract and compressed representations of the input data.
*   **Batch Normalization:** Applied after each hidden linear layer (not the output layer) to stabilize learning and allow for higher learning rates.
*   **Dropout:** We use dropout after each hidden layer (30%, 30%, and 20%) to reduce overfitting. This pushes the network to learn robust features rather than memorizing specific training examples.
*   **ReLU Activation:** Used to introduce non-linearity, allowing the model to learn complex patterns.
*   **Single Output Neuron:** Since this is a regression problem (predicting a position from 1 to 20), we use a single output neuron with no activation function, allowing it to predict any continuous value.
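
To get a feel for the size of this architecture, the number of learnable parameters can be worked out by hand. The sketch below does that arithmetic in plain Python, assuming 22 input features, a bias on every linear layer, and a learnable scale and shift per unit in each batch normalization layer (the PyTorch defaults). The helper name `count_mlp_parameters` is hypothetical.

```python
def count_mlp_parameters(layer_sizes):
    """Count learnable parameters for an MLP given [inputs, hidden..., output].

    Each linear layer contributes fan_in * fan_out weights plus fan_out biases.
    Each hidden layer is assumed to be followed by batch normalization, which
    adds a learnable scale and shift per unit (2 * fan_out).
    """
    total = 0
    n_layers = len(layer_sizes) - 1
    for i in range(n_layers):
        fan_in, fan_out = layer_sizes[i], layer_sizes[i + 1]
        total += fan_in * fan_out + fan_out  # linear weights + bias
        if i < n_layers - 1:                 # hidden layers only
            total += 2 * fan_out             # batch norm scale + shift
    return total

print(count_mlp_parameters([22, 128, 64, 32, 1]))  # 13761
```

With roughly 13.8k parameters, the network is small, but it can still overfit a dataset of a few hundred rows, which is why dropout and weight decay matter here.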

In [None]:
class F1DeepPredictor(nn.Module):
    def __init__(self, input_features=22):
        super().__init__()
        
        # Layer 1: Input -> 128 Neurons
        # High dimensionality in the first layer helps capture complex interactions
        self.fc1 = nn.Linear(input_features, 128)
        self.bn1 = nn.BatchNorm1d(128)
        self.dropout1 = nn.Dropout(0.3)
        
        # Layer 2: 128 -> 64 Neurons
        self.fc2 = nn.Linear(128, 64)
        self.bn2 = nn.BatchNorm1d(64)
        self.dropout2 = nn.Dropout(0.3)
        
        # Layer 3: 64 -> 32 Neurons
        self.fc3 = nn.Linear(64, 32)
        self.bn3 = nn.BatchNorm1d(32)
        self.dropout3 = nn.Dropout(0.2)
        
        # Output Layer: 32 -> 1 Neuron
        # Outputs a continuous value representing the predicted position
        self.fc4 = nn.Linear(32, 1)
    
    def forward(self, x):
        # Forward pass with ReLU activation and Dropout
        x = F.relu(self.bn1(self.fc1(x)))
        x = self.dropout1(x)
        
        x = F.relu(self.bn2(self.fc2(x)))
        x = self.dropout2(x)
        
        x = F.relu(self.bn3(self.fc3(x)))
        x = self.dropout3(x)
        
        x = self.fc4(x)
        return x

## 6. Model Training Loop

With our architecture defined, we proceed to train the model. The training process involves iterating through the dataset for **100 epochs**.

**Training Configuration:**
*   **Optimizer:** Adam (Adaptive Moment Estimation) is used for its efficiency and ability to handle sparse gradients.
*   **Loss Function:** Mean Squared Error (MSE) is chosen as it penalizes larger prediction errors more heavily, encouraging the model to be precise.
*   **Data Splitting:** We strictly separate our data into Training, Validation, and Test sets to ensure unbiased evaluation.
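
The claim that MSE penalizes large errors more heavily can be checked with a small numerical example. In the sketch below, two made-up sets of predictions have the same mean absolute error, but the set with one large miss has a much higher MSE.

```python
import numpy as np

actual = np.array([1.0, 5.0, 10.0, 15.0])

# Two hypothetical prediction sets with the same total absolute error (8 positions):
spread_out = actual + np.array([2.0, -2.0, 2.0, -2.0])   # four small misses
one_big_miss = actual + np.array([0.0, 0.0, 0.0, 8.0])   # one large miss

def mae(pred, true):
    return float(np.mean(np.abs(pred - true)))

def mse(pred, true):
    return float(np.mean((pred - true) ** 2))

print("MAE:", mae(spread_out, actual), mae(one_big_miss, actual))  # 2.0 and 2.0
print("MSE:", mse(spread_out, actual), mse(one_big_miss, actual))  # 4.0 and 16.0
```

Training on MSE therefore discourages occasional large misses, while MAE (used here for reporting) is easier to read as "positions off on average".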

We also implement real-time monitoring of Training Loss and Validation MAE to detect overfitting early.
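
The training loop in this notebook only records these curves; it does not stop early. One possible way to act on the recorded validation loss is a patience rule, sketched below in plain Python. The function name `best_epoch_with_patience` and its `patience` and `min_delta` parameters are assumptions for illustration, not part of the notebook.

```python
def best_epoch_with_patience(val_losses, patience=10, min_delta=0.0):
    """Return (best_epoch, stop_epoch) for a list of per-epoch validation losses.

    Training would stop once the loss has failed to improve by more than
    min_delta for `patience` consecutive epochs. Epochs are 0-indexed.
    """
    best_loss = float("inf")
    best_epoch = -1
    epochs_without_improvement = 0
    for epoch, loss in enumerate(val_losses):
        if loss < best_loss - min_delta:
            best_loss = loss
            best_epoch = epoch
            epochs_without_improvement = 0
        else:
            epochs_without_improvement += 1
            if epochs_without_improvement >= patience:
                return best_epoch, epoch
    return best_epoch, len(val_losses) - 1

# Made-up validation curve: improves until epoch 2, then degrades.
curve = [9.0, 7.5, 6.8, 6.9, 7.1, 7.4, 7.8]
print(best_epoch_with_patience(curve, patience=3))  # (2, 5)
```

Applied to `history['val_loss']` after training, this would indicate which epoch's weights to prefer if checkpoints were kept per epoch.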

In [None]:
def train_f1_model():
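    if not os.path.exists('training_dataset.csv'):
        # Fail loudly: a bare return would break the tuple unpacking at the call site
        raise FileNotFoundError("training_dataset.csv not found. Run feature engineering first.")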

    print("Preparing data for training...")
    df = pd.read_csv('training_dataset.csv')
    feature_cols = [col for col in df.columns if col not in ['meeting_key', 'driver_name', 'finishing_position', 'points']]
    
    X = df[feature_cols].values.astype(float)
    y = df['finishing_position'].values.astype(float)

    # Split Data: Train (70%), Val (10%), Test (20%)
    # Splitting before preprocessing keeps validation/test rows out of the fitted statistics
    X_temp, X_test, y_temp, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    X_train, X_val, y_train, y_val = train_test_split(X_temp, y_temp, test_size=0.125, random_state=42)

    # Preprocessing: impute missing values with training-set column means
    col_means = np.nanmean(X_train, axis=0)
    X_train, X_val, X_test = [np.where(np.isnan(part), col_means, part) for part in (X_train, X_val, X_test)]

    # Normalize with statistics fitted on the training set only,
    # then clip outliers to prevent gradient instability
    scaler = StandardScaler().fit(X_train)
    X_train, X_val, X_test = [np.clip(scaler.transform(part), -5, 5) for part in (X_train, X_val, X_test)]

    # Create PyTorch DataLoaders
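    # drop_last avoids a final training batch of size 1, which BatchNorm1d rejects in train mode
    train_loader = DataLoader(TensorDataset(torch.FloatTensor(X_train), torch.FloatTensor(y_train)), batch_size=16, shuffle=True, drop_last=True)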
    val_loader = DataLoader(TensorDataset(torch.FloatTensor(X_val), torch.FloatTensor(y_val)), batch_size=16, shuffle=False)
    test_loader = DataLoader(TensorDataset(torch.FloatTensor(X_test), torch.FloatTensor(y_test)), batch_size=16, shuffle=False)

    # Initialize Model components
    model = F1DeepPredictor(input_features=len(feature_cols))
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
    criterion = nn.MSELoss()

    # Training Loop
    history = {'train_loss': [], 'val_loss': [], 'val_mae': []}
    num_epochs = 100
    
    print(f"Starting training for {num_epochs} epochs...")
    
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        for batch_X, batch_y in train_loader:
            # squeeze(1) drops only the output dimension, so a batch of one keeps shape (1,)
            outputs = model(batch_X).squeeze(1)
            loss = criterion(outputs, batch_y)
            optimizer.zero_grad()
            loss.backward()
            # Gradient clipping prevents exploding gradients
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            train_loss += loss.item()
        
        # Validation Phase
        model.eval()
        val_loss = 0.0
        val_mae = 0.0
        with torch.no_grad():
            for batch_X, batch_y in val_loader:
                # squeeze(1) keeps the batch dimension even when the last batch has one row
                outputs = model(batch_X).squeeze(1)
                loss = criterion(outputs, batch_y)
                val_loss += loss.item()
                val_mae += torch.abs(outputs - batch_y).sum().item()
        
        # Store metrics for visualization
        history['train_loss'].append(train_loss / len(train_loader))
        history['val_loss'].append(val_loss / len(val_loader))
        history['val_mae'].append(val_mae / len(val_loader.dataset))

    # Visualization: Training Curves
    plt.figure(figsize=(14, 5))
    plt.subplot(1, 2, 1)
    plt.plot(history['train_loss'], label='Train Loss', linewidth=2)
    plt.plot(history['val_loss'], label='Validation Loss', linewidth=2)
    plt.title('Learning Curve: Loss over Epochs')
    plt.xlabel('Epoch')
    plt.ylabel('MSE Loss')
    plt.legend()
    
    plt.subplot(1, 2, 2)
    plt.plot(history['val_mae'], color='green', label='Validation MAE', linewidth=2)
    plt.title('Validation Mean Absolute Error')
    plt.xlabel('Epoch')
    plt.ylabel('MAE (Positions)')
    plt.legend()
    plt.show()
    
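    # Save the trained model artifact. The checkpoint pickles the fitted scaler, so
    # PyTorch 2.6+ needs torch.load('f1_model.pth', weights_only=False) to read it back.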
    torch.save({
        'model_state': model.state_dict(),
        'scaler': scaler,
        'features': feature_cols,
    }, 'f1_model.pth')
    
    return model, scaler, feature_cols, X_test, y_test

model, scaler, feature_cols, X_test, y_test = train_f1_model()

## 7. Evaluation & Results Analysis

The final step is to evaluate our model on the test set, which was held out and never used to fit the model's weights.

We use two primary visualizations to assess performance:
1.  **Predicted vs. Actual Scatter Plot:** Ideally, all points should lie on the red diagonal line. Deviations from this line represent prediction errors.
2.  **Error Distribution Histogram:** This shows us the spread of our errors. A narrow, tall peak centered at zero indicates a highly accurate model.

We also calculate key metrics like **Mean Absolute Error (MAE)** and **Accuracy within ±N positions** to give us concrete performance numbers.
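
As a worked example of these metrics, the sketch below computes MAE and the "within ±N positions" accuracy on five made-up predictions. The helper name `position_metrics` is hypothetical; the arithmetic matches the definitions used in the evaluation cell.

```python
import numpy as np

def position_metrics(predicted, actual, tolerances=(1, 3)):
    """Return MAE plus the percentage of predictions within each tolerance."""
    abs_errors = np.abs(np.asarray(predicted, dtype=float) - np.asarray(actual, dtype=float))
    metrics = {"mae": float(abs_errors.mean())}
    for n in tolerances:
        # Share of predictions whose absolute error is at most n positions
        metrics[f"within_{n}"] = float((abs_errors <= n).mean() * 100)
    return metrics

# Made-up example: absolute errors are 0, 2, 0, 4, 1
actual = [1, 2, 3, 4, 5]
predicted = [1, 4, 3, 8, 6]
print(position_metrics(predicted, actual))
# MAE 1.4, 60% within ±1, 80% within ±3
```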

In [None]:
# Generate predictions on the test set
model.eval()
with torch.no_grad():
    X_test_tensor = torch.FloatTensor(X_test)
    predictions_raw = model(X_test_tensor).squeeze().numpy()
    # Round to the nearest integer, then clip to the valid range [1, 20]
    predictions = np.clip(np.round(predictions_raw), 1, 20)

# 1. Visualization: Predicted vs Actual
plt.figure(figsize=(10, 6))
plt.scatter(y_test, predictions, alpha=0.6, color='#8e44ad', edgecolors='w', s=80)
plt.plot([1, 20], [1, 20], 'r--', linewidth=2, label='Perfect Prediction')
plt.xlabel('Actual Finishing Position')
plt.ylabel('Predicted Finishing Position')
plt.title('Model Accuracy: Predicted vs Actual Results')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

# 2. Visualization: Error Distribution
errors = predictions - y_test
plt.figure(figsize=(10, 6))
sns.histplot(errors, bins=15, kde=True, color='#16a085')
plt.title('Distribution of Prediction Errors')
plt.xlabel('Error (Predicted - Actual)')
plt.ylabel('Frequency')
plt.axvline(0, color='red', linestyle='--', label='Zero Error')
plt.legend()
plt.show()

# Calculate and Print Final Metrics
mae = np.abs(errors).mean()
within_1 = (np.abs(errors) <= 1).sum() / len(errors) * 100
within_3 = (np.abs(errors) <= 3).sum() / len(errors) * 100

print("Final Model Evaluation Results:")
print(f"   Mean Absolute Error: {mae:.2f} positions")
print(f"   Accuracy (within ±1 position): {within_1:.1f}%")
print(f"   Accuracy (within ±3 positions): {within_3:.1f}%")
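
To put the model's MAE in context, it may be useful to compare it with a naive baseline that predicts each driver finishes where they start. The sketch below computes that baseline on made-up positions; `grid_baseline_mae` is a hypothetical helper. It assumes the unscaled grid positions for the test rows have been kept aside, since `X_test` in this notebook holds standardized values.

```python
import numpy as np

def grid_baseline_mae(grid_positions, finishing_positions):
    """MAE of the naive prediction 'finishing position = grid position'."""
    grid = np.asarray(grid_positions, dtype=float)
    finish = np.asarray(finishing_positions, dtype=float)
    return float(np.abs(grid - finish).mean())

# Made-up example: absolute errors are 1, 1, 0, 2
grid = [1, 2, 3, 4]
finish = [2, 1, 3, 6]
print(f"Grid-position baseline MAE: {grid_baseline_mae(grid, finish):.2f} positions")
```

If the network's test MAE is not clearly below this baseline, the extra practice and qualifying features are adding little beyond the starting grid.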