# F1 Lakehouse - Data Exploration and AI Insight Notebook

Use this notebook to inspect the DuckDB warehouse that backs the lakehouse, spot data-quality issues, and generate ideas that make downstream models (dbt, Streamlit, or ML) more accurate.


## How to use this notebook

1. Mount the external SSD so `/Volumes/SAMSUNG/f1-lakehouse-data` is available and run `scripts/init_external.sh` at least once.
2. Load the `.env` file in your shell (`export $(grep -v '^#' .env | xargs)` in zsh) so `EXTERNAL_DATA_ROOT` and `F1_WAREHOUSE` are defined.
3. Execute the dependency cell below the first time you open this notebook in a new environment.
4. Optional but recommended: set `OPENAI_API_KEY` (and `OPENAI_MODEL` if you want to override the default) before running the ChatGPT helper cell.

The notebook resolves the DuckDB path automatically by checking `F1_WAREHOUSE`, the `/opt/data` mount inside containers, and the host-side SSD path. All cells assume sequential execution so run them from top to bottom.


In [None]:
%pip install -q duckdb pandas plotly openai python-dotenv "nbformat>=5"


In [None]:
import os
from pathlib import Path
from typing import Iterable, Optional

import duckdb
import numpy as np
import pandas as pd
import plotly.express as px

pd.set_option("display.max_rows", 50)
pd.set_option("display.max_columns", 120)
pd.set_option("display.width", 120)


def ns_to_seconds(series: pd.Series) -> pd.Series:
    # Convert integer nanosecond values to floating-point seconds.
    return pd.to_numeric(series, errors="coerce") / 1e9


def ns_to_pretty(series: pd.Series) -> pd.Series:
    # Render nanosecond durations as mm:ss.mmm strings.
    td = pd.to_timedelta(ns_to_seconds(series), unit="s")
    return td.apply(
        lambda x: f"{int(x.total_seconds() // 60):02d}:{int(x.total_seconds() % 60):02d}.{int(x.microseconds / 1000):03d}"
        if pd.notna(x)
        else None
    )


In [None]:
EXTERNAL_ROOT = Path(os.environ.get("EXTERNAL_DATA_ROOT", "/Volumes/SAMSUNG/f1-lakehouse-data"))
WAREHOUSE_ENV = Path(os.environ.get("F1_WAREHOUSE", "/opt/data/warehouse/f1.duckdb"))

candidate_paths = []
for path in (
    WAREHOUSE_ENV,
    Path(str(WAREHOUSE_ENV).replace("/opt/data", str(EXTERNAL_ROOT)))
    if str(WAREHOUSE_ENV).startswith("/opt/data")
    else None,
    EXTERNAL_ROOT / "warehouse" / "f1.duckdb",
    Path.cwd() / "warehouse" / "f1.duckdb",
):
    if path and path not in candidate_paths:
        candidate_paths.append(path)

WAREHOUSE_PATH = next((p for p in candidate_paths if p.exists()), None)
if WAREHOUSE_PATH is None:
    raise FileNotFoundError(f"Could not find DuckDB warehouse in any of: {candidate_paths}")

print(f"Using DuckDB warehouse at {WAREHOUSE_PATH}")
con = duckdb.connect(str(WAREHOUSE_PATH), read_only=True)


def resolve_schema(connection: duckdb.DuckDBPyConnection, base_schema: str, table_name: Optional[str] = None) -> str:
    candidates = [f"main_{base_schema}", base_schema]
    for schema in candidates:
        exists = connection.execute(
            "SELECT 1 FROM information_schema.schemata WHERE schema_name = ? LIMIT 1",
            [schema],
        ).fetchone()
        if not exists:
            continue
        if table_name:
            table_exists = connection.execute(
                "SELECT 1 FROM information_schema.tables WHERE table_schema = ? AND table_name = ? LIMIT 1",
                [schema, table_name],
            ).fetchone()
            if not table_exists:
                continue
        return schema
    raise ValueError(f"Could not find schema {base_schema} (table {table_name!r}).")


