In [1]:
import math
import os
import re

import contextily as cx
import fastf1
import folium
import geopandas as gpd
import imageio.v2 as imageio
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from shapely.affinity import translate
from shapely.geometry import LineString

In [2]:
import warnings
warnings.filterwarnings("ignore")

In [3]:
# Load the 2023 race ('R') session at Spielberg with lap data and car telemetry, but no weather data.
session_load = fastf1.get_session(2023, 'Spielberg', 'R')
session_load.load(telemetry=True, laps=True, weather=False)

# Reference track outline; its centroid anchors the conversion of telemetry X/Y to lon/lat.
# NOTE(review): the variable is named `monza_track`, but the file id 'at-1969' looks like the
# Austrian circuit that matches the Spielberg session above, not Monza - confirm and rename.
monza_track = gpd.read_file("bacinger f1-circuits master circuits/at-1969.geojson")
original_centroid = monza_track.geometry.centroid.iloc[0]

# Alternative circuit, kept commented out for reference.
# NOTE(review): the file id 'be-1925' looks like a Belgian circuit rather than Zandvoort - confirm.
# zandvoort_track = gpd.read_file('bacinger f1-circuits master circuits/be-1925.geojson')
# original_centroid = zandvoort_track.geometry.centroid.iloc[0]

core           INFO 	Loading data for Austrian Grand Prix - Race [v3.3.5]
req            INFO 	No cached data found for session_info. Loading data...
_api           INFO 	Fetching session info data...
req            INFO 	Data has been written to cache!
req            INFO 	No cached data found for driver_info. Loading data...
_api           INFO 	Fetching driver list...
req            INFO 	Data has been written to cache!
req            INFO 	No cached data found for session_status_data. Loading data...
_api           INFO 	Fetching session status data...
req            INFO 	Data has been written to cache!
req            INFO 	No cached data found for lap_count. Loading data...
_api           INFO 	Fetching lap count data...
req            INFO 	Data has been written to cache!
req            INFO 	No cached data found for track_status_data. Loading data...
_api           INFO 	Fetching track status data...
req            INFO 	Data has been written to cache!
req            INFO 	No c

In [None]:
# raw_df = pd.read_csv('static/raw_df.csv')
# interpolated_df = pd.read_csv('static/interpolated_df.csv')

In [4]:
def get_laps(session):
    """Collect per-lap telemetry for every driver of ``session`` into one DataFrame.

    For each entry of ``session.drivers`` the driver's laps are picked from
    ``session.laps``; each lap's telemetry is tagged with its ``LapNumber`` and
    all laps are stacked. A ``driver`` column identifies the owner of each row.

    Parameters
    ----------
    session : object
        Loaded session exposing ``drivers`` and ``laps`` (with
        ``pick_driver(...).iterlaps()``).

    Returns
    -------
    pandas.DataFrame
        Rows whose ``Source`` is ``'car'`` and ``Status`` is ``'OnTrack'``,
        with an integer ``LapNumber`` and a ``driver`` column. The index keeps
        the pre-filter row positions (gaps where rows were dropped). The
        result is an independent copy, so callers may add columns safely.

    Raises
    ------
    ValueError
        If telemetry could not be assembled for any driver.
    """
    per_driver = []

    for driver in session.drivers:
        try:
            lap_frames = []
            for _, lap in session.laps.pick_driver(driver).iterlaps():
                telemetry = lap.get_telemetry()
                telemetry['LapNumber'] = lap['LapNumber']
                lap_frames.append(telemetry)
            # pd.concat raises on an empty list, so drivers without laps are
            # reported and skipped by the handler below.
            driver_df = pd.concat(lap_frames, ignore_index=True)
        except Exception as e:
            # Best effort: one driver with broken data must not abort the rest.
            print(f"Error processing driver {driver}: {e}")
            continue

        driver_df['driver'] = driver
        per_driver.append(driver_df)

    if not per_driver:
        raise ValueError("No telemetry could be loaded for any driver in this session")

    telemetry_all = pd.concat(per_driver, ignore_index=True)
    telemetry_all['LapNumber'] = telemetry_all['LapNumber'].astype(int)

    on_track = (telemetry_all['Source'] == 'car') & (telemetry_all['Status'] == 'OnTrack')
    return telemetry_all[on_track].copy()

