In [None]:
import requests
from dotenv import load_dotenv
import os
import mysql.connector
import json
import csv
import numpy as np
import pandas as pd
import time
import matplotlib.pyplot as plt
import constants
import driver
import importlib
import Track_sectors
import random
import math
import pprint
from collections import OrderedDict
import tkinter as tk
import ast
%gui tk

In [344]:
importlib.reload(driver)
importlib.reload(constants)
importlib.reload(Track_sectors)
from driver import Driver
from constants import DB_DRIVER_NAMES, YEARS, TYRE_PARAMS, DEFAULT_BASE_LAP, DRIVER_TYRE_CONSTANTS, DRIVER_CONSTANTS, PREV_RACE_RESULTS
from Track_sectors import TRACK_SECTORS

In [345]:
quali_df = pd.read_csv('all_quali_data.csv')
race_df = pd.read_csv('all_race_data.csv')
compounds = ["soft", "medium", "hard"]

In [346]:
#This method returns the average pitstop loss for a given race. At the moment it is just the Dutch GP
#More will be added later
def get_avg_pit_loss():
    return float(21.355)

In [347]:
def find_driver_list(race):
    driver_list = []
    for driver_name in DB_DRIVER_NAMES:
        if driver_name in PREV_RACE_RESULTS[race]:
            driver_list.append(driver_name)
    return driver_list



In [348]:
def get_base_time(race, year, driver):
    base_time = (quali_df[(quali_df.year == year) & (quali_df.race_name == f"{race}") & (quali_df.driver == f"{driver}")])
    if base_time.empty:
        return DEFAULT_BASE_LAP[f"{race}"]
    return float(base_time.iloc[0, 3])

In [349]:
def get_next_compound(current_compound, num_laps, lap_number):
    laps_remaining = num_laps - lap_number
    if current_compound == "soft":
        if laps_remaining > 25:
            next_compound = "hard"
        else:
            next_compound = "medium"
    elif current_compound == "medium":
        if laps_remaining < 13:
            next_compound = "soft"
        else:
            next_compound = "hard"
    else:
        if laps_remaining < 13:
            next_compound = "soft"
        else:
            next_compound = "medium"
    return next_compound

In [350]:
def simulate_sector_times(driver, race):
    lap_time, sector_times = driver.simulate_sector_times(driver.stint_lap, race)
    return lap_time, sector_times

In [351]:
def is_DRS_available(gap_to_in_front, lap):
    if lap > 3 and gap_to_in_front < 1.0:
        return True
    else:
        return False

In [352]:
#Speed is measured in m/s
def get_DRS_affect(race, sector_number, sector_time):
    sector = TRACK_SECTORS[race][sector_number]
    distance = sector['end'] - sector['start']
    original_speed = distance / sector_time
    drs_speed = original_speed + 3.6
    new_sector_time = distance / drs_speed
    return new_sector_time

In [353]:
def get_slipstream_affect(race, sector_number, sector_time, interval):
    sector = TRACK_SECTORS[race][sector_number]
    time_loss = 0 
    sector_distance = sector["end"] - sector["start"]
    if 0 < interval < 1.2:
        time_gain = (-0.24 * interval + 0.3) * (sector_distance / 750)
        return sector_time - time_gain
    else:
        return sector_time

In [354]:
def get_following_cornering_affect(race, sector_number, sector_time, interval):
    if interval < 0:
        return sector_time
    sector = TRACK_SECTORS[race][sector_number]
    b = 0.0015
    sector_speed = sector["speed"]
    sector_distance = sector["end"] - sector["start"]
    gap_effect = (1 / (interval + 0.0001))
    if sector_speed == "fast":
        sector_speed_effect = 0.25
    elif sector_speed == "medium":
        sector_speed_effect = 0.15
    else:
        sector_speed_effect = 0.1 
    sector_distance_effect = math.log(b * sector_distance + 1)
    time_loss = gap_effect * sector_speed_effect * sector_distance_effect
    if time_loss > 0.6:
        time_loss = 0.6
    sector_time = float(sector_time) + time_loss
    return sector_time

In [355]:
def increment_laps(drivers):
    for driver in drivers:
        driver.current_lap += 1
        driver.stint_lap += 1

In [356]:
def order_drivers_by_race_time(drivers):
    drivers.sort(key=lambda d: d.race_time)
    for idx, driver in enumerate(drivers):
        driver.position = idx + 1
    return drivers

In [357]:
def order_drivers_by_position(drivers):
    drivers.sort(key=lambda d: d.position)
    for idx, driver in enumerate(drivers):
        driver.position = idx + 1
    return drivers

In [358]:
def calculate_gaps_to_in_front(drivers):
    for i in range(1, len(drivers)):
        interval = drivers[i].race_time - drivers[i-1].race_time
        drivers[i].gap_to_in_front = interval

In [359]:
def get_tyre_traction_level(compound):
    if compound == "soft":
        return 1.0
    elif compound == "medium":
        return 0.9
    else:
        return 0.8