def run_df(query: str, params: Optional[dict] = None) -> pd.DataFrame:
    return con.execute(query, params or {}).df()


In [None]:
silver_schema = resolve_schema(con, "silver", "laps")
gold_schema = resolve_schema(con, "gold", "driver_session_summary")
print(f"Resolved schemas -> silver: {silver_schema}, gold: {gold_schema}")


### Table inventory
Inspect how many rows exist per modeled table so you know what is available for exploration.


In [None]:
table_df = run_df(f"""
SELECT table_schema, table_name
FROM information_schema.tables
WHERE table_schema IN ('{silver_schema}', '{gold_schema}')
ORDER BY table_schema, table_name
""")

row_counts = []
for schema, table in table_df[["table_schema", "table_name"]].itertuples(index=False):
    row_counts.append(con.execute(f"SELECT COUNT(*) FROM {schema}.{table}").fetchone()[0])

table_df["row_count"] = row_counts
table_df


### Season coverage and KPIs
Identify which seasons and sessions are loaded plus high-level lap counts, driver coverage, and team coverage.


In [None]:
season_counts = run_df(f"""
SELECT
  season,
  session_code,
  COUNT(*)            AS lap_count,
  COUNT(DISTINCT driver) AS drivers,
  COUNT(DISTINCT team)   AS teams
FROM {silver_schema}.laps
GROUP BY 1, 2
ORDER BY season DESC, session_code
""")

if season_counts.empty:
    raise RuntimeError("No lap data found. Run ingestion + dbt before using this notebook.")

latest_season = int(season_counts["season"].max())
latest_round_df = run_df(
    f"""SELECT MAX(round) AS round FROM {silver_schema}.laps WHERE season = $season""",
    {"season": latest_season},
)
latest_round = int(latest_round_df.iloc[0]["round"])
print(f"Latest season detected: {latest_season} (round {latest_round})")
season_counts


In [None]:
fig = px.bar(
    season_counts,
    x="session_code",
    y="lap_count",
    color=season_counts["season"].astype(str),
    barmode="group",
    title="Lap counts by session"
)
fig.update_layout(xaxis_title="Session", yaxis_title="Lap count", legend_title="Season")
fig.show()


### Lap-level preview (latest season/round)
Review a thin slice of lap telemetry to understand which columns need cleaning or feature engineering.


In [None]:
laps_preview = run_df(
    f"""
    SELECT
      season, round, grand_prix, session_code,
      driver, drivernumber, team,
      lapnumber, stint,
      laptime, sector1time, sector2time, sector3time,
      pitouttime, pitintime,
      compound, tyrelife, freshtyre, trackstatus
    FROM {silver_schema}.laps
    WHERE season = $season AND round = $round
    ORDER BY session_code, driver, lapnumber
    LIMIT 25
    """,
    {"season": latest_season, "round": latest_round},
)

for col in ["laptime", "sector1time", "sector2time", "sector3time"]:
    if col in laps_preview.columns:
        laps_preview[f"{col}_sec"] = ns_to_seconds(laps_preview[col]).round(3)

laps_preview


### Missing values and potential data-cleaning targets
Measure column-level null percentages for the latest season to prioritize cleaning work (e.g., missing compounds or lap times).


In [None]:
laps_latest = run_df(
    f"""
    SELECT *
    FROM {silver_schema}.laps
    WHERE season = $season
    """,
    {"season": latest_season},
)

missing = laps_latest.isna().mean().sort_values(ascending=False) * 100
missing = missing[missing > 0]
missing_df = missing.round(2).reset_index()
missing_df.columns = ["column", "missing_pct"]
missing_df.head(20)


