In [None]:
from typing import TypedDict

import numpy as np
import pandas as pd

In [None]:
# Load every raw table from the local parquet export into one dict keyed by a
# short table name (note: the "laps" key is backed by lap_times.parquet).
raw_dfs: dict[str, pd.DataFrame] = {
    table_name: pd.read_parquet(parquet_path)
    for table_name, parquet_path in {
        "results": ".data_parquet/results.parquet",
        "pit_stops": ".data_parquet/pit_stops.parquet",
        "races": ".data_parquet/races.parquet",
        "laps": ".data_parquet/lap_times.parquet",
        "constructor_standings": ".data_parquet/constructor_standings.parquet",
        "status": ".data_parquet/status.parquet",
        "drivers": ".data_parquet/drivers.parquet",
        "constructors": ".data_parquet/constructors.parquet",
    }.items()
}

In [None]:
# Chronological numbering of all races: order by season and round, number the
# rows from 1 and use that number as the index.
# "raceId" is copied from the index of the races table
# (assumes that table is indexed by raceId - TODO confirm).
races_in_order = (
    raw_dfs["races"][["year", "round"]]
    .sort_values(["year", "round"])
    .assign(
        number=lambda ordered: range(1, len(ordered) + 1),
        raceId=lambda ordered: ordered.index,
    )
    .set_index("number")
)
races_in_order

In [None]:
# Status labels used to tell DNQs apart from DNFs: results carrying one of
# these statuses are filtered out before DNFs are counted.
exclude_status = [
    "Did not qualify",
    "Did not prequalify",
    "Not classified",
    "Withdrew",
]
# Index labels of the status table whose text is one of the labels above.
not_qualified_status = raw_dfs["status"].loc[
    raw_dfs["status"]["status"].isin(exclude_status)
].index
not_qualified_status

In [None]:
def calculate_difference_coefficient(
    raceId: int,
    verbose=False,
    top_n: int = 3,
):
    """
    Measure how large the winner's margin over the following drivers was.

    The ``milliseconds`` values of the race's results are ordered by finishing
    ``position``; the gaps between consecutive drivers among the first
    ``top_n`` with a recorded value are averaged using squared, decreasing
    weights, so the gap right behind the winner weighs the most
    (weights 4 and 1 for the default ``top_n=3``).

    Parameters
    ----------
    raceId : int
        Race to evaluate (first index level of ``raw_dfs["results"]``).
    verbose : bool, default False
        Print the intermediate values, or the reason no value was computed.
    top_n : int, default 3
        Number of leading drivers taken into account; must be at least 2.

    Returns
    -------
    float
        Weighted mean gap, in the unit of the ``milliseconds`` column, or
        ``np.nan`` when fewer than two drivers have a recorded value or the
        race cannot be looked up.

    Raises
    ------
    ValueError
        If ``top_n`` is smaller than 2.
    """
    if top_n < 2:
        raise ValueError("top_n must be at least 2")
    try:
        millis = (
            raw_dfs["results"]
            .loc[(raceId, slice(None))]
            .sort_values("position")["milliseconds"]
            .dropna()
            .to_numpy()
        )
        if len(millis) < 2:
            # A gap needs at least two recorded values.
            if verbose:
                print("No data available")
            return np.nan
        drivers_used = min(top_n, millis.size)
        differences_each_driver = np.diff(millis[:drivers_used])
        # Squared, decreasing weights: the first gap counts the most.
        weights = np.arange(differences_each_driver.size, 0, -1) ** 2
        dot_product = np.dot(differences_each_driver, weights)
        sum_weights = weights.sum()
        difference_coefficient = dot_product / sum_weights
        if verbose:
            print(f"Race ID: {raceId}")
            print(f"Milliseconds: {millis}")
            print(f"Differences: {differences_each_driver}")
            print(f"Weights: {weights}")
            print(f"Dot product: {dot_product}")
            print(f"Sum weights: {sum_weights}")
            print(f"Difference coefficient: {difference_coefficient}")
        return difference_coefficient
    except Exception as excp:
        # Deliberate best effort: any lookup failure (e.g. unknown raceId)
        # yields NaN so a bulk apply over all races keeps going.
        if verbose:
            print(excp)
        # np.nan is the canonical spelling; the upper-case alias is not
        # available in NumPy 2.
        return np.nan


