### Data generator from formula1.com

With this script you can push new results posted in the formula 1 official page to the database. This script will be run as soon as possible after each race, so don't worry about reporting this. It is much more important to complete the data that does not come automatically from scraping!

The results will be saved in csv files. You have to specify the season and the round of the event that you want to scrape. Results of the race, qualy, practices will be generated.

Subsequent manual analysis is required, since this scraped data can carry errors (for example, Nelson Piquet in the 2008/2009 season is assigned the identifier of his father, etc.)


In [116]:
import pandas as pd
from pandas import DataFrame
from urllib import request
import requests
import os
from unidecode import unidecode
import bs4 as bs
from typing import Literal
import numpy as np
import sqlite3
import json

In [117]:
# Path of the current data
INPUT_PATH = "./../data/csv/"

# Path to generate the new db files
OUTPUT_PATH = "generated/Data to append to the DB"

if not os.path.exists(OUTPUT_PATH):
    os.makedirs(OUTPUT_PATH)

API_PATH = "http://localhost:3200/api"
DB_PATH = "./../data/db/test.db"

In [118]:
# Race to get -> Season + Round
SEASON = 2020
ROUND = 2

In [119]:
def makeSQLQuery(query: str):
    sqliteConnection = sqlite3.connect(DB_PATH)
    cursor = sqliteConnection.cursor()
    print("Connected to SQLite")

    cursor.execute(query)
    return cursor.fetchall()

In [120]:
def download_df(df: DataFrame | None, name: str):
    if df is not None:
        df.to_csv(f"{OUTPUT_PATH}/{name}.csv", index=False)

In [121]:
def loc_existing_columns(df: DataFrame, columns: list[str]):
    """Receives a dataframe an a list of columns, and returns the dataframe with the
    columns of the list and the columns that are not present filled by and empty string
    """

    invalid_cols = [column for column in columns if column not in df.columns]

    for col in invalid_cols:
        df[col] = ""

    valid_cols = [column for column in columns if column in df.columns]

    return df.loc[:, valid_cols]

In [122]:
def convert_to_milliseconds(text: str):
    """Receives a string in the format `HH:mm:ss.SSS` and return the converted milliseconds"""

    if not isinstance(text, str) and not np.isnan(float(text)):
        return round(float(text) * 1000)

    try:
        splitted = list(map(float, text.split(":")))
    except:
        return None

    result = splitted[-1]
    if len(splitted) >= 2:
        result += splitted[-2] * 60
    if len(splitted) >= 3:
        result += splitted[-3] * 3600

    return round(result * 1000)

In [123]:
from math import isnan

entrants_in_db = pd.read_csv(INPUT_PATH + "sessionEntrants.csv")

def find_entrant_id(driver_name: str, car_name: str = ""):
    driver_name = unidecode(driver_name).replace("-", " ")

    drivers_api_path = f"{API_PATH}/drivers?name={driver_name.replace(" ", "%20")}&birthBefore={SEASON - 10}-01-01&birthAfter={SEASON - 70}-01-01&include=id"

    driver_res = json.loads(request.urlopen(drivers_api_path).read())
    if(driver_res["totalElements"] == 0):
        driver_res:dict = json.loads(request.urlopen(
            drivers_api_path.replace(driver_name.replace(" ", "%20"), 
                                     driver_name.split(" ")[-1])).
                                     read())
        
    results = list(driver_res["data"])

    to_return = []

    for driver in results:
        possible_entrants = json.loads(
            request.urlopen(
                f"{API_PATH}/session-entrants?driverId={driver["id"]}&season={SEASON}&include=id")
                .read())
        
        for entrant in list(possible_entrants["data"]):
            to_return.append(entrant["id"])

    if(len(results) > 0):
        return "???".join(to_return)
    
    return "???"

In [124]:
def get_race_urls(season: int):
    year = str(season)
    race_urls: list[str] = []
    source = request.urlopen(
        f"https://www.formula1.com/en/results.html/{year}/races.html"
    ).read()

    soup = bs.BeautifulSoup(source, "lxml")

    for url in soup.find_all("a"):
        if (
            year in str(url.get("href"))
            and "race-result" in str(url.get("href"))
            and url.get("href") not in race_urls
        ):
            race_urls.append(url.get("href"))

    return race_urls

