# Importing Data and Setup

In [145]:
import pandas as pd
import numpy as np
from IPython.display import display
import os
import requests
from bs4 import BeautifulSoup
from dateutil.relativedelta import *

In [146]:
# URLs for the datasets
url_races = 'https://raw.githubusercontent.com/HeedfulMoss/ML-F1-Prediction-Project/9862ff3a11e86ee66101f55ad00515eceb177052/raw_datafiles/races.csv'
url_circuits = 'https://raw.githubusercontent.com/HeedfulMoss/ML-F1-Prediction-Project/9be04d41381da51770aaa26f1a73bc780806e5b3/raw_datafiles/circuits.csv'
url_drivers = 'https://raw.githubusercontent.com/HeedfulMoss/ML-F1-Prediction-Project/c74cb6be0e86191ba96d27be9aab2be024db1d31/raw_datafiles/drivers.csv'
url_results = 'https://raw.githubusercontent.com/HeedfulMoss/ML-F1-Prediction-Project/9be04d41381da51770aaa26f1a73bc780806e5b3/raw_datafiles/results.csv'
url_constructors = 'https://raw.githubusercontent.com/HeedfulMoss/ML-F1-Prediction-Project/9862ff3a11e86ee66101f55ad00515eceb177052/raw_datafiles/constructors.csv'
url_status = 'https://raw.githubusercontent.com/HeedfulMoss/ML-F1-Prediction-Project/9be04d41381da51770aaa26f1a73bc780806e5b3/raw_datafiles/status.csv'
url_driver_standings = 'https://raw.githubusercontent.com/HeedfulMoss/ML-F1-Prediction-Project/9be04d41381da51770aaa26f1a73bc780806e5b3/raw_datafiles/driver_standings.csv'
url_constructor_standings = 'https://raw.githubusercontent.com/HeedfulMoss/ML-F1-Prediction-Project/9be04d41381da51770aaa26f1a73bc780806e5b3/raw_datafiles/constructor_standings.csv'
url_qualifying = 'https://raw.githubusercontent.com/HeedfulMoss/ML-F1-Prediction-Project/9862ff3a11e86ee66101f55ad00515eceb177052/raw_datafiles/qualifying.csv'

In [147]:
races_df = pd.read_csv(url_races)
circuits_df = pd.read_csv(url_circuits)
drivers_df = pd.read_csv(url_drivers)
results_df = pd.read_csv(url_results)
constructors_df = pd.read_csv(url_constructors)
status_df = pd.read_csv(url_status)
driver_standings_df = pd.read_csv(url_driver_standings)
constructor_standings_df = pd.read_csv(url_constructor_standings)
qualifying_df = pd.read_csv(url_qualifying)

# Formatted Races Data preprocessing

In [148]:
# Keep only specified columns in races_df
races_df = races_df[['raceId', 'year', 'round', 'circuitId', 'name', 'date', 'time', 'url']]

# Merge races_df with circuits_df based on circuitId
races_df = pd.merge(races_df, circuits_df[['circuitId', 'lat', 'lng', 'country', 'circuitRef']], on='circuitId', how='left')

In [149]:
formatted_races_df = races_df

columns = ['year', 'round', 'circuitRef', 'lat', 'lng', 'country', 'date', 'url']

# Reorder the columns in the DataFrame
formatted_races_df = races_df[columns]

formatted_races_df.to_csv('formatted_races_df.csv', index=False)


# Formatted Results Data preprocessing

In [150]:
# Merge the two dataframes based on the 'raceId' column.
results_df = pd.merge(results_df, races_df[['raceId', 'year', 'round', 'circuitRef', 'url']], on='raceId', how='left')

# Merge results_df with drivers_df based on driverId
results_df = pd.merge(results_df, drivers_df[['driverId', 'driverRef', 'dob', 'nationality']], on='driverId', how='left')

# Merge results_df with constructors_df based on constructorId
results_df = pd.merge(results_df, constructors_df[['constructorId', 'constructorRef']], on='constructorId', how='left')

# Merge results_df with status_df based on statusId
results_df = pd.merge(results_df, status_df[['statusId', 'status']], on='statusId', how='left')