# Comfortable win, example: Silverstone 2008
print(calculate_difference_coefficient(raceId=26, verbose=True))
# Narrow win, example: USA 2002
print(calculate_difference_coefficient(raceId=139, verbose=True))

In [None]:
def calculate_total_pit_stops(
    raceId: int,
    driverId: int,
    verbose=False
):
    """
    Count a driver's pit stops in a race and compare them with the field.

    Parameters
    ----------
    raceId : int
        Race to look up in ``raw_dfs["pit_stops"]``.
    driverId : int
        Driver whose pit stops are counted.
    verbose : bool, default False
        Print the intermediate values, or the reason no value was computed.

    Returns
    -------
    tuple
        ``(pit stops made by the driver, that count minus the mean count over
        all drivers with at least one recorded stop in the race)``.
        ``(np.nan, np.nan)`` when the race or the driver has no pit-stop rows.
    """
    try:
        pit_stops_by_driver = (
            raw_dfs["pit_stops"]
            # List key: always yields a DataFrame, even when the race has a
            # single pit-stop row (a scalar key would return a Series there
            # and the groupby below would fail).
            .loc[[raceId]]
            .groupby("driverId")
            .size()
        )
        pit_stops_winner = pit_stops_by_driver.loc[driverId]
        mean_pit_stops = pit_stops_by_driver.mean()
        difference_from_mean = pit_stops_winner - mean_pit_stops
        if verbose:
            print(f"Race ID: {raceId}; Driver ID: {driverId}")
            print(f"Winner pit stops: {pit_stops_winner}")
            print(f"Mean pit stops: {mean_pit_stops}")
            print(f"Difference from mean: {difference_from_mean}")
        return pit_stops_winner, difference_from_mean
    except Exception as excp:
        # Deliberate best effort: missing race/driver data yields NaNs.
        if verbose:
            print(excp)
        # np.nan is the canonical spelling; the upper-case alias is not
        # available in NumPy 2.
        return np.nan, np.nan

# Win with many pit stops, example: Canada 2011
print(calculate_total_pit_stops(raceId=847, driverId=18, verbose=True))
# Win with few pit stops, example: Spain 2016
print(calculate_total_pit_stops(raceId=952, driverId=830, verbose=True))

In [None]:
def calculate_constructor_performance_coefficient(
    raceId: int,
    driverId: int,
    verbose=False,
    n_previous_races: int = 5,
):
    """
    Share of recent wins taken by the constructor a driver raced for.

    The constructor is read from the driver's result in ``raceId``; the wins
    (``position == 1``) of the ``n_previous_races`` races that precede it in
    ``races_in_order`` are then counted per constructor. The window follows
    the chronological race numbering only, so it may reach into an earlier
    season and is shorter for the very first races.

    Parameters
    ----------
    raceId : int
        Race for which the coefficient is computed.
    driverId : int
        Driver whose constructor is evaluated.
    verbose : bool, default False
        Print the intermediate values, or the reason no value was computed.
    n_previous_races : int, default 5
        Number of preceding races in the window.

    Returns
    -------
    float
        Wins of the driver's constructor divided by all wins in the window,
        or ``np.nan`` when it cannot be computed (no previous race, unknown
        race/driver, constructor absent from the window, ...).
    """
    try:
        results = raw_dfs["results"]
        # np.atleast_1d(...)[0] works whether the (raceId, driverId) key
        # matches a single row (scalar) or several rows (Series).
        constructorId = np.atleast_1d(
            results.loc[(raceId, driverId), "constructorId"]
        )[0]
        # Get race number
        race_number = races_in_order.query("raceId == @raceId").index[0]
        # Get the raceId of the races in the window (label slice, inclusive)
        previous_five_raceId = races_in_order.loc[
            race_number - n_previous_races: race_number - 1
        ]["raceId"]
        # Get the results of those races, group wins by constructor
        previous_wins_by_constructor = (
            results
            .loc[previous_five_raceId.to_list()]
            .groupby("constructorId")["position"]
            .apply(lambda x: (x == 1).sum())
        )
        # Get the ratio of wins by constructor of the driver
        previous_constructor_wins_ratio = (
            previous_wins_by_constructor.loc[constructorId] /
            previous_wins_by_constructor.sum()
        )
        if verbose:
            print(f"Race ID: {raceId}; Driver ID: {driverId}")
            print(f"Constructor ID of the winning driver: {constructorId}")
            print(f"Race number: {race_number}")
            print(f"Last {n_previous_races} races raceId: {previous_five_raceId}")
            print(
                f"Previous wins by constructor in the last {n_previous_races} races: "
                f"{previous_wins_by_constructor}"
            )
            print(f"Ratio of wins by constructor: {previous_constructor_wins_ratio}")
        return previous_constructor_wins_ratio
    except Exception as excp:
        # Deliberate best effort: any lookup failure yields NaN.
        if verbose:
            print(excp)
        # np.nan is the canonical spelling; the upper-case alias is not
        # available in NumPy 2.
        return np.nan


