Load the Formula 1 Dataset

In [1]:
# Install all necessary libraries
%pip install pandas torch scikit-learn transformers faiss-cpu tqdm kagglehub

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 25.1
[notice] To update, run: C:\Users\mroja\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


In [2]:
import kagglehub
from kagglehub import KaggleDatasetAdapter
import pandas as pd

# Load all necessary files into separate DataFrames
races = kagglehub.load_dataset(
    KaggleDatasetAdapter.PANDAS,
    "rohanrao/formula-1-world-championship-1950-2020",
    "races.csv"
)
drivers = kagglehub.load_dataset(
    KaggleDatasetAdapter.PANDAS,
    "rohanrao/formula-1-world-championship-1950-2020",
    "drivers.csv"
)
results = kagglehub.load_dataset(
    KaggleDatasetAdapter.PANDAS,
    "rohanrao/formula-1-world-championship-1950-2020",
    "results.csv"
)
constructors = kagglehub.load_dataset(
    KaggleDatasetAdapter.PANDAS,
    "rohanrao/formula-1-world-championship-1950-2020",
    "constructors.csv"
)

# Now you can access the data directly:
print("Loaded:", len(races), "races,", len(drivers), "drivers,", len(results), "results.")

# If you want to combine them into a single DataFrame:
# Create a 'table' column for each DataFrame
races['table'] = 'races'
drivers['table'] = 'drivers'
results['table'] = 'results'
constructors['table'] = 'constructors'