In [360]:
def calculate_starting_sector(driver):
    #This is the average acceleratin of an f1 car
    base_acceleration = 10.31630769 
    driver_skill = DRIVER_CONSTANTS[driver.driver_id]['starting']
    #If the random number is higher than the drivers skill then give them a bad start
    if (random.uniform(0, 1.05) > driver_skill):
        random_multiplier = random.uniform(0.95, 0.99)
        #print(f"Driver {driver.driver_id} has had a bad start")
        base_acceleration *= random_multiplier
    tyre_traction = get_tyre_traction_level(driver.compound)
    varied_acceleration = base_acceleration * tyre_traction
    sector_time = (((driver.starting_distance * 2)/ varied_acceleration) ** 0.5)
    return sector_time

In [361]:
def use_best_strategy(best_strategy_dictionary, driver):
    starting_tyre = list(best_strategy_dictionary[driver].keys())[0]
    return starting_tyre

In [362]:
def initialise_starting_variables(race, year, best_strategy):
    driver_list = find_driver_list(race)
    drivers = []
    compounds = ["soft", "medium", "hard"]
    for driver_name in driver_list:
        if best_strategy == None:
            starting_compound = compounds[random.randint(0,2)]
        else:
            starting_compound = use_best_strategy(best_strategy, driver_name)


        drivers.append(Driver(driver_id=driver_name, base_lap_time=get_base_time(race, year, driver_name),
                                fuel_level=110, compound= starting_compound,
                                race_name = race))
    drivers.sort(key= lambda d: d.base_lap_time)
    for idx, driver in enumerate(drivers):
        driver.position = idx + 1
        driver.grid_slot = driver.position
        driver.starting_distance = driver.get_starting_distance(race)
        driver.starting_sector = calculate_starting_sector(driver)
        driver.compounds.append(driver.starting_compound)
    number_of_sectors = len(TRACK_SECTORS[race])
    return drivers, number_of_sectors

In [363]:
def get_overtake_probability(race, sector_number, driver1, driver2):
    sector_probability = TRACK_SECTORS[race][sector_number]["overtake_probability"]
    #Interval is how much ahead the driver is going to be
    interval = driver2.race_time - driver1.race_time
    interval_factor = min(interval / 0.4, 1)
    overtake_probability = sector_probability * interval_factor
    return overtake_probability   

In [364]:
def calculate_overtakes(drivers, race, sector_number, lap):
    for i in range(len(drivers) - 1, 0, -1):
        driver = drivers[i]
        if driver.position > 1:
            while driver.position > 1:
                #print(f"Checking for overtake by {driver.driver_id} in position {driver.position} for the list of drivers that is {len(drivers)} long. \n the car 2 cars in front is {drivers[driver.position-3].driver_id}")
                driver_in_front = drivers[driver.position - 2]
                if driver.race_time < driver_in_front.race_time:
                    overtaking_probability = get_overtake_probability(race, sector_number, driver, driver_in_front)
                    if random.random() > overtaking_probability:
                        #overtake has failed
                        failure_penalty = random.uniform(0.2, 0.5)
                        driver.race_time = driver_in_front.race_time + failure_penalty
                    else:
                        driver.position -= 1
                        driver.overtakes += 1
                        driver_in_front.position += 1
                        drivers.sort(key=lambda d: d.position)
                else:
                    break
        else:
            break
    drivers.sort(key=lambda d: d.position)
    return drivers

In [365]:
def calculate_pit_stops(drivers, race, lap, num_laps):
    if (num_laps - lap) > 10:
        for driver in drivers:
            if driver.pit_lap:
                #print(f"Driver {driver.driver_id} is pitting from the {driver.compound} to the {get_next_compound(driver.compound, num_laps, lap)} on lap {lap}")
                driver.compound = get_next_compound(driver.compound, num_laps, lap)
                driver.compounds.append(driver.compound)
                driver.race_time += get_avg_pit_loss()
                driver.current_lap_time += get_avg_pit_loss()
                driver.stint_lap = 0
                driver.pit_laps.append(lap)
    drivers.sort(key=lambda d: d.race_time)
    return drivers

In [366]:
def log_lap_data(drivers):
    for driver in drivers:
        driver.lap_times.append(driver.current_lap_time)