In [151]:
# prompt: from results_df print the year 2019, from column order: year, round, circuitRef, driverRef, dob, nationality, constructorRef, grid, milliseconds, status, points, positionOrder, and url
formatted_results_df = results_df

# Specify the desired column order
columns = ['year', 'round', 'circuitRef', 'driverRef', 'dob', 'nationality', 'constructorRef', 'grid', 'milliseconds', 'status', 'points', 'positionOrder','url']

# Reorder the columns in the DataFrame
formatted_results_df = results_df[columns].copy()

formatted_results_df['milliseconds'] = pd.to_numeric(formatted_results_df['milliseconds'], errors='coerce')


formatted_results_df.to_csv('formatted_results_df.csv', index=False)

# Formatted Driver Standings Data preprocessing

In [152]:
# Define a function to compute the derived columns
def compute_previous_driver_values(df):
    df = df.sort_values(by=['year', 'round'])  # Sort by season and round
    df['post_race_driver_points'] = 0.0  # Initialize driver_points
    df['post_race_driver_wins'] = 0.0  # Initialize driver_wins
    df['post_race_driver_position'] = 0.0  # Initialize driver_standings_pos

    # Group by season and driver to compute the values for previous rounds
    for (year, driver), group in df.groupby(['year', 'driverRef']):
        for idx in range(1, len(group)):
            prev_idx = group.index[idx - 1]
            current_idx = group.index[idx]

            # Assign the values from the previous round
            df.at[current_idx, 'post_race_driver_points'] = df.at[prev_idx, 'points']
            df.at[current_idx, 'post_race_driver_wins'] = df.at[prev_idx, 'wins']
            df.at[current_idx, 'post_race_driver_position'] = df.at[prev_idx, 'position']

    return df

# Merge driver_standings_df with drivers_df based on driverId
driver_standings_df = pd.merge(driver_standings_df , drivers_df[['driverId','driverRef']], on='driverId', how='left')

# Merge driver_standings_df with races_df based on raceId
driver_standings_df = pd.merge(driver_standings_df, races_df[['raceId','year','round']], on='raceId', how='left')

# Apply the function to compute the derived columns
driver_standings_df = compute_previous_driver_values(driver_standings_df)

In [153]:
# Specify the desired column order
formatted_driver_standings_df = driver_standings_df

columns = ['year', 'round', 'driverRef', 'points','wins', 'position', 'post_race_driver_points', 'post_race_driver_wins', 'post_race_driver_position']

# Reorder the columns in the DataFrame
formatted_driver_standings_df = formatted_driver_standings_df[columns]

# prompt: rename wins to driver_wins column from driver_standings
formatted_driver_standings_df = formatted_driver_standings_df.rename(columns={'points': 'driver_points'})
formatted_driver_standings_df = formatted_driver_standings_df.rename(columns={'wins': 'driver_wins'})
formatted_driver_standings_df = formatted_driver_standings_df.rename(columns={'position': 'driver_position'})

formatted_driver_standings_df.drop(['driver_points', 'driver_wins', 'driver_position'],axis = 1, inplace = True)

formatted_driver_standings_df.to_csv('formatted_driver_standings_df.csv', index=False)

# Formatted Constructor Standings Data preprocessing

In [154]:
# Define a function to compute the derived columns
def compute_previous_constructor_values(df):
    df = df.sort_values(by=['year', 'round'])  # Sort by season and round
    df['post_race_constructor_points'] = 0.0  # Initialize driver_points
    df['post_race_constructor_wins'] = 0.0  # Initialize driver_wins
    df['post_race_constructor_position'] = 0.0  # Initialize driver_standings_pos

    # Group by season and driver to compute the values for previous rounds
    for (year, driver), group in df.groupby(['year', 'constructorRef']):
        for idx in range(1, len(group)):
            prev_idx = group.index[idx - 1]
            current_idx = group.index[idx]

            # Assign the values from the previous round
            df.at[current_idx, 'post_race_constructor_points'] = df.at[prev_idx, 'points']
            df.at[current_idx, 'post_race_constructor_wins'] = df.at[prev_idx, 'wins']
            df.at[current_idx, 'post_race_constructor_position'] = df.at[prev_idx, 'position']

    return df