# Concatenate the DataFrames
f1_df = pd.concat([races, drivers, results, constructors], ignore_index=True)

  races = kagglehub.load_dataset(
  drivers = kagglehub.load_dataset(
  drivers = kagglehub.load_dataset(
  results = kagglehub.load_dataset(
  results = kagglehub.load_dataset(
  constructors = kagglehub.load_dataset(
  constructors = kagglehub.load_dataset(


Loaded: 1125 races, 861 drivers, 26759 results.


In [3]:
# Assuming your new DataFrame is called 'f1_df'
import pandas as pd
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel
from sklearn.model_selection import train_test_split
import faiss
from tqdm import tqdm

# No need to load from CSV anymore
# races = pd.read_csv('races.csv')
# drivers = pd.read_csv('drivers.csv')
# results = pd.read_csv('results.csv')
# constructors = pd.read_csv('constructors.csv')

# Access data directly from the new DataFrame
races = f1_df[f1_df['table'] == 'races']  # Assuming 'table' column identifies data type
drivers = f1_df[f1_df['table'] == 'drivers']
results = f1_df[f1_df['table'] == 'results']
constructors = f1_df[f1_df['table'] == 'constructors']

print("Loaded:", len(races), "races,", len(drivers), "drivers,", len(results), "results.")

Loaded: 1125 races, 861 drivers, 26759 results.


Build the F1 Knowledge Base

In [4]:
f1_data = results.merge(races, on='raceId', suffixes=('_results', '_races'))
f1_data = f1_data.merge(drivers, left_on='driverId_results', right_on='driverId', suffixes=('_f1data', '_drivers'))
f1_data = f1_data.merge(constructors, left_on='constructorId_results', right_on='constructorId', suffixes=('_f1data', '_constructors'))

# Only winning results
winners = f1_data[f1_data['positionOrder_results'] == 1].copy()

# Inspect columns to determine correct driver and constructor column names
print(winners.columns.tolist())

# Use the correct columns for driver names (update as needed after inspecting columns)
winners.loc[:, 'fact'] = winners.apply(
    lambda row: f"In {row['year_races']:.0f}, {row['forename_f1data']} {row['surname_f1data']} won the {row['name_races']} driving for {row['name_constructors']}.",
    axis=1
)
f1_facts = winners[['fact']].reset_index(drop=True)

print("Sample Fact:", f1_facts.iloc[0]['fact'])

['raceId_f1data', 'year_results', 'round_results', 'circuitId_results', 'name_results', 'date_results', 'time_results', 'url_results', 'fp1_date_results', 'fp1_time_results', 'fp2_date_results', 'fp2_time_results', 'fp3_date_results', 'fp3_time_results', 'quali_date_results', 'quali_time_results', 'sprint_date_results', 'sprint_time_results', 'table_results', 'driverId_results', 'driverRef_results', 'number_results', 'code_results', 'forename_results', 'surname_results', 'dob_results', 'nationality_results', 'resultId_results', 'constructorId_results', 'grid_results', 'position_results', 'positionText_results', 'positionOrder_results', 'points_results', 'laps_results', 'milliseconds_results', 'fastestLap_results', 'rank_results', 'fastestLapTime_results', 'fastestLapSpeed_results', 'statusId_results', 'constructorRef_results', 'year_races', 'round_races', 'circuitId_races', 'name_races', 'date_races', 'time_races', 'url_races', 'fp1_date_races', 'fp1_time_races', 'fp2_date_races', 'fp2

Embed the Knowledge Base

In [5]:
# Load model
model_name = 'sentence-transformers/all-MiniLM-L6-v2'
tokenizer = AutoTokenizer.from_pretrained(model_name)
bert = AutoModel.from_pretrained(model_name)

# Embedding function
def compute_embeddings(texts, batch_size=32):
    all_embeddings = []
    for i in tqdm(range(0, len(texts), batch_size)):
        batch = texts[i:i+batch_size]
        inputs = tokenizer(batch, return_tensors='pt', padding=True, truncation=True)
        with torch.no_grad():
            outputs = bert(**inputs)
            embeddings = outputs.last_hidden_state[:, 0]
            embeddings = F.normalize(embeddings, dim=1)
            all_embeddings.append(embeddings.cpu())
    return torch.cat(all_embeddings, dim=0)

# Compute
fact_embeddings = compute_embeddings(f1_facts['fact'].tolist())
print("Embedded", fact_embeddings.shape[0], "facts.")


100%|██████████| 36/36 [00:03<00:00, 10.91it/s]

Embedded 1128 facts.





Define the FAISS Retriever

In [6]:
class F1FAISSRetriever:
    def __init__(self, facts, embeddings):
        self.facts = facts
        dim = embeddings.shape[1]
        self.index = faiss.IndexFlatIP(dim)
        self.index.add(embeddings.numpy())

    def retrieve(self, query, k=3):
        # --- New: Pre-filter facts by year and circuit if present in query ---
        import re
        filtered_facts = self.facts
        # Try to extract a year
        year_match = re.search(r"\b(19|20)\d{2}\b", query)
        if year_match:
            year = year_match.group(0)
            filtered_facts = [fact for fact in filtered_facts if year in fact]
        # Try to extract a circuit/race name (e.g., Monaco, Silverstone)
        for keyword in ["Monaco", "Silverstone", "Abu Dhabi", "Bahrain", "Malaysia", "Las Vegas", "Qatar", "São Paulo", "Mexico City", "Austrian", "German", "South African"]:
            if keyword.lower() in query.lower():
                filtered_facts = [fact for fact in filtered_facts if keyword in fact]
        # If filtering results in no facts, fall back to all
        if not filtered_facts:
            filtered_facts = self.facts

        # Embed only filtered facts
        inputs = tokenizer(filtered_facts, return_tensors='pt', padding=True, truncation=True)
        with torch.no_grad():
            outputs = bert(**inputs)
            fact_embs = outputs.last_hidden_state[:, 0]
            fact_embs = F.normalize(fact_embs, dim=1)
        # Embed query
        q_inputs = tokenizer(query, return_tensors='pt', truncation=True, padding=True)
        with torch.no_grad():
            q_output = bert(**q_inputs)
            query_emb = q_output.last_hidden_state[:, 0]
            query_emb = F.normalize(query_emb, dim=1)
        # Compute similarity manually
        sims = torch.matmul(query_emb, fact_embs.T).squeeze(0)
        topk = torch.topk(sims, min(k, len(filtered_facts)))
        results = [(filtered_facts[idx], float(sims[idx])) for idx in topk.indices]
        return results

retriever = F1FAISSRetriever(f1_facts['fact'].tolist(), fact_embeddings)


Build the Conversational Chatbot

In [18]:
class F1Chatbot:
    def __init__(self, retriever):
        self.retriever = retriever
        self.chat_history = []
        # Extract all available variables/columns for flexible querying
        self.available_columns = list(winners.columns)
        self.available_columns_lower = [col.lower() for col in self.available_columns]

    def extract_drivers(self, query):
        # Simple extraction: look for known driver names in the query
        driver_names = winners['forename_f1data'] + ' ' + winners['surname_f1data']
        found = []
        for name in driver_names.unique():
            if isinstance(name, str) and name.lower() in query.lower():
                found.append(name)
        return list(set(found))

    def compare_drivers(self, driver1, driver2):
        # Aggregate wins for each driver
        d1_wins = winners[
            (winners['forename_f1data'] + ' ' + winners['surname_f1data'] == driver1)
        ]
        d2_wins = winners[
            (winners['forename_f1data'] + ' ' + winners['surname_f1data'] == driver2)
        ]
        # Head-to-head: races where both participated
        races_both = set(d1_wins['name_races']).intersection(set(d2_wins['name_races']))
        answer = (
            f"{driver1} has {len(d1_wins)} wins.\n"
            f"{driver2} has {len(d2_wins)} wins.\n"
            f"Races both have won: {', '.join(races_both) if races_both else 'None'}."
        )
        return answer

    def extract_constructor(self, query):
        # Look for known constructor names in the query
        constructor_names = winners['name_constructors'].dropna().unique()
        found = []
        for name in constructor_names:
            if isinstance(name, str) and name.lower() in query.lower():
                found.append(name)
        return list(set(found))

    def constructor_wins(self, constructor):
        # Aggregate wins for the constructor
        c_wins = winners[winners['name_constructors'] == constructor]
        facts = c_wins['fact'].tolist()
        answer = (
            f"{constructor} has {len(facts)} race wins.\n"
            + "\n".join([f"- {fact}" for fact in facts[:3]])
        )
        return answer

    def get_champion_for_year(self, year):
        # Find the driver with the most wins in the given year
        year_mask = winners['year_races'] == float(year)
        year_winners = winners[year_mask]
        if year_winners.empty:
            return f"No race winners found for {year}."
        # Count wins per driver
        driver_names = year_winners['forename_f1data'] + ' ' + year_winners['surname_f1data']
        champion = driver_names.value_counts().idxmax()
        wins = driver_names.value_counts().max()
        champion_team = year_winners[
            (year_winners['forename_f1data'] + ' ' + year_winners['surname_f1data']) == champion
        ]['name_constructors'].mode().iloc[0]
        answer = (
            f"The champion for {year} (most race wins): {champion} ({wins} wins, driving for {champion_team}).\n"
            "Race wins:\n" +
            "\n".join([
                f"- {row['fact']}"
                for _, row in year_winners[
                    (year_winners['forename_f1data'] + ' ' + year_winners['surname_f1data']) == champion
                ].iterrows()
            ])
        )
        return answer

    def extract_column_from_query(self, query):
        # Try to match a column/variable from the query
        for col, col_lower in zip(self.available_columns, self.available_columns_lower):
            if col_lower in query.lower():
                return col
        # Try to match by keywords (e.g., "year", "driver", "constructor", etc.)
        keywords = {
            "year": "year_races",
            "driver": "forename_f1data",
            "constructor": "name_constructors",
            "race": "name_races",
            "circuit": "name_races",
            "nationality": "nationality_f1data",
            "points": "points_results",
            "laps": "laps_results",
            "position": "position_results"
        }
        for key, col in keywords.items():
            if key in query.lower():
                return col
        return None

    def get_column_values(self, column, filter_query=None, limit=5):
        # Optionally filter by a keyword in the query
        df = winners
        if filter_query:
            df = df[df.apply(lambda row: filter_query.lower() in str(row).lower(), axis=1)]
        values = df[column].dropna().unique()
        return values[:limit]

    def chat(self, query, top_k=3):
        # Detect driver comparison
        drivers = self.extract_drivers(query)
        if "who would win" in query.lower() and len(drivers) == 2:
            answer = self.compare_drivers(drivers[0], drivers[1])
            self.chat_history.append({"user": query})
            self.chat_history.append({"bot": answer})
            print(f"\nUser: {query}")
            print(f"Bot:\n{answer}\n")
            return

        # Detect constructor win queries
        constructors = self.extract_constructor(query)
        if constructors and ("win" in query.lower() or "victor" in query.lower()):
            answer = self.constructor_wins(constructors[0])
            self.chat_history.append({"user": query})
            self.chat_history.append({"bot": answer})
            print(f"\nUser: {query}")
            print(f"Bot:\n{answer}\n")
            return

        # Detect champion for year queries
        import re
        match = re.search(r'champion.*(\d{4})', query.lower())
        if match:
            year = match.group(1)
            answer = self.get_champion_for_year(year)
            self.chat_history.append({"user": query})
            self.chat_history.append({"bot": answer})
            print(f"\nUser: {query}")
            print(f"Bot:\n{answer}\n")
            return

        # Variable extraction
        col = self.extract_column_from_query(query)
        if col:
            values = self.get_column_values(col, filter_query=query)
            answer = f"Sample values for '{col}': {', '.join(map(str, values))}"
            self.chat_history.append({"user": query})
            self.chat_history.append({"bot": answer})
            print(f"\nUser: {query}")
            print(f"Bot:\n{answer}\n")
            return

        self.chat_history.append({"user": query})
        results = self.retriever.retrieve(query, k=top_k)
        answer = "\n".join([f"- {fact}" for fact, score in results])
        self.chat_history.append({"bot": answer})

        print(f"\nUser: {query}")
        print(f"Bot:\n{answer}\n")

    def show_history(self):
        for turn in self.chat_history:
            for speaker, text in turn.items():
                print(f"{speaker.capitalize()}: {text}\n")

f1_chatbot = F1Chatbot(retriever)


Example Chat

In [19]:
f1_chatbot.chat("Who won the Monaco Grand Prix in 2019?")
f1_chatbot.chat("How about Silverstone in 2014?")
f1_chatbot.chat("Who was champion in 2008?")



User: Who won the Monaco Grand Prix in 2019?
Bot:
- In 2019, Lewis Hamilton won the Monaco Grand Prix driving for Mercedes.


User: How about Silverstone in 2014?
Bot:
- In 2014, Nico Rosberg won the Austrian Grand Prix driving for Mercedes.
- In 1993, Alain Prost won the German Grand Prix driving for Williams.
- In 1993, Alain Prost won the South African Grand Prix driving for Williams.


User: Who was champion in 2008?
Bot:
The champion for 2008 (most race wins): Felipe Massa (6 wins, driving for Ferrari).
Race wins:
- In 2008, Felipe Massa won the Bahrain Grand Prix driving for Ferrari.
- In 2008, Felipe Massa won the Turkish Grand Prix driving for Ferrari.
- In 2008, Felipe Massa won the French Grand Prix driving for Ferrari.
- In 2008, Felipe Massa won the European Grand Prix driving for Ferrari.
- In 2008, Felipe Massa won the Belgian Grand Prix driving for Ferrari.
- In 2008, Felipe Massa won the Brazilian Grand Prix driving for Ferrari.



In [20]:
# Save FAISS index
# faiss.write_index(retriever.index, 'f1_facts.index')

# Load FAISS index
# retriever.index = faiss.read_index('f1_facts.index')


In [21]:
%pip install ipywidgets

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 25.1
[notice] To update, run: C:\Users\mroja\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


In [23]:
import ipywidgets as widgets
from IPython.display import display, clear_output

input_box = widgets.Text(
    value='',
    placeholder='Ask a Formula 1 question...',
    description='Question:',
    disabled=False,
    layout=widgets.Layout(width='80%')
)
output_area = widgets.Output()

def on_submit(change):
    if change['type'] == 'change' and change['name'] == 'value' and change['new'] == '':
        # Only fetch results when user presses Enter (input_box.value is set to '' after Enter)
        return
    if change['type'] == 'change' and change['name'] == 'value':
        return  # Ignore all other changes
    # Only handle when user presses Enter (submit event)
    with output_area:
        clear_output(wait=True)
        f1_chatbot.chat(input_box.value)
    input_box.value = ''

input_box.on_submit(lambda _: on_submit({'type': 'submit'}))
display(input_box, output_area)

  input_box.on_submit(lambda _: on_submit({'type': 'submit'}))


Text(value='', description='Question:', layout=Layout(width='80%'), placeholder='Ask a Formula 1 question...')

Output()