In [None]:
def simulate_race(race, year, num_laps, best_strategy):
    #Initialise the drivers in their starting order
    drivers, number_of_sectors = initialise_starting_variables(race, year, best_strategy)

    for lap in range(num_laps):
        isolated_sectors = {}
        #print(f"Simulating lap {lap} \n")
        for driver in drivers:
            driver.current_lap += 1
            driver.stint_lap += 1
            driver.current_lap_time = 0
            initial_lap_time, sector_times = simulate_sector_times(driver, race)
            isolated_sectors[driver.driver_id] = {
                "lap": lap,
                "sectors": sector_times,
                "initial_lap_time": initial_lap_time
            }

        for sector_number in range(number_of_sectors):
            #print(f"Simulating sector {sector_number} of lap {lap}")
            calculate_gaps_to_in_front(drivers)
            for driver in drivers:
                recalculated_sector_time = 0
                if (lap == 0 and sector_number == 0):
                    driver.current_lap_time += driver.starting_sector
                    driver.race_time += driver.starting_sector
                else:
                    #print(f"Driver {driver.driver_id} has gap to in front {driver.gap_to_in_front}")
                    current_sector_type = TRACK_SECTORS[race][sector_number]["type"]
                    isolated_sector_time = isolated_sectors[driver.driver_id]['sectors'][sector_number]

                    if current_sector_type == "straight":
                        drs_sector = TRACK_SECTORS[race][sector_number]["DRS"]
                        if drs_sector:
                            driver.drs = is_DRS_available(driver.gap_to_in_front, lap)
                            if driver.drs:
                                #print(f"DRS triggered for {driver.driver_id}")
                                recalculated_sector_time = get_DRS_affect(race, sector_number, isolated_sector_time)
                                driver.drs = False
                            else:
                                recalculated_sector_time = get_slipstream_affect(race, sector_number, isolated_sector_time, driver.gap_to_in_front)
                        else:
                            recalculated_sector_time = get_slipstream_affect(race, sector_number, isolated_sector_time, driver.gap_to_in_front)
                    else:
                        #print("Calculating cornering affect")
                        recalculated_sector_time = get_following_cornering_affect(race, sector_number, isolated_sector_time, driver.gap_to_in_front)
                    #print(f"Driver {driver.driver_id} has sector {sector_number} time of {recalculated_sector_time} after original sector time of {isolated_sector_time} \n")
                    driver.current_lap_time += recalculated_sector_time
                    driver.race_time += recalculated_sector_time
            drivers = calculate_overtakes(drivers, race, sector_number, lap)
        drivers = calculate_pit_stops(drivers, race, lap, num_laps)
        log_lap_data(drivers)
        #print("-" * 50)

    return drivers

In [368]:
# drivers = simulate_race("netherlands", 2024, 71, None)
# for driver in drivers:
#     print(driver.driver_id)

In [369]:
# for driver in drivers:
#     print(driver.driver_id)

In [370]:
def write_final_data_to_csv(drivers, simulation_number, file_path):
    if drivers:
        file_exists = os.path.exists(file_path) and os.path.getsize(file_path) > 0
        with open(f'{file_path}', 'a', newline='') as csvfile:
            fieldnames = ["race_number", "Driver", "starting_position", "finishing_position", "race_time", "strategy", "pit_laps", "overtakes", "number_of_laps_completed", "lap_times"]
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            if not file_exists:
                writer.writeheader()
            for driver in drivers:
                writer.writerow({"race_number": simulation_number, "Driver": driver.driver_id, "starting_position": driver.grid_slot, "finishing_position": driver.position,
                                "race_time": driver.race_time, "strategy": driver.compounds, "pit_laps": driver.pit_laps, "overtakes": driver.overtakes, "number_of_laps_completed": len(driver.lap_times), "lap_times": driver.lap_times})

In [371]:
driver_list = find_driver_list("netherlands")
for i in range (1000):
    simulated_order = simulate_race("netherlands", 2024, 71, None)
    write_final_data_to_csv(simulated_order, i, 'preliminary_results.csv')

In [372]:
preliminary_results_df = pd.read_csv('preliminary_results.csv')
def create_results_dictionary():
    results = {}
    for driver in driver_list:
        results[driver] = {}
        for compound in compounds:
            results[driver][compound] = []

    for simulation_number in preliminary_results_df['race_number'].unique():
        race_data = preliminary_results_df[preliminary_results_df['race_number']==simulation_number]
        for index, row in race_data.iterrows():
            driver_name = row['Driver']
            used_compounds = ast.literal_eval(row['strategy'])
            starting_compound = used_compounds[0]
            race_time = row['race_time']
            results[driver_name][starting_compound].append(race_time)
    return results

        

In [373]:
def calculate_best_strategy(results):
    average_times = {}
    best_average_times = {}
    sorted_best_average_times = {}
    for driver in driver_list:
        average_times[driver] = {}
        for compound in compounds:
            average_time = sum(results[driver][compound]) / len(results[driver][compound])
            average_times[driver][compound] = average_time
    for driver in average_times:
        best_average_times[driver] = {}
        best_strategy = min(average_times[driver], key=average_times[driver].get)
        best_average_times[driver][best_strategy] = average_times[driver][best_strategy]
        sorted_best_average_times = OrderedDict(sorted(best_average_times.items(), key=lambda x: list(x[1].values())[0]))
    return sorted_best_average_times

In [374]:
results = create_results_dictionary()

In [375]:
best_strategy_dictionary = calculate_best_strategy(results)

In [376]:
for i in range(20):
    optimized_simulated_drivers = simulate_race("netherlands", 2024, 71, best_strategy_dictionary)
    write_final_data_to_csv(optimized_simulated_drivers, i, 'final_results.csv')