In [5]:
def interpolate(df, n_points=5):
    """Densify telemetry by inserting linearly interpolated rows after each sample.

    Every input row is followed by ``n_points`` extra rows. In the extra rows
    ``X``, ``Y``, ``Speed`` and ``RPM`` are linearly interpolated towards the
    next sample, while ``LapNumber``, ``driver``, ``nGear``, ``Throttle`` and
    ``Brake`` repeat the preceding sample's values.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the columns ``LapNumber``, ``driver``, ``X``, ``Y``,
        ``Speed``, ``RPM``, ``nGear``, ``Throttle`` and ``Brake``.
    n_points : int, default 5
        Number of rows inserted after each original row.

    Returns
    -------
    pandas.DataFrame
        ``len(df) * (n_points + 1)`` rows with a fresh 0..N-1 index and only
        the nine columns listed above.

    Notes
    -----
    Interpolation runs over the whole frame in row order, so the rows inserted
    after the last sample of one lap/driver blend towards the first sample of
    the next one. Rows inserted after the very last sample repeat its values.

    Raises
    ------
    ValueError
        If ``n_points`` is negative.
    """
    if n_points < 0:
        raise ValueError("n_points must be >= 0")

    columns = ['LapNumber', 'driver', 'X', 'Y', 'Speed', 'RPM', 'nGear', 'Throttle', 'Brake']
    interpolated_columns = ['X', 'Y', 'Speed', 'RPM']
    step = n_points + 1

    # Repeat each source row `step` times in one vectorised take instead of
    # building the frame row by row.
    positions = np.repeat(np.arange(len(df)), step)
    expanded = df.loc[:, columns].iloc[positions].reset_index(drop=True)

    # Cast first so integer-typed inputs (e.g. Speed/RPM) can hold NaN.
    expanded[interpolated_columns] = expanded[interpolated_columns].astype(float)

    # Every row that is not the first of its group of `step` is a placeholder.
    is_padding = (np.arange(len(expanded)) % step) != 0
    expanded.loc[is_padding, interpolated_columns] = np.nan

    expanded[interpolated_columns] = expanded[interpolated_columns].interpolate(method='linear')
    return expanded

In [6]:
def get_corners(session):
    """Return the corner table of ``session``.

    Thin convenience wrapper around ``session.get_circuit_info().corners``.
    """
    return session.get_circuit_info().corners

In [7]:
def filter_df_to_corner(data,corner,d_threshold,session=None):
    """Keep only the samples that lie within ``d_threshold`` of one corner.

    Parameters
    ----------
    data : pandas.DataFrame
        Telemetry with ``X`` and ``Y`` columns. A ``Distance_to_corner`` column
        is added to (or overwritten on) this frame in place, as before.
    corner : int
        Positional index (``iloc``) of the corner in the corner table, i.e.
        0 selects the first listed corner.
    d_threshold : float
        Maximum Euclidean distance, in the same units as ``X``/``Y``.
    session : object, optional
        Session whose corner table is used. Defaults to the notebook-level
        ``session_load`` so existing calls keep working.

    Returns
    -------
    pandas.DataFrame
        Independent copy of the rows with ``Distance_to_corner < d_threshold``.
    """
    if session is None:
        session = session_load

    # Fetch the corner table once per call instead of once per coordinate.
    corners = get_corners(session)
    corner_x = corners['X'].iloc[corner]
    corner_y = corners['Y'].iloc[corner]

    data['Distance_to_corner'] = np.sqrt((data['X'] - corner_x) ** 2 + (data['Y'] - corner_y) ** 2)

    # Copy so callers can add columns to the result without touching `data`.
    return data[data['Distance_to_corner'] < d_threshold].copy()