# Merge constructor_standings_df with constructors_df based on constructorId
constructor_standings_df = pd.merge(constructor_standings_df , constructors_df[['constructorId','constructorRef']], on='constructorId', how='left')

# Merge constructor_standings_df with races_df based on raceId
constructor_standings_df = pd.merge(constructor_standings_df , races_df[['raceId','year','round']], on='raceId', how='left')

# Apply the function to compute the derived columns
constructor_standings_df = compute_previous_constructor_values(constructor_standings_df)

In [155]:
# Specify the desired column order
formatted_constructor_standings_df = constructor_standings_df

columns = ['year', 'round', 'constructorRef', 'points','wins', 'position', 'post_race_constructor_points', 'post_race_constructor_wins', 'post_race_constructor_position']

# Reorder the columns in the DataFrame
formatted_constructor_standings_df = formatted_constructor_standings_df[columns]

# prompt: rename wins to driver_wins column from driver_standings
formatted_constructor_standings_df = formatted_constructor_standings_df.rename(columns={'points': 'constructor_points'})
formatted_constructor_standings_df = formatted_constructor_standings_df.rename(columns={'wins': 'constructor_wins'})
formatted_constructor_standings_df = formatted_constructor_standings_df.rename(columns={'position': 'constructor_position'})

formatted_constructor_standings_df.drop(['constructor_points', 'constructor_wins', 'constructor_position'],axis = 1, inplace = True)

formatted_constructor_standings_df.to_csv('formatted_constructor_standings_df.csv', index=False)

# Formatted Qualifying Data preprocessing

In [156]:
# Merge qualifying.csv with constructors.csv to add the constructorRef column
qualifying_df  = qualifying_df.merge(constructors_df[['constructorId', 'name']], on='constructorId', how='left')

# Merge the result with races.csv to add the year column
qualifying_df  = qualifying_df.merge(races_df[['raceId', 'year']], on='raceId', how='left')

# Merge the result with races.csv to add the year column
qualifying_df  = qualifying_df.merge(races_df[['raceId', 'round']], on='raceId', how='left')

# Merge the result with races.csv to add the year column
qualifying_df  = qualifying_df.merge(drivers_df[['driverId','forename','surname']], on='driverId', how='left')

# Create the 'fullname' column by combining 'forename' and 'surname'
qualifying_df['fullname'] = qualifying_df['forename'] + ' ' + qualifying_df['surname']

qualifying_df[['q1', 'q2', 'q3']] = qualifying_df[['q1', 'q2', 'q3']].replace(r'\\N', np.nan, regex=True)

qualifying_df['q'] = qualifying_df['q3'].fillna(qualifying_df['q2']).fillna(qualifying_df['q1'])


In [157]:
formatted_qualifying_df = qualifying_df

formatted_qualifying_df = formatted_qualifying_df.rename(columns={'position': 'qualifying_position'})

columns = ['qualifying_position', 'fullname', 'name', 'q', 'year', 'round']

# Reorder the columns in the DataFrame
formatted_qualifying_df = formatted_qualifying_df[columns]

#print(updated_qualifying[['q']].isnull().sum())
formatted_qualifying_df = formatted_qualifying_df.dropna(subset=['q'])

formatted_qualifying_df.rename(columns = {'qualifying_position': 'grid'}, inplace = True)

# Save the updated qualifying DataFrame to a new file
formatted_qualifying_df.to_csv('formatted_qualifying_df.csv' , index=False)

# Formatted Weather Data preprocessing

In [158]:
# Dictionary of weather conditions
weather_dict = {
    'weather_warm': ['soleggiato', 'clear', 'warm', 'hot', 'sunny', 'fine', 'mild', 'sereno', 'ensoleillé', 'chaud', 'doux', 'beau','heiß', 'sonnig', 'mild'],
    'weather_cold': ['cold', 'fresh', 'chilly', 'cool', 'froid', 'frais', 'glacial','kalt'],
    'weather_dry': ['dry', 'asciutto', 'sec','trocken'],
    'weather_wet': ['showers', 'wet', 'rain', 'pioggia', 'damp', 'thunderstorms', 'rainy', 'pluie', 'humide', 'averses','regen', 'nass', 'feucht', 'gewitter', 'schauer'],
    'weather_cloudy': ['overcast', 'nuvoloso', 'clouds', 'cloudy', 'grey', 'coperto', 'nuageux', 'gris','bewölkt', 'wolkig', 'trüb']
}

