# Predicting Top-10 Race Finishes

## Imports and Configuration

In [1]:
# Suppress warnings
import warnings
warnings.filterwarnings("ignore")

# Standard library imports
import sys
import os
import logging
import time

# Third-party imports
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from joblib import dump, load

from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler

# from tensorflow.keras.layers import Dense
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.optimizers import Adam
# from tensorflow.keras.wrappers.scikit_learn import KerasClassifier

# User-defined imports
module_path = os.path.abspath(os.path.join("..", "scripts"))
if module_path not in sys.path:
    sys.path.append(module_path)

from utilities import remove_unnamed_col, convert_to_seconds

In [None]:
# Route INFO-and-above messages to the notebook output with a timestamped prefix
log_format = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=log_format, level=logging.INFO)

Below are the columns required for the supervised learning model. Columns marked with an asterisk (*) are indices and won't be included in the model input. Columns marked with a hash (#) are labels.

- **Index Columns:**
  - *RaceId {year-session} `[race_id]`
  - *RacerId {firstname-lastname} `[racer_id]`
  - *TrackId {circuitname} `[track_id]`

- **Feature Columns:**
  - Type of track `[is_street]`
    - 0 for purpose-built track
    - 1 for street track
  - Racer's result at the same track in the previous season `[prev_year_pos]` (0 if not available or the racer didn't participate)
  - Qualifying position `[qualifying_pos]` 
  - Qualifying timing:
    - Q1 `[q1_timing]` 
    - Q2 `[q2_timing]` (if racer didn't qualify for Q2, this will be 0) 
    - Q3 `[q3_timing]` (if racer didn't qualify for Q3, this will be 0) 

- **Label Column:**
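  - #Race finish `[race_finish]` (derived from the finishing position and modelled as a binary target: 1 for a top-10 finish, 0 otherwise)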

In [None]:
# Qualifying columns needed downstream. The timing columns use the names they
# receive when the qualifying CSV is loaded (q1/q2/q3 are renamed to *_timing).
required_columns = ["driverId", "q1_timing", "q2_timing", "q3_timing", "race_id"]

In [None]:
def load_and_process_csv(filepath, id_columns, rename_columns=None, fill_na_value=None):
    """
    Load a CSV file, derive a composite ``race_id`` column, and tidy the frame.

    ``race_id`` is built by casting every column named in ``id_columns`` to
    string and joining the values with underscores, in the order given. It is
    computed before any renaming, so ``id_columns`` must use the column names
    as they appear in the file.

    Args:
        filepath (str): The file path to the CSV file.
        id_columns (list): Columns combined (in order) to create ``race_id``.
        rename_columns (dict): Optional ``{old_name: new_name}`` mapping applied
            after ``race_id`` has been created.
        fill_na_value (any): Optional value used to fill every NaN in the frame.
    Returns:
        pd.DataFrame: Processed DataFrame, passed through ``remove_unnamed_col``.
    Raises:
        ValueError: If ``id_columns`` is empty.
    """
    if not id_columns:
        raise ValueError("id_columns must name at least one column.")

    df = pd.read_csv(filepath)

    # Concatenate all id columns (not just the first two) with "_" separators
    id_parts = [df[column].astype(str) for column in id_columns]
    race_id = id_parts[0]
    for part in id_parts[1:]:
        race_id = race_id + "_" + part
    df["race_id"] = race_id

    if rename_columns:
        df = df.rename(columns=rename_columns)
    if fill_na_value is not None:
        df = df.fillna(fill_na_value)

    # Project helper from scripts/utilities; presumably drops "Unnamed: N"
    # index columns left behind by earlier to_csv calls - verify there.
    df = remove_unnamed_col(df)
    return df

def join_dataframes(df1, df2, join_key):
    """
    Left-join ``df2`` onto ``df1``.

    Every row of ``df1`` is kept; columns from ``df2`` are filled with NaN
    where ``join_key`` has no match.
    """
    joined = pd.merge(df1, df2, on=join_key, how="left")
    return joined


# Load and process data
race_information_filepath = "../data/raw/Race_Information_1995_2023.csv"
race_results_filepath = "../data/raw/Race_Results_1995_2023.csv"
qualifying_results_filepath = "../data/raw/Qualification_Results_1995_2023.csv"

race_information = load_and_process_csv(race_information_filepath, ["season", "round"])
race_results = load_and_process_csv(
    race_results_filepath,
    ["season", "round"],
    rename_columns={"grid": "qualifying_pos"},
)

# Both frames carry "season" and "round" (race_id is derived from them), so they
# are included in the join key. Joining on race_id alone would duplicate them as
# season_x/season_y and round_x/round_y and break later lookups of "season"/"round".
race_results_information = join_dataframes(
    race_results, race_information, ["race_id", "season", "round"]
)

# Missing Q2/Q3 times (driver eliminated earlier) are stored as 0
qualifying_results = load_and_process_csv(
    qualifying_results_filepath,
    ["season", "round"],
    rename_columns={"q1": "q1_timing", "q2": "q2_timing", "q3": "q3_timing"},
    fill_na_value=0,
)

# DataFrame.info() prints its summary and returns None, so it is called directly
# rather than wrapped in display(), which would render a stray "None".
race_results_information.info()
qualifying_results.info()

In [None]:
def add_previous_year_results(df):
    """
    Adds each driver's average position at the same circuit in the previous season.

    The input frame is not modified. Calling the function again on its own
    output recomputes the column instead of failing on a name clash.

    Parameters:
    - df: pandas DataFrame containing race results information. Must provide
      the columns "driverId", "circuitId", "season" and "position".

    Returns:
    - New DataFrame with an additional "prev_year_pos" column holding the mean
      position for the same driver and circuit one season earlier, or 0 when
      no such result exists.
    """
    # Drop a stale result column from an earlier run; otherwise the merge below
    # would emit prev_year_pos_x / prev_year_pos_y and the fill step would fail.
    base = df.drop(columns=["prev_year_pos"], errors="ignore")

    # Shift every result one season forward so that it lines up with the row it
    # should describe (season N results become the "previous year" of N + 1).
    prev_year_avg_positions = (
        base.assign(season=base["season"] + 1)
        .groupby(["driverId", "circuitId", "season"])["position"]
        .mean()
        .reset_index()
        .rename(columns={"position": "prev_year_pos"})
    )

    merged = base.merge(
        prev_year_avg_positions, on=["driverId", "circuitId", "season"], how="left"
    )

    # Assign the filled column back explicitly: an in-place fillna on a column
    # selection is chained assignment and is a no-op under pandas Copy-on-Write.
    merged["prev_year_pos"] = merged["prev_year_pos"].fillna(0)

    return merged

In [None]:
# Attach each driver's previous-year result. A new name is used (instead of
# overwriting race_results_information) so this cell can be re-run safely.
race_results_with_prev_year = add_previous_year_results(race_results_information)

# Keep only the qualifying columns the model needs (names as renamed on load)
qualifying_results_required_columns = qualifying_results[
    ["driverId", "q1_timing", "q2_timing", "q3_timing", "race_id"]
]

# Inner join: keep only driver/race pairs present in both qualifying and race
# results. validate="one_to_one" raises if either side has duplicate keys.
merged_df = pd.merge(
    qualifying_results_required_columns,
    race_results_with_prev_year[
        [
            "driverId",
            "race_id",
            "position",
            "qualifying_pos",
            "prev_year_pos",
            "season",
            "round",
        ]
    ],
    on=["driverId", "race_id"],
    how="inner",
    validate="one_to_one",
)

# Persist the prepared dataset so later cells can start from the CSV
os.makedirs("../data/prepared", exist_ok=True)
merged_df.to_csv("../data/prepared/Complete_data_supervised_learning.csv", index=False)

In [None]:
# You can also read the data directly (if available).
# The path matches the location the prepared dataset is written to above.
final_data = pd.read_csv("../data/prepared/Complete_data_supervised_learning.csv")

In [None]:


# Work on a copy of the model columns so `final_data` stays untouched and this
# cell gives the same result every time it is re-run. (Binarizing `position`
# in place would turn every label into 1 on a second run.)
final_data_required = final_data[
    ["q1_timing", "q2_timing", "q3_timing", "qualifying_pos", "prev_year_pos", "position"]
].copy()

# Convert timing columns to seconds
for timing_column in ["q1_timing", "q2_timing", "q3_timing"]:
    final_data_required[timing_column] = final_data_required[timing_column].apply(
        convert_to_seconds
    )

# Binary label: 1 for a top-10 finish, 0 otherwise (missing positions become 0)
final_data_required["position"] = (final_data_required["position"] <= 10).astype(int)

# Separate copy that the correlation analysis is free to rescale
final_data_scaled = final_data_required.copy()

In [None]:
# Rescale every feature to [0, 1]; the label column is left as-is
feature_columns = ["q1_timing", "q2_timing", "q3_timing", "qualifying_pos", "prev_year_pos"]
scaler = MinMaxScaler()
final_data_scaled[feature_columns] = scaler.fit_transform(
    final_data_scaled[feature_columns]
)

# Pairwise correlations between the scaled features and the label
corr_matrix = final_data_scaled.corr()

# Render the correlation matrix as an annotated heatmap
plt.figure(figsize=(20, 10))
sns.heatmap(corr_matrix, cmap="coolwarm", annot=True)
plt.show()

In [None]:
# Separate the binary label from the feature matrix
label_col = "position"
y = final_data_required[label_col]
X = final_data_required.drop(columns=[label_col])

# Hold out 20% of the rows for testing (fixed seed for reproducibility)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Standardize features using statistics learned from the training split only,
# then apply the same transformation to the test split
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

## Modeling

### Linear Methods

In [None]:
def build_best_logistic_regression_model(
    X_train_scaled, X_test_scaled, y_train, y_test, param_grid, **grid_search_kwargs
):
    """
    Builds and returns the best Logistic Regression model using GridSearchCV.

    Args:
        X_train_scaled (array-like): Scaled training features.
        X_test_scaled (array-like): Scaled test features.
        y_train (array-like): Training labels.
        y_test (array-like): Test labels.
        param_grid (dict): Parameter grid for GridSearchCV.
        **grid_search_kwargs: Additional keyword arguments for GridSearchCV.
            These override the defaults used here (cv=5, scoring="accuracy",
            n_jobs=-1, verbose=1).

    Returns:
        best_model (LogisticRegression): The best Logistic Regression model,
            or None if the search or evaluation failed.
        accuracy (float): Accuracy of the best model on the test set,
            or None if the search or evaluation failed.
    """
    # Initializing the Logistic Regression model
    log_reg = LogisticRegression(random_state=42, max_iter=1000)

    # Merge the defaults with caller overrides. Passing e.g. cv=3 alongside a
    # hard-coded cv=5 would raise "got multiple values for keyword argument".
    search_options = {"cv": 5, "scoring": "accuracy", "n_jobs": -1, "verbose": 1}
    search_options.update(grid_search_kwargs)

    grid_search = GridSearchCV(
        estimator=log_reg, param_grid=param_grid, **search_options
    )

    start_time = time.perf_counter()
    logging.info("Starting GridSearchCV to find the best Logistic Regression model.")

    try:
        # Fitting GridSearchCV to the training data
        grid_search.fit(X_train_scaled, y_train)

        # Extracting the best estimator (model)
        best_model = grid_search.best_estimator_

        # Evaluating the best model on the held-out test set
        y_pred = best_model.predict(X_test_scaled)
        accuracy = accuracy_score(y_test, y_pred)

        elapsed_time = time.perf_counter() - start_time

        logging.info(f"GridSearchCV completed in {elapsed_time:.2f} seconds.")
        logging.info(f"Best Model's Accuracy: {accuracy * 100:.2f}%")
        logging.info(f"Best Parameters: {grid_search.best_params_}")

        return best_model, accuracy

    except Exception as e:
        # Best-effort contract: callers check for None instead of catching.
        # logging.exception keeps the traceback so the failure can be diagnosed.
        logging.exception(f"An error occurred during GridSearchCV: {e}")
        return None, None


# Hyper-parameter grid explored for logistic regression.
# 'liblinear' is used because it supports both the l1 and l2 penalties.
lr_param_grid = dict(
    C=[0.01, 0.1, 1, 10, 100],
    penalty=["l1", "l2"],
    solver=["liblinear"],
)

# Run the grid search and evaluate the winner on the test split
lr_search_result = build_best_logistic_regression_model(
    X_train_scaled,
    X_test_scaled,
    y_train,
    y_test,
    lr_param_grid,
    return_train_score=True,
)
best_lr_model, best_lr_accuracy = lr_search_result

# Persist the model only when the search succeeded.
# To reuse a saved model instead of retraining:
# best_lr_model = load("best_logistic_regression_model.joblib")
if best_lr_model is not None:
    dump(best_lr_model, "best_logistic_regression_model.joblib")

### Ensemble Methods

In [2]:
def build_best_random_forest_model(
    X_train, X_test, y_train, y_test, param_grid, **grid_search_kwargs
):
    """
    Builds and returns the best Random Forest model using GridSearchCV.

    Args:
        X_train (array-like): Training features.
        X_test (array-like): Test features.
        y_train (array-like): Training labels.
        y_test (array-like): Test labels.
        param_grid (dict): Parameter grid for GridSearchCV.
        **grid_search_kwargs: Additional keyword arguments for GridSearchCV.
            These override the defaults used here (cv=5, scoring="accuracy",
            n_jobs=-1, verbose=1).

    Returns:
        best_model (RandomForestClassifier): The best Random Forest model,
            or None if the search or evaluation failed.
        accuracy (float): Accuracy of the best model on the test set,
            or None if the search or evaluation failed.
    """
    # Initializing the Random Forest classifier
    rf_clf = RandomForestClassifier(random_state=42)

    # Merge the defaults with caller overrides. Passing e.g. cv=3 alongside a
    # hard-coded cv=5 would raise "got multiple values for keyword argument".
    search_options = {"cv": 5, "scoring": "accuracy", "n_jobs": -1, "verbose": 1}
    search_options.update(grid_search_kwargs)

    grid_search = GridSearchCV(
        estimator=rf_clf, param_grid=param_grid, **search_options
    )

    start_time = time.perf_counter()
    logging.info("Starting GridSearchCV to find the best Random Forest model.")

    try:
        # Fitting GridSearchCV to the training data
        grid_search.fit(X_train, y_train)

        # Extracting the best estimator (model)
        best_model = grid_search.best_estimator_

        # Evaluating the best model on the held-out test set
        y_pred = best_model.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)

        elapsed_time = time.perf_counter() - start_time

        logging.info(f"GridSearchCV completed in {elapsed_time:.2f} seconds.")
        logging.info(f"Best Model's Accuracy: {accuracy * 100:.2f}%")
        logging.info(f"Best Parameters: {grid_search.best_params_}")

        return best_model, accuracy

    except Exception as e:
        # Best-effort contract: callers check for None instead of catching.
        # logging.exception keeps the traceback so the failure can be diagnosed.
        logging.exception(f"An error occurred during GridSearchCV: {e}")
        return None, None


# Hyper-parameter grid explored for the Random Forest
rf_param_grid = dict(
    n_estimators=[10, 50, 100, 200],
    max_depth=[None, 10, 20, 30],
    min_samples_split=[2, 5, 10],
)

# Run the grid search (with train scores recorded) and evaluate on the test split
rf_search_result = build_best_random_forest_model(
    X_train_scaled,
    X_test_scaled,
    y_train,
    y_test,
    rf_param_grid,
    return_train_score=True,
)
best_rf_model, best_rf_accuracy = rf_search_result

# Persist the model only when the search succeeded.
# To reuse a saved model instead of retraining:
# best_rf_model = load("best_random_forest_model.joblib")
if best_rf_model is not None:
    dump(best_rf_model, "best_random_forest_model.joblib")

NameError: name 'X_train_scaled' is not defined

### Neural Network Methods

In [None]:
def create_model(hidden_nodes=1, learning_rate=0.001, input_dim=None):
    """
    Creates and compiles a Keras Sequential model for binary classification.

    The network has one hidden ReLU layer followed by a single sigmoid output.

    NOTE(review): Sequential, Dense and Adam come from the tensorflow.keras
    imports that are commented out in the imports cell; they must be restored
    (or replaced by an equivalent) before this function can run.

    Args:
        hidden_nodes (int): Number of nodes in the hidden layer.
        learning_rate (float): Learning rate for the optimizer.
        input_dim (int, optional): Number of input features. When omitted, it
            is taken from the notebook-level ``X_train_scaled`` array, which
            must then already be defined.

    Returns:
        model (Sequential): Compiled Keras model.
    """
    # Fall back to the notebook's training matrix only when the caller did not
    # say how many features the model receives.
    if input_dim is None:
        input_dim = X_train_scaled.shape[1]

    # Define the model
    model = Sequential()
    model.add(Dense(hidden_nodes, input_dim=input_dim, activation="relu"))
    model.add(Dense(1, activation="sigmoid"))  # Output layer for binary classification

    # Compile the model
    optimizer = Adam(learning_rate=learning_rate)
    model.compile(loss="binary_crossentropy", optimizer=optimizer, metrics=["accuracy"])
    return model


def build_best_neural_network_model(
    X_train_scaled, X_test_scaled, y_train, y_test, param_grid, **grid_search_kwargs
):
    """
    Builds and returns the best neural network model using GridSearchCV.

    NOTE(review): KerasClassifier comes from an import that is commented out in
    the imports cell; it must be restored (or replaced by an equivalent
    scikit-learn wrapper) before this function can run.

    Args:
        X_train_scaled (array-like): Scaled training features.
        X_test_scaled (array-like): Scaled test features.
        y_train (array-like): Training labels.
        y_test (array-like): Test labels.
        param_grid (dict): Parameter grid for GridSearchCV.
        **grid_search_kwargs: Additional keyword arguments for GridSearchCV.
            These override the defaults used here (cv=5, scoring="accuracy",
            n_jobs=-1, verbose=1).

    Returns:
        best_model (KerasClassifier): The best neural network model,
            or None if the search or evaluation failed.
        accuracy (float): Accuracy of the best model on the test set,
            or None if the search or evaluation failed.
    """
    # Wrapping the Keras model so it can be used by scikit-learn
    model = KerasClassifier(build_fn=create_model, verbose=0)

    # Merge the defaults with caller overrides. Passing e.g. cv=3 alongside a
    # hard-coded cv=5 would raise "got multiple values for keyword argument".
    search_options = {"cv": 5, "scoring": "accuracy", "n_jobs": -1, "verbose": 1}
    search_options.update(grid_search_kwargs)

    grid_search = GridSearchCV(
        estimator=model, param_grid=param_grid, **search_options
    )

    start_time = time.perf_counter()
    logging.info("Starting GridSearchCV to find the best neural network model.")

    try:
        # Fitting GridSearchCV to the training data
        grid_search.fit(X_train_scaled, y_train)

        # Extracting the best estimator (model)
        best_model = grid_search.best_estimator_

        # Making predictions with the best model on the test set
        y_pred = best_model.predict(X_test_scaled)
        y_pred = np.round(y_pred).astype(int)  # Convert probabilities to binary output

        # Calculating the accuracy of the best model
        accuracy = accuracy_score(y_test, y_pred)

        elapsed_time = time.perf_counter() - start_time

        logging.info(f"GridSearchCV completed in {elapsed_time:.2f} seconds.")
        logging.info(f"Best Model's Accuracy: {accuracy * 100:.2f}%")
        logging.info(f"Best Parameters: {grid_search.best_params_}")

        return best_model, accuracy

    except Exception as e:
        # Best-effort contract: callers check for None instead of catching.
        # logging.exception keeps the traceback so the failure can be diagnosed.
        logging.exception(f"An error occurred during GridSearchCV: {e}")
        return None, None


# Hyper-parameter grid explored for the neural network
nn_param_grid = dict(
    hidden_nodes=[10, 50, 100],
    learning_rate=[0.001, 0.01, 0.1],
    epochs=[50],
    batch_size=[32],
)

# Run the grid search and evaluate the winner on the test split
nn_search_result = build_best_neural_network_model(
    X_train_scaled,
    X_test_scaled,
    y_train,
    y_test,
    nn_param_grid,
    return_train_score=True,
)
best_nn_model, best_nn_accuracy = nn_search_result

# Persist the model only when the search succeeded.
# To reuse a saved model instead of retraining:
# best_nn_model = load("best_neural_network_model.joblib")
if best_nn_model is not None:
    dump(best_nn_model, "best_neural_network_model.joblib")

### Model Evaluation

In [None]:
def perform_sensitivity_analysis(
    models, X_scaled, feature_names, output_dir=".", file_format="png"
):
    """
    Perform a one-at-a-time sensitivity analysis on the given models.

    For each model and feature, the feature's column is set to each of 100
    evenly spaced values between its observed minimum and maximum while all
    other columns keep their original values. The mean prediction is recorded
    at every value, and the standard deviation of those means is the feature's
    sensitivity. One bar chart per model is saved and shown.

    Args:
        models (dict): Dictionary of trained models with their names as keys.
            Entries whose value is None (e.g. a failed training run) are
            skipped with a warning.
        X_scaled (array-like): Scaled features, shape (n_samples, n_features).
            It is copied and never modified.
        feature_names (list): Feature names; position i names column i.
        output_dir (str): Directory to save the sensitivity analysis plots.
            Created if it does not exist.
        file_format (str): File format for saving plots.

    Raises:
        ValueError: If X_scaled is not 2-D or has fewer columns than
            feature_names has entries.
    """
    # Number of points to evaluate for each feature
    num_points = 100

    # Float copy: accepts DataFrames and keeps swept values from being
    # truncated when the input has an integer dtype.
    X_base = np.array(X_scaled, dtype=float)
    if X_base.ndim != 2 or X_base.shape[1] < len(feature_names):
        raise ValueError(
            "X_scaled must be 2-D with at least one column per entry in feature_names."
        )

    # Results dictionary to store sensitivity data
    sensitivity_results = {}

    for model_name, model in models.items():
        if model is None:
            logging.warning(f"Skipping model {model_name}: no trained model available.")
            continue

        logging.info(f"Analyzing model: {model_name}")
        model_sensitivities = {}

        # enumerate gives the column index directly (and stays correct even if
        # a feature name appears twice)
        for feature_index, feature in enumerate(feature_names):
            column = X_base[:, feature_index]
            values = np.linspace(np.min(column), np.max(column), num_points)

            # One working copy per feature; only the swept column is overwritten
            X_temp = X_base.copy()
            predictions = []

            for val in values:
                X_temp[:, feature_index] = val
                try:
                    pred = model.predict(X_temp)
                    predictions.append(np.mean(pred))
                except Exception as e:
                    logging.error(
                        f"Error predicting with model {model_name} for feature {feature}: {e}"
                    )
                    predictions.append(np.nan)  # Use NaN to indicate prediction failure

            # nanstd ignores the failed evaluations recorded as NaN
            model_sensitivities[feature] = np.nanstd(predictions)

        sensitivity_results[model_name] = model_sensitivities

    # Make sure the target directory exists before saving any figure
    if sensitivity_results and output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Plotting the sensitivity results
    for model_name, sensitivities in sensitivity_results.items():
        fig = plt.figure(figsize=(10, 6))
        plt.title(f"Sensitivity Analysis for {model_name}")
        plt.bar(range(len(sensitivities)), list(sensitivities.values()), align="center")
        plt.xticks(
            range(len(sensitivities)), list(sensitivities.keys()), rotation="vertical"
        )
        plt.ylabel("Standard Deviation of Predictions")

        file_path = f"{output_dir}/sensitivity_{model_name}.{file_format}"
        plt.savefig(file_path, dpi=300)
        plt.show()
        plt.close(fig)  # release the figure so repeated runs do not accumulate them
        logging.info(f"Sensitivity plot saved to {file_path}")


# Trained models to compare, keyed by the label used in plot titles and file names
models = dict(
    [
        ("Random Forest", best_rf_model),
        ("Logistic Regression", best_lr_model),
        ("Neural Network", best_nn_model),
    ]
)

# Column order must match the feature matrix the models were trained on
feature_names = ["q1_timing", "q2_timing", "q3_timing", "qualifying_pos", "prev_year_pos"]

perform_sensitivity_analysis(models, X_test_scaled, feature_names)

In [None]:
# Pairwise feature relationships, coloured by the top-10 label
pair_grid = sns.pairplot(data=final_data_required, hue="position")
plt.savefig("pairplot.png", dpi=300)
plt.show()

### Visualizations

In [None]:

# Fit a reference Random Forest on the scaled training split
# (relies on X_train, X_train_scaled and y_train from the split cell)
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(X_train_scaled, y_train)

# Importance scores reported by the fitted forest
feature_importances = rf.feature_importances_

# Pair each score with its column name, most important first
importances_df = pd.DataFrame(
    {"feature": X_train.columns, "importance": feature_importances}
).sort_values("importance", ascending=False)

# Horizontal bar chart of the ranked importances
fig, ax = plt.subplots(figsize=(10, 6))
sns.barplot(data=importances_df, x="importance", y="feature", ax=ax)
ax.set_title("Feature Importance")
ax.set_xlabel("Importance")
ax.set_ylabel("Feature")
plt.show()

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=347d2df2-e1c6-4f3b-913b-b3a22e587fee' target="_blank">
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>