# Win with the constructor in good form, example: Russia 2016
print(calculate_constructor_performance_coefficient(raceId=951, driverId=3, verbose=True))
# Win with the constructor out of form, example: Hungary 2021
print(calculate_constructor_performance_coefficient(raceId=1062, driverId=839, verbose=True))

In [None]:
def calculate_dnfs_in_race(
    raceId: int,
    verbose=False
):
    """
    Count the DNFs of a race and their share of the field.

    Results whose ``statusId`` is listed in ``not_qualified_status`` are
    dropped first; among the remaining rows, every row without a ``position``
    is counted as a DNF.

    Parameters
    ----------
    raceId : int
        Race to evaluate (first index level of ``raw_dfs["results"]``).
    verbose : bool, default False
        Print the intermediate values, or the reason no value was computed.

    Returns
    -------
    tuple
        ``(number of DNFs, DNFs divided by the number of remaining drivers)``.
        The ratio is ``np.nan`` when no driver remains after filtering, and
        ``(np.nan, np.nan)`` is returned when the race cannot be looked up.
    """
    try:
        race_results = raw_dfs["results"].loc[(raceId, slice(None))]
        is_excluded = race_results["statusId"].isin(not_qualified_status)
        race_results = race_results[~is_excluded]
        total_drivers = race_results.shape[0]
        dnf_count = race_results["position"].isna().sum()
        # Explicit guard instead of relying on a 0/0 division.
        dnf_ratio = dnf_count / total_drivers if total_drivers else np.nan
        if verbose:
            print(f"Race ID: {raceId}")
            print(f"Total drivers: {total_drivers}")
            print(f"DNF count: {dnf_count}")
            print(f"DNF ratio: {dnf_ratio}")
        return dnf_count, dnf_ratio
    except Exception as excp:
        # Deliberate best effort: any lookup failure yields NaNs.
        if verbose:
            print(excp)
        # np.nan is the canonical spelling; the upper-case alias is not
        # available in NumPy 2.
        return np.nan, np.nan


# Win with many DNFs, example: Monaco 1996
print(calculate_dnfs_in_race(raceId=229, verbose=True))
# Win with few DNFs, example: Europe 2011
print(calculate_dnfs_in_race(raceId=848, verbose=True))