def analyze_weather(input_string):
    found_conditions = []
    input_string = input_string.lower()  # Convert input to lowercase for case-insensitive matching
    for condition, keywords in weather_dict.items():
        for keyword in keywords:
            if keyword in input_string:
                found_conditions.append(condition)
                break  # Move to the next condition once a match is found
    return found_conditions

def get_race_weather(url):
    try:
        # Fetch the page content
        response = requests.get(url)
        response.raise_for_status()  # Raise an exception if there's an error in the request
        # Parse the page with BeautifulSoup
        soup = BeautifulSoup(response.text, 'html.parser')

        # Wikipedia infobox tables have class 'infobox'
        infobox = soup.find('table', class_='infobox')
        if not infobox:
            return None

        # Find all rows in the infobox
        rows = infobox.find_all('tr')

        # Iterate through each row to find the "Weather" row
        for row in rows:
            header = row.find('th')
            if header and "Weather" in header.get_text():
                data_cell = row.find('td')
                if data_cell:
                    return data_cell.get_text(strip=True)
        return None
    except requests.RequestException as e:
        print(f"Error fetching URL {url}: {e}")
        return None

def get_italian_weather(url):
    try:
        # Use the Wikipedia API to find the Italian equivalent of the page
        api_url = f"https://en.wikipedia.org/w/api.php?action=query&prop=langlinks&format=json&lllang=it&titles={url.split('/')[-1]}"
        response = requests.get(api_url)
        response.raise_for_status()
        data = response.json()

        # Extract the Italian page title
        pages = data.get('query', {}).get('pages', {})
        for page_id, page_info in pages.items():
            if 'langlinks' in page_info:
                italian_title = page_info['langlinks'][0]['*']
                italian_url = f"https://it.wikipedia.org/wiki/{italian_title.replace(' ', '_')}"
                print(f"Translated Italian URL: {italian_url}")

                # Fetch and parse the Italian page
                response = requests.get(italian_url)
                response.raise_for_status()
                soup = BeautifulSoup(response.text, 'html.parser')

                # Search for "Clima" instead of "Weather"
                infobox = soup.find('table', class_='infobox')
                if not infobox:
                    return None

                rows = infobox.find_all('tr')
                for row in rows:
                    header = row.find('th')
                    if header and "Clima" in header.get_text():
                        data_cell = row.find('td')
                        if data_cell:
                            return data_cell.get_text(strip=True)
                print(f"No 'Clima' section found on Italian page: {italian_url}")
                return None

        # Fall back if no translation is found
        print(f"No Italian translation found via API for URL: {url}")
        return None

    except requests.RequestException as e:
        print(f"Error fetching Italian URL {url}: {e}")
        return None

def get_french_weather(url):
    """
    Attempts to find French Wikipedia page via API, then parses
    any table row containing 'Météo' or 'Climat' in the <th> cell
    and returns the <td> text.
    """

    try:
        # 1) Use the Wikipedia API to find the French equivalent of the page
        api_url = (
            "https://en.wikipedia.org/w/api.php?"
            f"action=query&prop=langlinks&format=json&lllang=fr&titles={url.split('/')[-1]}"
        )
        response = requests.get(api_url)
        response.raise_for_status()
        data = response.json()

        # 2) Extract the French page title
        pages = data.get("query", {}).get("pages", {})
        for page_id, page_info in pages.items():
            if "langlinks" in page_info:
                french_title = page_info["langlinks"][0]["*"]
                french_url = f"https://fr.wikipedia.org/wiki/{french_title.replace(' ', '_')}"
                print(f"Translated French URL: {french_url}")

                # 3) Fetch and parse the French page
                fr_response = requests.get(french_url)
                fr_response.raise_for_status()
                soup = BeautifulSoup(fr_response.text, "html.parser")

                # 4) Search across all tables, since not all French GP pages use 'infobox' class
                tables = soup.find_all("table")
                for table in tables:
                    rows = table.find_all("tr")
                    for row in rows:
                        header = row.find("th")
                        if header:
                            header_text = header.get_text(strip=True)
                            # Check if "Météo" or "Climat" is in the <th> text
                            if "Météo" in header_text or "Climat" in header_text:
                                data_cell = row.find("td")
                                if data_cell:
                                    return data_cell.get_text(strip=True)

                print(f"No 'Météo' or 'Climat' section found on French page: {french_url}")
                return None

        # 5) Fall back if no translation is found
        print(f"No French translation found via API for URL: {url}")
        return None

    except requests.RequestException as e:
        print(f"Error fetching French URL {url}: {e}")
        return None