In [8]:
def coordinate_shift(original_centroid, f1_api_coords):
    """Convert track-relative coordinates into an approximate lon/lat line.

    Parameters
    ----------
    original_centroid : point-like
        Object with ``x`` (longitude) and ``y`` (latitude) attributes; used as
        the origin of the conversion.
    f1_api_coords : iterable of (y, x) pairs
        Relative coordinates. Note the order: each pair is unpacked as
        ``(y, x)``.

    Returns
    -------
    LineString
        Line through the converted ``(lon, lat)`` points, in input order.
    """
    origin_lon = original_centroid.x
    origin_lat = original_centroid.y

    # Degrees covered by one kilometre: about 1/111 for latitude, and the same
    # stretched by 1/cos(latitude) for longitude. These are approximations.
    deg_per_km_lat = 1 / 111
    deg_per_km_lon = 1 / (111 * math.cos(math.radians(origin_lat)))

    # The raw values are divided by 10000 before the km -> degree factor is applied.
    # NOTE(review): that implies inputs in 1/10 m rather than metres - confirm against the data source.
    lonlat_points = [
        (
            origin_lon + (x / 10000) * deg_per_km_lon,
            origin_lat + (y / 10000) * deg_per_km_lat,
        )
        for y, x in f1_api_coords
    ]

    return LineString(lonlat_points)



# def shift_centroid(relative_line,original_centroid):
#     """This shift the centroid computed"""
#     # Calculate the distance to translate in each direction  
#     # dx = original_centroid.x - relative_line.centroid.x  
#     # dy = original_centroid.y - relative_line.centroid.y  
#     dx = -0.004091249607403924
#     dy = -0.006398742570105753
#     # Shift the LineString  
#     shifted_line = translate(relative_line, xoff=dx, yoff=dy)  
#     return shifted_line

In [9]:
def calculate_dx_dy(original_centroid, df):
    """Return the offset that moves the converted telemetry centroid onto the track centroid.

    All rows of ``df`` are converted with ``coordinate_shift`` and the centroid
    of the resulting line is compared with ``original_centroid``.

    Parameters
    ----------
    original_centroid : point-like
        Object with ``x`` and ``y`` attributes (the reference centroid).
    df : pandas.DataFrame
        Telemetry with ``X`` and ``Y`` columns.

    Returns
    -------
    tuple of float
        ``(dx, dy)`` to add to converted coordinates.
    """
    # Vectorised extraction of [Y, X] pairs (the order coordinate_shift
    # expects); avoids a Python-level iterrows pass over the whole table.
    coords = df[['Y', 'X']].to_numpy().tolist()

    converted_centroid = coordinate_shift(original_centroid, coords).centroid
    dx_all = original_centroid.x - converted_centroid.x
    dy_all = original_centroid.y - converted_centroid.y

    return dx_all, dy_all

In [10]:
def shift_centroid_new(relative_line,dx, dy):
    """Return ``relative_line`` translated by ``dx`` along x and ``dy`` along y."""
    return translate(relative_line, xoff=dx, yoff=dy)

In [11]:
# Build the working telemetry table: the output of get_laps, densified by interpolate.
data = interpolate(get_laps(session_load))