In [None]:
def calculate_position_distribution(
    raceId: int,
    driverId: int,
    verbose=False
):
    """
    Summarise the positions a driver held over the laps of a race.

    Parameters
    ----------
    raceId : int
        Race to look up in ``raw_dfs["laps"]``.
    driverId : int
        Driver whose lap rows are used.
    verbose : bool, default False
        Print the computed values, or the reason no value was computed.

    Returns
    -------
    tuple
        ``(worst position, mean position)`` over the driver's lap rows; the
        mean is 1.0 when the driver led every recorded lap.
        ``(np.nan, np.nan)`` when there are no lap rows for that race/driver.
    """
    try:
        this_driver_laps = raw_dfs["laps"].loc[(raceId, driverId)]
        lap_positions = this_driver_laps["position"]
        # The numerically largest position is the worst one held.
        worst_position = lap_positions.max()
        mean_position = lap_positions.mean()
        if verbose:
            print(f"Race ID: {raceId}; Driver ID: {driverId}")
            print(f"Worst position: {worst_position}")
            print(f"Mean position: {mean_position}")
        return worst_position, mean_position
    except Exception as excp:
        # Deliberate best effort: missing lap data yields NaNs.
        if verbose:
            print(excp)
        # np.nan is the canonical spelling; the upper-case alias is not
        # available in NumPy 2.
        return np.nan, np.nan
    

# Start-to-finish win, example: Japan 2012
print(calculate_position_distribution(raceId=874, driverId=20, verbose=True))
# Win after a recovery drive, example: Brazil 2012
print(calculate_position_distribution(raceId=879, driverId=20, verbose=True))

In [None]:
# Keep only the winning result of every race (finishing position 1).
winners_df = raw_dfs["results"].loc[raw_dfs["results"]["position"] == 1.0]


def calculate_cluster_info(row):
    """
    Return a copy of a results row extended with the engineered features.

    Meant to be used with ``DataFrame.apply(..., axis=1)`` on a frame whose
    index starts with ``(raceId, driverId)``; both ids are read from
    ``row.name``.

    Parameters
    ----------
    row : pd.Series
        One row of the results table.

    Returns
    -------
    pd.Series
        The original values followed by ``difference_coefficient``,
        ``total_pit_stops``, ``pit_stop_diff_from_mean``,
        ``constructor_performance_coefficient``, ``dnf_count``, ``dnf_ratio``,
        ``worst_position`` and ``mean_position`` (NaN where a feature could
        not be computed).
    """
    raceId, driverId = row.name[0], row.name[1]
    # Work on a copy: pandas does not support functions that mutate the
    # object handed to apply().
    enriched = row.copy()
    enriched["difference_coefficient"] = calculate_difference_coefficient(raceId)
    (
        enriched["total_pit_stops"],
        enriched["pit_stop_diff_from_mean"],
    ) = calculate_total_pit_stops(raceId, driverId)
    enriched["constructor_performance_coefficient"] = (
        calculate_constructor_performance_coefficient(raceId, driverId)
    )
    enriched["dnf_count"], enriched["dnf_ratio"] = calculate_dnfs_in_race(raceId)
    (
        enriched["worst_position"],
        enriched["mean_position"],
    ) = calculate_position_distribution(raceId, driverId)
    return enriched


# Add the engineered features to every winning result, then force every
# column to numeric (values that cannot be parsed become NaN).
winners_with_extra_info_df = (
    winners_df
    .apply(calculate_cluster_info, axis=1)
    .apply(pd.to_numeric, errors="coerce")
)
# Names of the engineered feature columns added by calculate_cluster_info.
df_calculated_rows = [
    "difference_coefficient",
    "total_pit_stops",
    "pit_stop_diff_from_mean",
    "constructor_performance_coefficient",
    "dnf_count",
    "dnf_ratio",
    "worst_position",
    "mean_position",
]

In [None]:
# Display the winners table with the engineered feature columns.
winners_with_extra_info_df

In [None]:
# Replace constructorId by the constructor's name, and drop fastestLapTime.
# NOTE: not re-runnable as is - constructorId is dropped here, so a second run
# of this cell fails on the join key.
winners_with_extra_info_df = (
    winners_with_extra_info_df
    .join(
        raw_dfs["constructors"]["name"].rename("constructor_name"),
        on="constructorId",
    )
    .drop(columns=["fastestLapTime", "constructorId"])
)
winners_with_extra_info_df

In [None]:
# Full name of the winning driver ("forename surname").
# The helper column is stored on the drivers table itself.
# NOTE: joining twice fails because driver_name would already exist.
raw_dfs["drivers"]["driver_name"] = (
    raw_dfs["drivers"]["forename"] + " " + raw_dfs["drivers"]["surname"]
)
winners_with_extra_info_df = winners_with_extra_info_df.join(
    raw_dfs["drivers"]["driver_name"],
    on="driverId",
)
winners_with_extra_info_df