def get_german_weather(url):
    """
    Attempts to find the German Wikipedia page via API, then parses
    tables to locate a row whose first cell contains 'Wetter'.
    Returns the text in the second cell of that row.
    """
    try:
        # Step 1) Use the Wikipedia API to find the German equivalent of the page
        api_url = (
            "https://en.wikipedia.org/w/api.php?"
            f"action=query&prop=langlinks&format=json&lllang=de&titles={url.split('/')[-1]}"
        )
        response = requests.get(api_url)
        response.raise_for_status()
        data = response.json()

        # Step 2) Extract the German page title
        pages = data.get("query", {}).get("pages", {})
        for page_id, page_info in pages.items():
            if "langlinks" in page_info:
                german_title = page_info["langlinks"][0]["*"]
                german_url = f"https://de.wikipedia.org/wiki/{german_title.replace(' ', '_')}"
                print(f"Translated German URL: {german_url}")

                # Step 3) Fetch and parse the German page
                de_response = requests.get(german_url)
                de_response.raise_for_status()
                soup = BeautifulSoup(de_response.text, "html.parser")

                # Step 4) Search across all tables for any row whose first cell contains 'Wetter'
                tables = soup.find_all("table")
                for table in tables:
                    rows = table.find_all("tr")
                    for row in rows:
                        # Grab all cells in the row (both <th> and <td>)
                        cells = row.find_all(["th", "td"])
                        if len(cells) >= 2:
                            # Convert the first cell's text to lowercase and check if 'wetter' appears
                            first_cell_text = cells[0].get_text(strip=True).lower()
                            # If we see "wetter" (with or without the colon) in this cell
                            if "wetter" in first_cell_text:
                                # The second cell presumably contains the weather description
                                return cells[1].get_text(strip=True)

                print(f"No 'Wetter' section found on German page: {german_url}")
                return None

        # Step 5) Fall back if no German translation is found via the API
        print(f"No German translation found via API for URL: {url}")
        return None

    except requests.RequestException as e:
        print(f"Error fetching German URL {url}: {e}")
        return None


# -------------------
# Main script logic
# -------------------

# 1) Check if updated_weather_data.csv exists
csv_filename = 'updated_weather_data.csv'
if not os.path.exists(csv_filename):
    # If the file DOES NOT exist, use formatted_races_df to create weather_df
    weather_df = formatted_races_df.copy()

    # Initialize columns for each weather category with 0
    for col in weather_dict.keys():
        weather_df[col] = 0

    # Add a column to store raw weather information
    weather_df['weather'] = ''

else:
    # If it DOES exist, read it into weather_df
    weather_df = pd.read_csv(csv_filename)

# 2) Process each row in the DataFrame.
#    If we're continuing from a partial file, only fill rows where 'weather' is blank.
rows_to_process = weather_df[weather_df['weather'].isna() | weather_df['weather'].eq('')]

for index, row in rows_to_process.iterrows():
    print(f"Processing year: {row['year']}, round: {row['round']}")

    weather = get_race_weather(row['url'])
    if not weather:
        print(f"Weather information not found for URL: {row['url']}. Trying Italian version.")
        weather = get_italian_weather(row['url'])

    if not weather:
        print(f"Weather information not found for Italian URL. Trying French version.")
        weather = get_french_weather(row['url'])

    if not weather:
        print(f"Weather not found for French URL. Trying German version.")
        weather = get_german_weather(row['url'])

    if weather:
        print(f"Weather: {weather}")
        weather_df.at[index, 'weather'] = weather  # Store raw weather information
        weather_conditions = analyze_weather(weather)
        print("Detected weather conditions:", weather_conditions)
        print("")
        for condition in weather_conditions:
            weather_df.at[index, condition] = 1
    else:
        print(f"Weather information not found for all language versions of URL: {row['url']}")
        print("Weather data needs manual review.")