In [12]:
def folium_plot(centroid,plot_data,df):
    """Draw every (lap, driver) trajectory in ``plot_data`` on a satellite folium map.

    Parameters
    ----------
    centroid : point-like
        Reference centroid with ``x`` (lon) and ``y`` (lat); used both as the
        map centre and as the origin for the coordinate conversion.
    plot_data : pandas.DataFrame
        Telemetry to draw; needs ``driver``, ``LapNumber``, ``X`` and ``Y``.
    df : pandas.DataFrame
        Telemetry from which the centroid offset is computed via
        ``calculate_dx_dy``.

    Returns
    -------
    folium.Map
        Map with one GeoJson line per (lap, driver) pair that has at least
        two samples.
    """
    # Centre on the centroid that was passed in, not on a notebook global.
    kat = folium.Map(location=[centroid.y, centroid.x], zoom_start=14, tiles='Esri.WorldImagery', attr="Esri",max_zoom=19,maxNativeZoom = 19)
    dx,dy = calculate_dx_dy(centroid,df)

    style = {'color': 'black', 'weight': 0.4}  # Adjust weight as needed

    # groupby visits only the combinations that actually have rows, instead of
    # masking the whole frame once per lap x driver pair.
    for (lap, driver), segment in plot_data.groupby(['LapNumber', 'driver']):
        if len(segment) < 2:
            # A line needs at least two points.
            continue

        coords = list(zip(segment['Y'], segment['X']))
        try:
            scaled_down = coordinate_shift(centroid,coords)
            shifted_line = shift_centroid_new(scaled_down,dx,dy)
        except Exception as e:
            # Skip this trajectory rather than redrawing a stale line from a
            # previous iteration (or hitting an undefined name).
            print(f"Skipping lap {lap}, driver {driver}: {e}")
            continue

        # The coordinates are lon/lat, which is what the map layer needs, so
        # the frame is handed over in EPSG:4326 without a projected round trip.
        gdf_ = gpd.GeoDataFrame(geometry=[shifted_line], crs="EPSG:4326")
        folium.GeoJson(gdf_,style=style).add_to(kat)

    return kat

In [14]:
# Show the corner table; filter_df_to_corner selects a corner from it by row position.
get_corners(session_load)



Unnamed: 0,X,Y,Number,Letter,Angle,Distance
0,-3063.956543,-2208.505859,1,,-115.779625,430.571432
1,-6314.694824,3218.050049,2,,32.816311,1051.450543
2,-8209.920898,5460.177246,3,,147.538568,1364.232381
3,-266.374207,4694.272461,4,,16.276141,2181.70924
4,-1473.040283,3462.345947,5,,-75.648821,2345.612064
5,-4632.144043,3322.6875,6,,-39.524988,2700.785928
6,-3272.008301,853.066467,7,,96.493801,3016.682239
7,-1618.793945,1948.821411,8,,108.035017,3205.153321
8,3669.864502,1852.277344,9,,51.291139,3754.087381
9,4201.174805,85.383705,10,,-9.155094,3942.319248


In [16]:
# Map the samples within 350 coordinate units of the corner at row position 2 of the corner table.
folium_plot(original_centroid,filter_df_to_corner(data,2,350),data)



In [None]:
def plot_all_drivers_for_lap(plot_data, lap,centroid,df,plot_type,dx,dy):
    """Plot every driver's samples for one lap on a satellite basemap.

    Parameters
    ----------
    plot_data : pandas.DataFrame
        Telemetry to draw; needs ``driver``, ``LapNumber``, ``X``, ``Y`` (and
        ``Speed`` when ``plot_type`` is ``'Speed'``).
    lap : int
        Lap number to draw.
    centroid : point-like
        Origin for ``coordinate_shift``.
    df : pandas.DataFrame
        Unused; kept so existing calls keep working.
    plot_type : {'Trajectory', 'Speed'}
        ``'Trajectory'`` draws one thin black line per driver; ``'Speed'``
        draws the individual samples coloured by the ``Speed`` column.
    dx, dy : float
        Offset applied to the converted coordinates (see ``calculate_dx_dy``).

    Returns
    -------
    module
        ``matplotlib.pyplot``, with the new figure as the current figure so
        the caller can title, save and close it.

    Raises
    ------
    ValueError
        If ``plot_type`` is not supported, or no driver has at least two
        samples for ``lap``.
    """
    if plot_type not in ('Trajectory', 'Speed'):
        raise ValueError(f"Unsupported plot_type {plot_type!r}; expected 'Trajectory' or 'Speed'")

    lap_data = plot_data[plot_data['LapNumber'] == lap]
    gdfs = []

    for driver, driver_data in lap_data.groupby('driver'):
        if len(driver_data) < 2:
            # A line needs at least two points.
            continue

        coords = list(zip(driver_data['Y'], driver_data['X']))
        scaled_down = coordinate_shift(centroid, coords)
        shifted_line = shift_centroid_new(scaled_down,dx,dy)

        if plot_type == 'Trajectory':
            # One row per driver: the line itself, not a copy of it for every
            # telemetry sample.
            gdf = gpd.GeoDataFrame(
                {'driver': [driver], 'LapNumber': [lap]},
                geometry=[shifted_line],
                crs="EPSG:4326",
            )
        else:
            # assign() returns a new frame, so the caller's data is not modified.
            driver_data = driver_data.assign(
                shifted_x=[x for x, y in shifted_line.coords],
                shifted_y=[y for x, y in shifted_line.coords],
            )
            gdf = gpd.GeoDataFrame(driver_data, geometry=gpd.points_from_xy(driver_data['shifted_x'], driver_data['shifted_y']), crs="EPSG:4326").reset_index(drop=True)

        # Web Mercator, to line up with the basemap tiles.
        gdfs.append(gdf.to_crs(epsg=3857))

    if not gdfs:
        raise ValueError(f"No driver has at least two samples for lap {lap}")

    # Concatenate all GeoDataFrames
    all_gdf = gpd.GeoDataFrame(pd.concat(gdfs, ignore_index=True))

    if plot_type == 'Trajectory':
        ax = all_gdf.plot(figsize=(10,14),color = 'black',linewidth = 0.1, alpha = 1)
    else:
        ax = all_gdf.plot(column=plot_type, legend=True, figsize=(10, 14), cmap='OrRd', markersize=0.4, alpha=1)

    cx.add_basemap(ax, source=cx.providers.Esri.WorldImagery)
    plt.tight_layout()
    return plt