In [125]:
def get_race_data(
    race_url: str,
    data_to_get: Literal[
        "pit_stops", "Q", "fp1", "fp2", "fp3", "fp4", "wu", "R"
    ],
    season: int,
    round: int,
):
    last_path_slot: str

    if data_to_get == "pit_stops":
        last_path_slot = "/pit-stop-summary.html"
    elif data_to_get == "Q":
        last_path_slot = "/qualifying-0.html"
    elif data_to_get == "fp1":
        last_path_slot = "/practice-1.html"
    elif data_to_get == "fp2":
        last_path_slot = "/practice-2.html"
    elif data_to_get == "fp3":
        last_path_slot = "/practice-3.html"
    elif data_to_get == "fp4":
        last_path_slot = "/practice-4.html"
    elif data_to_get == "wu":
        last_path_slot = "/practice-0.html"
    elif data_to_get == "R":
        last_path_slot = "/race-result.html"

    source = request.urlopen(
        "https://www.formula1.com/"
        + "/".join(race_url.split("/")[:-1])
        + last_path_slot
    ).read()

    soup = bs.BeautifulSoup(source, "lxml")

    SIDENAV_LIST = soup.find("ul", {"class": "resultsarchive-side-nav"})

    if not SIDENAV_LIST:
        return None

    if data_to_get == "pit_stops" and not "Pit stop summary" in SIDENAV_LIST.text:
        return None

    if data_to_get == "wu" and not "Warm Up" in SIDENAV_LIST.text:
        return None

    if data_to_get == "R" and not "Race result" in SIDENAV_LIST.text:
        return None

    for i in range(1, 5):
        if data_to_get == f"fp{i}" and not f"Practice {i}" in SIDENAV_LIST.text:
            return None

    table = soup.find_all("table")[0]
    df = pd.read_html(str(table), flavor="bs4", header=[0])[0]

    # Get driver ID
    df["Driver"] = df["Driver"].apply(lambda x: " ".join(x.split(" ")[:-1]))
    df["driverId"] = df["Driver"].map(find_entrant_id)

    # Remove unnamed columns
    df = df.loc[:, ~df.columns.str.startswith("Unnamed: ")]

    df["season"] = season
    df["round"] = round

    df["eventId"] = f"{season}-{round}-{"R" if data_to_get == "pit_stops" else data_to_get.upper()}"

    return df

In [126]:
races_url = get_race_urls(SEASON)
race = races_url[ROUND - 1]

# Comment any of the following blocks to speed-up the proccess:
qualifying_df = get_race_data(race, "Q", SEASON, ROUND)
pit_stop_df = get_race_data(race, "pit_stops", SEASON, ROUND)
fp1_df = get_race_data(race, "fp1", SEASON, ROUND)
fp2_df = get_race_data(race, "fp2", SEASON, ROUND)
fp3_df = get_race_data(race, "fp3", SEASON, ROUND)
fp4_df = get_race_data(race, "fp4", SEASON, ROUND)
wu_df = get_race_data(race, "wu", SEASON, ROUND)
race_df = get_race_data(race, "R", SEASON, ROUND)

  df = pd.read_html(str(table), flavor="bs4", header=[0])[0]
  df = pd.read_html(str(table), flavor="bs4", header=[0])[0]
  df = pd.read_html(str(table), flavor="bs4", header=[0])[0]
  df = pd.read_html(str(table), flavor="bs4", header=[0])[0]
  df = pd.read_html(str(table), flavor="bs4", header=[0])[0]


In [127]:
# Check that all drivers has been found in our DB
for df in [qualifying_df, pit_stop_df, fp1_df, fp2_df, fp3_df]:
    if df is not None:
        DRIVERS_WITHOUT_ID = df[df["driverId"] == ""]
        if len(DRIVERS_WITHOUT_ID) > 0:
            raise Exception(
                "Can not find any id for this drivers: ",
                DRIVERS_WITHOUT_ID["Driver"].unique(),
            )

In [128]:
""" ---------------------------------------------
--------------- RACE RESULTS --------------------
--------------------------------------------- """