In [None]:
fig = px.bar(
    missing_df.head(20),
    x="column",
    y="missing_pct",
    title=f"Top 20 columns by missing percentage (season {latest_season})"
)
fig.update_layout(xaxis_title="Column", yaxis_title="Percent missing")
fig.show()


### Driver/number consistency checks
Spot drivers that map to multiple numbers or teams within the same season (possible data-quality issues).


In [None]:
driver_aliases = run_df(
    f"""
    SELECT
      driver,
      COUNT(DISTINCT drivernumber) AS distinct_numbers,
      COUNT(DISTINCT team)         AS distinct_teams
    FROM {silver_schema}.laps
    WHERE season = $season
    GROUP BY driver
    HAVING COUNT(DISTINCT drivernumber) > 1
       OR COUNT(DISTINCT team) > 1
    ORDER BY distinct_numbers DESC, distinct_teams DESC
    """,
    {"season": latest_season},
)
driver_aliases


### Lap-time distribution and outliers
Convert nanoseconds to seconds and visualize the lap-time spread per session to pinpoint outliers or sessions that need trimming.


In [None]:
pacedata = laps_latest.dropna(subset=["laptime"]).copy()
pacedata["lap_seconds"] = ns_to_seconds(pacedata["laptime"])

fig = px.histogram(
    pacedata,
    x="lap_seconds",
    color="session_code",
    nbins=60,
    opacity=0.75,
    title=f"Lap-time distribution (season {latest_season})",
)
fig.update_layout(xaxis_title="Lap time (seconds)", yaxis_title="Lap count")
fig.show()

pace_stats = pacedata.groupby("session_code")["lap_seconds"].agg(["count", "min", "median", "max"]).round(3)
pace_stats


### Tyre compound mix
Check tyre usage counts to understand whether additional balancing or filtering is required.


In [None]:
compound_mix = run_df(
    f"""
    SELECT compound, COUNT(*) AS laps
    FROM {silver_schema}.laps
    WHERE season = $season AND compound IS NOT NULL
    GROUP BY compound
    ORDER BY laps DESC
    """,
    {"season": latest_season},
)
compound_mix


In [None]:
fig = px.bar(
    compound_mix,
    x="compound",
    y="laps",
    title=f"Tyre compound usage (season {latest_season})"
)
fig.update_layout(xaxis_title="Compound", yaxis_title="Lap count")
fig.show()


### Weather context vs. pace
Aggregate weather signals per session and compare them to driver pace so you can spot environmental drivers of variance.


In [None]:
weather_agg = run_df(
    f"""
    SELECT
      season,
      round,
      session_code,
      AVG(tracktemp)  AS avg_track_temp,
      AVG(airtemp)    AS avg_air_temp,
      AVG(windspeed)  AS avg_wind_speed,
      MAX(CASE WHEN rainfall THEN 1 ELSE 0 END) AS had_rain
    FROM {silver_schema}.weather
    GROUP BY 1, 2, 3
    ORDER BY season DESC, round DESC, session_code
    """
)
weather_agg.head()


In [None]:
pace_vs_weather = run_df(
    f"""
    WITH pace AS (
      SELECT
        season,
        round,
        session_code,
        driver,
        team,
        MIN(laptime) AS best_lap_ns
      FROM {silver_schema}.laps
      WHERE season = $season
      GROUP BY 1, 2, 3, 4, 5
    )
    SELECT
      p.driver,
      p.team,
      p.session_code,
      p.best_lap_ns,
      w.avg_track_temp,
      w.avg_air_temp,
      w.had_rain
    FROM pace p
    LEFT JOIN weather_agg w
      ON p.season = w.season
     AND p.round = w.round
     AND p.session_code = w.session_code
    WHERE p.session_code = 'R'
    """,
    {"season": latest_season},
)