In [19]:
# Compute the centroid offset once from the full table so every plot below reuses the same dx/dy.
dx_calc,dy_calc = calculate_dx_dy(original_centroid,data)

In [None]:
# Render one trajectory image per lap for a single corner (these are the frames for the GIF).
corner = 1  # row position in the corner table, as used by filter_df_to_corner
corner_data = filter_df_to_corner(data,corner,500)  # same for every lap, so computed once

os.makedirs('img', exist_ok=True)

for i in sorted(set(data['LapNumber'])):
    try:
        plot_all_drivers_for_lap(corner_data,i,original_centroid,data,'Trajectory',dx_calc,dy_calc)
    except ValueError as e:
        # Nothing drawable for this lap; report it and carry on with the next one.
        print(f"Skipping lap {i}: {e}")
        continue
    plt.title(f'Lap {i} - Corner {corner} - All Drivers')
    plt.savefig(f'img/Lap {i} - Corner {corner}.png')
    # Close each figure so dozens of open figures do not pile up in memory.
    plt.close()

In [20]:
def plot_all_laps_all_drivers(plot_test, centroid,df,plot_type,dx,dy,count):
    """Plot every lap of every driver on one satellite image and save it to disk.

    The figure is written to ``img/all/AllLaps_AllDrivers {count}.png`` and then
    closed; nothing is returned.

    Parameters
    ----------
    plot_test : pandas.DataFrame
        Telemetry to draw; needs ``driver``, ``LapNumber``, ``X``, ``Y`` (and
        ``Speed`` when ``plot_type`` is ``'Speed'``).
    centroid : point-like
        Origin for ``coordinate_shift``.
    df : pandas.DataFrame
        Unused; kept so existing calls keep working.
    plot_type : {'Trajectory', 'Speed'}
        ``'Trajectory'`` draws one thin black line per (lap, driver);
        ``'Speed'`` draws the individual samples coloured by ``Speed``.
    dx, dy : float
        Offset applied to the converted coordinates (see ``calculate_dx_dy``).
    count : int or str
        Suffix used in the output file name.

    Raises
    ------
    ValueError
        If ``plot_type`` is not supported, or no (lap, driver) pair has at
        least two samples.
    """
    if plot_type not in ('Trajectory', 'Speed'):
        raise ValueError(f"Unsupported plot_type {plot_type!r}; expected 'Trajectory' or 'Speed'")

    # List to store GeoDataFrames for each lap and driver
    gdfs = []

    for (lap, driver), segment in plot_test.groupby(['LapNumber', 'driver']):
        if len(segment) < 2:
            # A line needs at least two points.
            continue

        coords = list(zip(segment['Y'], segment['X']))
        scaled_down = coordinate_shift(centroid, coords)
        shifted_line = shift_centroid_new(scaled_down, dx,dy)

        # The geometry is built and appended for EVERY (lap, driver) pair.
        # Previously this ran once per lap, after the driver loop, so only the
        # last driver of each lap reached the plot.
        if plot_type == 'Trajectory':
            gdf = gpd.GeoDataFrame(
                {'LapNumber': [lap], 'driver': [driver]},
                geometry=[shifted_line],
                crs="EPSG:4326",
            )
        else:
            # assign() returns a new frame, so the caller's data is not modified.
            segment = segment.assign(
                shifted_x=[x for x, y in shifted_line.coords],
                shifted_y=[y for x, y in shifted_line.coords],
            )
            gdf = gpd.GeoDataFrame(segment, geometry=gpd.points_from_xy(segment['shifted_x'], segment['shifted_y']), crs="EPSG:4326").reset_index(drop=True)

        # Web Mercator, to line up with the basemap tiles.
        gdfs.append(gdf.to_crs(epsg=3857))

    if not gdfs:
        raise ValueError("No (lap, driver) pair has at least two samples to plot")

    # Concatenate all GeoDataFrames
    all_gdf = gpd.GeoDataFrame(pd.concat(gdfs, ignore_index=True))

    if plot_type == 'Trajectory':
        ax = all_gdf.plot(figsize=(10,14),color = 'black',linewidth = 0.1, alpha = 1)
    else:
        ax = all_gdf.plot(column=plot_type, legend=True, figsize=(10, 14), cmap='OrRd', markersize=0.4, alpha=1)

    cx.add_basemap(ax, source=cx.providers.Esri.WorldImagery)
    plt.title('All Laps - All Drivers')
    plt.tight_layout()
    # Make sure the target folder exists before saving.
    os.makedirs('img/all', exist_ok=True)
    plt.savefig(f'img/all/AllLaps_AllDrivers {count}.png')
    plt.close()