In [None]:
# Human-readable race label ("<year> <race name>").
# The helper column is stored on the races table itself.
# NOTE: joining twice fails because race_name would already exist.
raw_dfs["races"]["race_name"] = (
    raw_dfs["races"]["year"].astype(str) + " " + raw_dfs["races"]["name"]
)
winners_with_extra_info_df = winners_with_extra_info_df.join(
    raw_dfs["races"]["race_name"],
    on="raceId",
)
winners_with_extra_info_df

In [None]:
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import plotly.express as px

# --- Feature matrix -------------------------------------------------------
# Engineered columns only; wins with any missing feature are dropped.
df_to_pca = winners_with_extra_info_df[df_calculated_rows].dropna()

# --- Standardise, project to 2 components, cluster in the projected space --
scaler = StandardScaler()
data_scaled = scaler.fit_transform(df_to_pca.values)
pca = PCA(n_components=2)
pca_result = pca.fit_transform(data_scaled)
# Fixed seed so the cluster labels are the same on every run; n_init is set
# explicitly so the result does not depend on the scikit-learn version default.
kmeans = KMeans(n_clusters=4, random_state=42, n_init=10)
kmeans.fit(pca_result)
df_to_pca["x"] = pca_result[:, 0]
df_to_pca["y"] = pca_result[:, 1]
df_to_pca["cluster"] = kmeans.labels_

# --- Labels for the hover box ---------------------------------------------
# Re-read the races table; this discards the race_name helper column that an
# earlier cell stored on raw_dfs["races"].
raw_dfs["races"] = pd.read_parquet(".data_parquet/races.parquet")
df_to_pca = df_to_pca.join(raw_dfs["races"][['name', 'year']])
df_to_pca['name'] = df_to_pca['year'].astype(str) + ' ' + df_to_pca['name']
df_to_pca = df_to_pca.join(raw_dfs["drivers"][['forename', 'surname']])
df_to_pca['driver_name'] = df_to_pca['forename'] + ' ' + df_to_pca['surname']

# String labels make plotly treat the clusters as discrete categories.
df_to_pca["cluster"] = df_to_pca["cluster"].astype(str)

# Hover box: race and driver first, then every column, then hide the
# technical/helper columns again.
hover_data_dict = {}
hover_data_dict["name"] = True
hover_data_dict["driver_name"] = True
for column in df_to_pca.columns:
    hover_data_dict[column] = True
hover_data_dict["x"] = False
hover_data_dict["y"] = False
hover_data_dict["cluster"] = False
hover_data_dict["year"] = False
hover_data_dict["forename"] = False
hover_data_dict["surname"] = False

# NOTE(review): "cluster" (and "year" below) are string columns, so the
# continuous colour scale presumably has no effect - confirm and consider
# color_discrete_sequence instead.
fig = px.scatter(df_to_pca, x="x", y="y", color="cluster", hover_data=hover_data_dict,
                 color_continuous_scale=px.colors.sequential.Burgyl)
fig.show()

# Same projection, coloured by season.
df_to_pca["year"] = df_to_pca["year"].astype(str)
fig2 = px.scatter(df_to_pca, x="x", y="y", color="year", hover_data=hover_data_dict,
                  color_continuous_scale=px.colors.sequential.RdBu)
fig2.show()

In [None]:
from dash import Dash, dcc, html, Input, Output, dash_table
import plotly.express as px
import plotly.graph_objects as go
import json

# Column name -> label shown in the parameter checklist of the dashboard.
available_parameters = dict(
    grid="Grid starting position",
    difference_coefficient="Difference coefficient",
    total_pit_stops="Total pit stops",
    pit_stop_diff_from_mean="Pit stop difference from mean",
    constructor_performance_coefficient="Constructor performance coefficient",
    dnf_count="DNF count",
    dnf_ratio="DNF ratio",
    worst_position="Worst position",
    mean_position="Mean position",
)