if race_df is not None:
    race_df["positionOrder"] = race_df.index + 1
    race_df["pointsCountForWDC"] = 1
    race_df["pointsGained"] = race_df["PTS"]

    race_df = loc_existing_columns(
        race_df,
        [
            "eventId",
            "driverId",
            "positionOrder",
            "Pos",
            "time",
            "gridPosition",
            "gridPenalty",
            "Time/Retired",
            "Laps",
            "PTS",
            "pointsCountForWDC",
            "pointsGained",
            "gap",
            "timePenalty",
            "reasonRetired",
        ],
    )

    race_df.rename(
        columns={"Laps": "laps", "PTS": "points", "Pos": "positionText"}, inplace=True
    )

    race_df["gridPosition"] = "COMPLETE_ME"
    race_df["gridPenalty"] = "COMPLETE_ME"

    race_df["gap"] = race_df.apply(
        lambda x: x["Time/Retired"]
        if x["Time/Retired"] != "DNF" and x["positionOrder"] != 1
        else "",
        axis=1,
    )

    race_df.at[0, "time"] = convert_to_milliseconds(race_df.at[0, "Time/Retired"])

    race_df["time"] = race_df.apply(
        lambda x: race_df.at[0, "time"]
        + convert_to_milliseconds(
            str(x["Time/Retired"]).replace("+", "").replace("s", "")
        )
        if "lap" not in x["Time/Retired"]
        and x["positionOrder"] != 1
        and convert_to_milliseconds(
            str(x["Time/Retired"]).replace("+", "").replace("s", "")
        )
        is not None
        else "",
        axis=1,
    )

    race_df.at[0, "time"] = convert_to_milliseconds(race_df.at[0, "Time/Retired"])

    race_df["positionText"] = race_df.apply(
        lambda x: "DNF" if x["positionText"] == "NC" else x["positionText"], axis=1
    )

    race_df["reasonRetired"] = race_df.apply(
        lambda x: "COMPLETE_ME" if x["positionText"] == "DNF" else "", axis=1
    )

    race_df.drop(columns=["Time/Retired"], inplace=True)

In [129]:
download_df(race_df, "raceResults")

In [130]:
""" ---------------------------------------------
---------------- QUALIFYING ---------------------
--------------------------------------------- """

if qualifying_df is not None:
    qualifying_df["positionOrder"] = qualifying_df.index + 1

    qualifying_df = loc_existing_columns(
        qualifying_df,
        [
            "eventId",
            "driverId",
            "positionOrder",
            "Pos",
            "Time",
            "Laps",
            "Q1",
            "Q2",
            "Q3",
        ],
    )

    # Convert the time text to milliseconds
    for col in ["Time", "Q1", "Q2", "Q3"]:
        qualifying_df[col] = qualifying_df[col].apply(
            lambda x: convert_to_milliseconds(x)
        )

    qualifying_df.rename(
        columns={
            "Laps": "laps",
            "Time": "time",
            "Pos": "positionText",
            "Q1": "q1Time",
            "Q2": "q2Time",
            "Q3": "q3Time",
        },
        inplace=True,
    )

    for col in [
        "time",
        "q1Time",
        "q2Time",
        "q3Time",
        "laps",
    ]:
        if col in qualifying_df.columns:
            qualifying_df[col] = np.floor(
                pd.to_numeric(qualifying_df[col], errors="ignore")
            ).astype("Int64")

In [131]:
download_df(qualifying_df, "qualifyingResults")

In [132]:
""" ---------------------------------------------
----------------- PIT STOPS ---------------------
--------------------------------------------- """

if pit_stop_df is not None:
    pit_stop_df = loc_existing_columns(
        pit_stop_df, ["eventId", "driverId", "Lap", "Time", "Time of day", "annotation"]
    )

    # Convert the time text to seconds
    pit_stop_df["Time"] = pit_stop_df["Time"].apply(
        lambda x: convert_to_milliseconds(x)
    )

    pit_stop_df.rename(
        columns={"Lap": "pitStopLap", "Time": "time", "Time of day": "timeOfDay"},
        inplace=True,
    )

In [133]:
download_df(pit_stop_df, "pitStops")

In [134]:
""" ---------------------------------------------
--------------- FREE PRACTISES ------------------
--------------------------------------------- """


def format_fp_df(df: DataFrame | None):
    if df is None:
        return df

    df["positionOrder"] = df.index + 1

    df = loc_existing_columns(
        df, ["eventId", "driverId", "positionOrder", "Pos", "Laps", "Time"]
    )

    # Convert the time text to seconds
    df["Time"] = df["Time"].apply(lambda x: convert_to_milliseconds(x))

    df.rename(
        columns={"Laps": "laps", "Time": "time", "Pos": "pos"},
        inplace=True,
    )

    return df

In [135]:
for i, df in enumerate([fp1_df, fp2_df, fp3_df, fp4_df]):
    download_df(format_fp_df(df), f"fp{i + 1}_results")

download_df(format_fp_df(wu_df), "warmingUpResults")