# 3) Save the updated DataFrame to a CSV file
weather_df.to_csv(csv_filename, index=False)
print(f"Data saved to {csv_filename}")


Data saved to updated_weather_data.csv


In [159]:
formatted_weather_df = weather_df

columns = ['year', 'round', 'circuitRef','weather','weather_warm','weather_cold','weather_dry','weather_wet','weather_cloudy']

# Reorder the columns in the DataFrame
formatted_weather_df = formatted_weather_df[columns]

formatted_weather_df.to_csv('formatted_weather_df', index=False)

# Final Dataframe preprocessing

In [160]:
final_df = (
    formatted_races_df
    .merge(formatted_weather_df, on=['year', 'round', 'circuitRef'], how='inner')
    .drop(['lat', 'lng', 'country', 'weather'], axis=1)
    .merge(formatted_results_df, on=['year', 'round', 'circuitRef', 'url'], how='inner')
    .drop(['url', 'points', 'status', 'milliseconds'], axis=1)
    .merge(formatted_driver_standings_df, on=['year', 'round', 'driverRef'], how='left')
    .merge(formatted_constructor_standings_df, on=['year', 'round', 'constructorRef'], how='left')
    .merge(formatted_qualifying_df, on=['year', 'round', 'grid'], how='inner')
    .drop(['fullname', 'name'], axis=1)
)

In [161]:
# calculate age of drivers
final_df['date'] = pd.to_datetime(final_df.date)
final_df['dob'] = pd.to_datetime(final_df.dob)
final_df['driver_age'] = final_df.apply(lambda x: relativedelta(x['date'], x['dob']).years, axis=1)
final_df.drop(['date', 'dob'], axis = 1, inplace = True)

In [162]:
# Fill/drop nulls
for col in ['post_race_driver_points', 'post_race_driver_wins', 'post_race_driver_position',
            'post_race_constructor_points', 'post_race_constructor_wins', 'post_race_constructor_position']:
    final_df[col] = final_df[col].fillna(0).astype(int)

# Drop remaining nulls
final_df.dropna(inplace=True)

In [163]:
# convert to boolean
for col in ['weather_warm', 'weather_cold','weather_dry', 'weather_wet', 'weather_cloudy']:
    final_df[col] = final_df[col].map(lambda x: bool(x))

In [164]:
# calculate difference in qualifying times
final_df['q'] = final_df.q.map(lambda x: 0 if str(x) == '00.000'
                             else(float(str(x).split(':')[1]) + (60 * float(str(x).split(':')[0])) if x != 0 else 0))
final_df = final_df[final_df['q'] != 0]
final_df.sort_values(['year', 'round', 'grid'], inplace = True)
final_df['qualifying_time_diff'] = final_df.groupby(['year', 'round']).q.diff()
final_df['q'] = final_df.groupby(['year', 'round']).qualifying_time_diff.cumsum().fillna(0)
final_df.drop('qualifying_time_diff', axis = 1, inplace = True)

In [165]:
# Convert categorical columns to dummy variables in final_df
final_df = pd.get_dummies(
    final_df,
    columns=['circuitRef', 'nationality', 'constructorRef'],
    dtype=int
)

# Drop columns based on sum thresholds
for col in final_df.columns:
    if 'nationality' in col and final_df[col].sum() < 140:
        final_df.drop(col, axis=1, inplace=True)
    elif 'constructorRef' in col and final_df[col].sum() < 140:
        final_df.drop(col, axis=1, inplace=True)
    elif 'circuitRef' in col and final_df[col].sum() < 70:
        final_df.drop(col, axis=1, inplace=True)
    else:
        pass

In [118]:
final_df.to_csv('final_df.csv', index = False)