# Dash application object; the layout and the callbacks below are attached to it.
app = Dash(__name__)


class store_data(TypedDict, total=False):
    """
    Payload kept in the ``data-store`` component and passed between callbacks.

    Declared with ``total=False`` because the payload is filled in step by
    step when it is built.
    """

    # Index entries (raceId, driverId) of the winners that have a value for
    # every selected parameter. Presumably received as 2-item lists once the
    # payload has been serialised by the store - TODO confirm.
    data_index: list[tuple[int, int]]
    # Column plotted on the horizontal axis.
    x: str
    # Column plotted on the vertical axis.
    y: str
    # True when x/y must be computed with a PCA over the selected parameters.
    apply_pca: bool

# Page layout: scatter plot, parameter checklist, driver/constructor filter,
# year range slider, table of the selected races and the shared data store.
app.layout = html.Div(children=[
    html.H4("Interactive scatter plot"),
    dcc.Graph(
        id="scatter-plot",
        style={"width": "100%", "height": "600px"},
    ),
    html.P("Select used parameters"),
    # One checkbox per available parameter; all of them start ticked.
    dcc.Checklist(
        id="select_checklist",
        options=[
            {"label": label, "value": param}
            for param, label in available_parameters.items()
        ],
        value=list(available_parameters),
    ),
    # Filter selector: kind of filter and its value, side by side.
    html.Div(
        children=[
            dcc.Dropdown(
                id="dropdown-filter-key",
                options=["Driver", "Constructor"],
                style={"flex": 1},
            ),
            dcc.Dropdown(id="dropdown-filter-value", style={"flex": 2}),
        ],
        style={"display": "flex", "flexDirection": "row", "gap": 20},
    ),
    # Filter by year
    dcc.RangeSlider(
        id="year_slider",
        min=1950,
        max=2023,
        step=1,
        value=[1950, 2023],
        marks={str(year): str(year) for year in range(1950, 2023, 10)},
    ),
    dash_table.DataTable(
        id="selected_data_table",
        columns=[
            {"id": "race_name", "name": "Race"},
            {"id": "driver_name", "name": "Driver"},
        ],
        style_cell={"textAlign": "center"},
    ),
    dcc.Store(id="data-store"),
])

@app.callback(
    Output("data-store", "data"),
    Input("select_checklist", "value"))
def update_available_data_index(select_checklist) -> store_data:
    """
    Build the shared store payload from the ticked parameters.

    Returns ``None`` when nothing is ticked. Otherwise the payload lists the
    index entries of the winners that have a value for every ticked parameter
    and says what goes on each axis:

    - one parameter: that column against the constant ``"zero"`` column;
    - two parameters: one column per axis;
    - three or more: ``"x"``/``"y"`` with ``apply_pca`` set.
    """
    n_selected = len(select_checklist)
    if n_selected == 0:
        return None
    if n_selected == 1:
        x_column, y_column, use_pca = select_checklist[0], "zero", False
    elif n_selected == 2:
        x_column, y_column, use_pca = select_checklist[0], select_checklist[1], False
    else:
        x_column, y_column, use_pca = "x", "y", True
    # Winners with no missing value in any of the ticked columns.
    complete_rows = winners_with_extra_info_df[select_checklist].dropna()
    return {
        "data_index": complete_rows.index.to_list(),
        "x": x_column,
        "y": y_column,
        "apply_pca": use_pca,
    }


# Update filter value based on available data and selected choice
@app.callback(
    Output("dropdown-filter-value", "options"),
    Output("dropdown-filter-value", "value"),
    Input("data-store", "data"),
    Input("dropdown-filter-key", "value"),
    Input("year_slider", "value"))