In [22]:
# Save one "all laps, all drivers" image per row of the corner table.
# `count` numbers the output files starting at 1.
count = 0
for j in set(get_corners(session_load).index):
    count += 1
    # Keep only the samples within 1000 coordinate units of corner j before plotting.
    plot_all_laps_all_drivers(filter_df_to_corner(data,j,1000),original_centroid,data,'Trajectory',dx_calc,dy_calc,count)



In [None]:
def natural_sort_key(s):
    """Return a sort key that orders embedded numbers numerically.

    The string is split into alternating text and digit runs; digit runs
    become ``int`` and text runs are lower-cased, so ``'Lap 2'`` sorts before
    ``'Lap 10'`` and the comparison ignores case.
    """
    # isdecimal() is true exactly for the runs captured by \d+. isdigit() also
    # accepts characters such as superscript digits that int() rejects.
    return [int(text) if text.isdecimal() else text.lower() for text in re.split(r'(\d+)', s)]

def create_gif(image_folder, gif_name):
    """Combine the ``.png``/``.jpg`` files of ``image_folder`` into one GIF.

    Files are taken in natural (number-aware) file-name order and written to
    ``gif_name``.
    """
    ordered_names = sorted(os.listdir(image_folder), key=natural_sort_key)
    frames = [
        imageio.imread(os.path.join(image_folder, name))
        for name in ordered_names
        if name.endswith('.png') or name.endswith('.jpg')
    ]
    # Per-frame duration is passed straight to imageio; adjust as needed.
    # NOTE(review): the unit (seconds vs. milliseconds) depends on the imageio backend - confirm.
    imageio.mimsave(gif_name, frames, duration=500)

In [None]:
# Assemble the .png/.jpg images found directly in img/ into an animated GIF.
image_folder = 'img'
gif_name = 'output.gif'

create_gif(image_folder, gif_name)