pace_vs_weather["best_lap_seconds"] = ns_to_seconds(pace_vs_weather["best_lap_ns"])
fig = px.scatter(
    pace_vs_weather,
    x="avg_track_temp",
    y="best_lap_seconds",
    color="team",
    hover_name="driver",
    title="Best race lap vs. average track temperature",
)
fig.update_layout(xaxis_title="Avg track temp (C)", yaxis_title="Best lap (s)")
fig.show()


### Silver results snapshot
Inspect broadcast-facing result attributes (positions, grid, points, status) to compare against lap-derived metrics.


In [None]:
results_snapshot = run_df(
    f"""
    SELECT
      season,
      round,
      session_code,
      broadcastname AS driver_name,
      teamname,
      position,
      classifiedposition,
      gridposition,
      status,
      points
    FROM {silver_schema}.results
    WHERE season = $season
    ORDER BY round DESC, session_code DESC, position
    LIMIT 40
    """,
    {"season": latest_season},
)
results_snapshot


### Gold mart: driver_session_summary
Use the curated gold table to connect lap behavior to business-ready metrics.


In [None]:
driver_summary = run_df(
    f"""
    SELECT
      season,
      round,
      grand_prix,
      session_code,
      driver,
      team,
      laps_total,
      laps_on_track,
      pitstops,
      best_lap_time,
      personal_best_laps
    FROM {gold_schema}.driver_session_summary
    WHERE season = $season
    ORDER BY round DESC, session_code, best_lap_time
    """,
    {"season": latest_season},
)

driver_summary["best_lap_seconds"] = ns_to_seconds(driver_summary["best_lap_time"])
driver_summary.head(20)


In [None]:
fig = px.scatter(
    driver_summary,
    x="laps_on_track",
    y="best_lap_seconds",
    color="team",
    hover_name="driver",
    facet_col="session_code",
    title=f"Best lap vs. track time (season {latest_season})"
)
fig.update_layout(xaxis_title="Laps on track", yaxis_title="Best lap (s)")
fig.show()


### ChatGPT-powered insights (optional)
Call OpenAI's API to summarize findings or suggest cleaning actions using any DataFrame from above. Requires `OPENAI_API_KEY` to be set.


In [None]:
try:
    from openai import OpenAI
except ImportError:
    OpenAI = None


def summarize_with_chatgpt(df: pd.DataFrame, question: str, sample_rows: int = 40, model: Optional[str] = None) -> str:
    """Send a compact CSV sample plus a question to ChatGPT and return the response."""
    if df.empty:
        raise ValueError("DataFrame is empty. Provide rows before calling the model.")
    if OpenAI is None:
        raise ImportError("openai package is missing. Re-run the %pip install cell.")

    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError("OPENAI_API_KEY is not set in the environment.")

    client = OpenAI(api_key=api_key)
    model_name = model or os.environ.get("OPENAI_MODEL", "gpt-4o-mini")
    sample = df.head(sample_rows)
    csv_payload = sample.to_csv(index=False)

    messages = [
        {
            "role": "system",
            "content": (
                "You are a senior analytics engineer. "
                "Read the CSV rows, describe notable patterns or anomalies, and suggest concrete data cleaning or modeling actions."
            ),
        },
        {
            "role": "user",
            "content": f"Question: {question}\n\nCSV sample:\n{csv_payload}",
        },
    ]

    response = client.chat.completions.create(
        model=model_name,
        messages=messages,
        temperature=0.2,
        max_tokens=400,
    )
    return response.choices[0].message.content.strip()


In [None]:
ai_input = driver_summary.loc[driver_summary["session_code"] == "R", [
    "driver",
    "team",
    "laps_on_track",
    "pitstops",
    "best_lap_seconds",
    "personal_best_laps",
]].copy()
ai_question = f"Where should we focus cleaning or feature engineering to improve race-pace insights for season {latest_season}?"

try:
    ai_response = summarize_with_chatgpt(ai_input, question=ai_question, sample_rows=25)
    print(ai_response)
except Exception as exc:
    print(f"AI insight unavailable: {exc}")