def update_filter_options(data: store_data, selected_filter, year_slider):
    """
    List the drivers or constructors that can be picked in the value dropdown.

    Renamed from ``update_year_slider``: that name was reused by the next
    callback, which silently shadowed this function. Dash registers callbacks
    through the decorator, so the rename does not affect the wiring.

    Parameters
    ----------
    data : store_data or None
        Current store payload (``None`` when no parameter is ticked).
    selected_filter : str or None
        ``"Driver"``, ``"Constructor"`` or ``None``.
    year_slider : list[int]
        ``[first year, last year]`` chosen on the range slider (inclusive).

    Returns
    -------
    tuple
        ``(options, None)``: the sorted names available for the stored
        winners inside the year range, and ``None`` to clear the current
        dropdown value. The options are empty when nothing can be listed.
    """
    name_column = {
        "Driver": "driver_name",
        "Constructor": "constructor_name",
    }.get(selected_filter)
    if data is None or name_column is None:
        return [], None
    year_min, year_max = year_slider
    race_years = raw_dfs["races"]["year"]
    # Set for O(1) membership tests below.
    race_ids_in_range = set(race_years.index[race_years.between(year_min, year_max)])
    entries = [
        tuple(entry)
        for entry in data["data_index"]
        if entry[0] in race_ids_in_range
    ]
    if not entries:
        # MultiIndex.from_tuples cannot infer the levels from an empty list.
        return [], None
    idx = pd.MultiIndex.from_tuples(entries)
    names = winners_with_extra_info_df.loc[idx][name_column]
    return sorted(names.dropna().unique()), None


# Update year slider based on available data
@app.callback(
    Output("year_slider", "min"),
    Output("year_slider", "max"),
    Output("year_slider", "marks"),
    Input("data-store", "data"))
def update_year_slider(data: store_data):
    """
    Fit the year slider to the seasons covered by the stored winners.

    Parameters
    ----------
    data : store_data or None
        Current store payload (``None`` when no parameter is ticked).

    Returns
    -------
    tuple
        ``(first year, last year, marks)`` where ``marks`` maps roughly ten
        evenly spaced years to their labels. The slider is left unchanged
        when there is no data to derive the range from.
    """
    # Local import: no_update is not among the names imported from dash at
    # the top of this cell.
    from dash import no_update

    if not data or not data["data_index"]:
        return no_update, no_update, no_update
    races_id = pd.MultiIndex.from_tuples(data["data_index"]).get_level_values(0)
    years = raw_dfs["races"].loc[races_id]["year"]
    # Plain ints: usable by range() and safe to serialise.
    first_year, last_year = int(years.min()), int(years.max())
    # At least 1: a span shorter than ten years would otherwise give step 0,
    # which range() rejects.
    step = max(1, (last_year + 1 - first_year) // 10)
    marks = {str(year): str(year) for year in range(first_year, last_year + 1, step)}
    return first_year, last_year, marks


@app.callback(
    Output("scatter-plot", "figure"),
    Input("data-store", "data"),
    Input("select_checklist", "value"),
    Input("year_slider", "value"),
    Input("dropdown-filter-key", "value"),
    Input("dropdown-filter-value", "value"),
)
def update_figure(data: store_data, select_checklist, year_value, filter_key, filter_value):
    """
    Draw the scatter plot of the stored winners and highlight the filtered ones.

    Parameters
    ----------
    data : store_data or None
        Store payload: index entries to plot, axis columns and PCA flag.
    select_checklist : list[str]
        Ticked parameter columns (PCA input and hover fields).
    year_value : list[int]
        ``[first year, last year]`` of the year filter (inclusive).
    filter_key : str or None
        ``"Driver"``, ``"Constructor"`` or ``None``.
    filter_value : str or None
        Name picked for ``filter_key``.

    Returns
    -------
    plotly.graph_objects.Figure
        Trace 0 holds every stored winner. When the filters exclude at least
        one point, trace 1 ("Selected") overlays the points that pass them.
        An empty figure is returned when there is nothing to plot.
    """
    if not data or not data["data_index"]:
        return go.Figure()
    idx = pd.MultiIndex.from_tuples(data["data_index"])
    year_min, year_max = year_value
    x_column, y_column = data["x"], data["y"]
    showcase_df = winners_with_extra_info_df.loc[idx].copy()
    # Constant column used as the vertical axis when one parameter is ticked.
    showcase_df["zero"] = 0.
    hover_data_dict = {
        "driver_name": True,
        "zero": False,
    }
    if data["apply_pca"]:
        # Standardise the ticked columns and project them to two components.
        pca_data = StandardScaler().fit_transform(showcase_df[select_checklist].values)
        pca_result = PCA(n_components=2).fit_transform(pca_data)
        showcase_df["x"] = pca_result[:, 0]
        showcase_df["y"] = pca_result[:, 1]
        hover_data_dict["x"] = False
        hover_data_dict["y"] = False
    for item in select_checklist:
        hover_data_dict[item] = True
    # Races inside the year range (vectorised membership test).
    race_years = raw_dfs["races"]["year"]
    races_ids_in_year_range = race_years.index[race_years.between(year_min, year_max)]
    showcase_df["in_year_range"] = (
        showcase_df.index.get_level_values(0).isin(races_ids_in_year_range)
    )
    # Optional driver/constructor filter on top of the year filter.
    name_column = {
        "Driver": "driver_name",
        "Constructor": "constructor_name",
    }.get(filter_key)
    selected = showcase_df["in_year_range"]
    if name_column is not None and filter_value is not None:
        selected = selected & (showcase_df[name_column] == filter_value)
    showcase_df["selected"] = selected
    fig = px.scatter(
        data_frame=showcase_df,
        x=x_column,
        y=y_column,
        hover_name="race_name",
        hover_data=hover_data_dict,
    )
    if not showcase_df["selected"].all():
        highlighted = showcase_df.loc[showcase_df["selected"]]
        fig.add_trace(go.Scatter(
            # Same columns as the base trace; "x"/"y" only exist in PCA mode.
            x=highlighted[x_column],
            y=highlighted[y_column],
            mode="markers",
            hoverinfo="skip",
            opacity=1,
            marker=dict(
                color="green",
                size=10,
                symbol="diamond",
            ),
            name="Selected",
        ))
    return fig


@app.callback(
    Output("selected_data_table", "data"),
    Input("scatter-plot", "selectedData"))
def update_table(selectedData):
    """
    List the races (and winners) selected on the scatter plot.

    ``curveNumber`` 0 is the base trace with every point; ``curveNumber`` 1 is
    the "Selected" overlay that the figure callback adds when a filter is
    active. If the selection contains overlay points, only the base points
    lying exactly under an overlay point are kept; otherwise every selected
    base point is kept.

    Parameters
    ----------
    selectedData : dict or None
        Selection reported by the graph (``None`` when nothing is selected).

    Returns
    -------
    list[dict]
        One ``{"race_name": ..., "driver_name": ...}`` record per selected
        race, in chronological order.
    """
    if selectedData is None or not selectedData["points"]:
        return []
    points = selectedData["points"]
    base_points = [point for point in points if point["curveNumber"] == 0]
    # Coordinates are matched as (x, y) pairs: matching x and y separately
    # would also keep a point sharing x with one highlighted point and y with
    # another.
    highlighted_xy = {
        (point["x"], point["y"])
        for point in points
        if point["curveNumber"] == 1
    }
    if highlighted_xy:
        base_points = [
            point for point in base_points
            if (point["x"], point["y"]) in highlighted_xy
        ]
    selected_names = [point["hovertext"] for point in base_points]
    data_selected_full = winners_with_extra_info_df[
        winners_with_extra_info_df["race_name"].isin(selected_names)
    ]
    # Chronological race number of every selected race, used for ordering.
    race_number_by_id = races_in_order.reset_index().set_index("raceId")["number"]
    race_numbers = race_number_by_id.loc[
        data_selected_full.index.get_level_values(0)
    ].to_list()
    data_selected_full = (
        data_selected_full
        .assign(race_number=race_numbers)
        .sort_values("race_number")
    )
    return data_selected_full[["race_name", "driver_name"]].to_dict("records")

# Start the Dash server in debug mode. jupyter_mode="external" is Dash's
# notebook option; presumably the app is served outside the cell output
# instead of inline - see the Dash Jupyter documentation.
app.run(jupyter_mode="external", debug=True)