# F1 Analytics Delta Live Tables (DLT) Pipeline

This notebook implements the Delta Live Tables (DLT) pipeline for the F1 Analytics project.

## Pipeline Overview
- **Bronze Tables:** Raw landing data from the FastF1 API
- **Silver Tables:** Cleaned and transformed data
- **Gold Tables:** Aggregated insights for analytics

## Data Quality & Expectations
- `DriverId` and `LapTime` must not be null
- Telemetry values must fall within realistic bounds
- Incremental updates are supported

## Target Catalog
- Catalog: `f1_catalog`
- Schemas: `bronze`, `silver`, `gold`

In [0]:
import dlt
from pyspark.sql import functions as F

# ----------------------
# Bronze Tables
# ----------------------

@dlt.table(
    name="bronze_lap_times",
    comment="Raw lap times from FastF1 API",
    table_properties={"quality": "bronze"},
)
def lap_times_bronze():
    """Load the raw lap-times Delta data from its bronze landing path, unmodified."""
    source_path = "/mnt/f1analytics/bronze/lap_times"
    return spark.read.format("delta").load(source_path)

@dlt.table(
    name="bronze_telemetry",
    comment="Raw telemetry data",
    table_properties={"quality": "bronze"},
)
def telemetry_bronze():
    """Load the raw telemetry Delta data from its bronze landing path, unmodified."""
    source_path = "/mnt/f1analytics/bronze/telemetry_data"
    return spark.read.format("delta").load(source_path)

@dlt.table(
    name="bronze_session_results",
    comment="Raw session results",
    table_properties={"quality": "bronze"},
)
def session_results_bronze():
    """Load the raw session-results Delta data from its bronze landing path, unmodified."""
    source_path = "/mnt/f1analytics/bronze/session_results"
    return spark.read.format("delta").load(source_path)

@dlt.table(
    name="bronze_race_control_messages",
    comment="Raw race control messages",
    table_properties={"quality": "bronze"},
)
def race_control_bronze():
    """Load the raw race-control-messages Delta data from its bronze landing path, unmodified."""
    source_path = "/mnt/f1analytics/bronze/race_control_messages"
    return spark.read.format("delta").load(source_path)

@dlt.table(
    name="bronze_circuit_info",
    comment="Raw circuit info",
    table_properties={"quality": "bronze"},
)
def circuit_info_bronze():
    """Load the raw circuit-info Delta data from its bronze landing path, unmodified."""
    source_path = "/mnt/f1analytics/bronze/circuit_info"
    return spark.read.format("delta").load(source_path)

@dlt.table(
    name="bronze_session_info",
    comment="Raw session info",
    table_properties={"quality": "bronze"},
)
def session_info_bronze():
    """Load the raw session-info Delta data from its bronze landing path, unmodified."""
    source_path = "/mnt/f1analytics/bronze/session_info"
    return spark.read.format("delta").load(source_path)

@dlt.table(
    name="bronze_weather_data",
    comment="Raw weather data",
    table_properties={"quality": "bronze"},
)
def weather_bronze():
    """Load the raw weather Delta data from its bronze landing path, unmodified."""
    source_path = "/mnt/f1analytics/bronze/weather_data"
    return spark.read.format("delta").load(source_path)

In [0]:
# ----------------------
# Silver Tables
# ----------------------

@dlt.table(
    name="silver_lap_times",
    comment="Cleaned and transformed lap times",
    table_properties={"quality": "silver"}
)
# Declare the not-null expectations this notebook documents for lap times.
# `expect_all` only records violations in the pipeline's data-quality metrics;
# offending rows are retained, so downstream tables see the same data as before.
@dlt.expect_all({
    "driver_id_not_null": "DriverId IS NOT NULL",
    "lap_time_not_null": "LapTime IS NOT NULL",
})
def lap_times_silver():
    """Return bronze lap times with an added ``LapTime_ms`` column.

    ``LapTime_ms`` is ``LapTime`` cast to double and multiplied by 1000.
    NOTE(review): this presumably converts seconds to milliseconds; confirm
    the unit/type of ``LapTime`` in the bronze data.

    Returns:
        The ``bronze_lap_times`` DataFrame plus the ``LapTime_ms`` column.
    """
    laps = dlt.read("bronze_lap_times")
    lap_time_ms = F.col("LapTime").cast("double") * 1000
    return laps.withColumn("LapTime_ms", lap_time_ms)

@dlt.table(
    name="silver_telemetry",
    comment="Cleaned telemetry data",
    table_properties={"quality": "silver"},
)
def telemetry_silver():
    """Return bronze telemetry without rows that are missing Speed or RPM."""
    required_columns = ["Speed", "RPM"]
    raw_telemetry = dlt.read("bronze_telemetry")
    return raw_telemetry.na.drop(subset=required_columns)

@dlt.table(
    name="silver_session_results",
    comment="Cleaned session results",
    table_properties={"quality": "silver"},
)
def session_results_silver():
    """Return bronze session results without rows that are missing DriverId or Position."""
    required_columns = ["DriverId", "Position"]
    raw_results = dlt.read("bronze_session_results")
    return raw_results.na.drop(subset=required_columns)

@dlt.table(
    name="silver_race_control_messages",
    comment="Cleaned race control messages",
    table_properties={"quality": "silver"},
)
def race_control_silver():
    """Pass the bronze race control messages through unchanged."""
    return dlt.read("bronze_race_control_messages")

@dlt.table(
    name="silver_circuit_info",
    comment="Cleaned circuit info",
    table_properties={"quality": "silver"},
)
def circuit_info_silver():
    """Pass the bronze circuit info through unchanged."""
    return dlt.read("bronze_circuit_info")

@dlt.table(
    name="silver_session_info",
    comment="Cleaned session info",
    table_properties={"quality": "silver"},
)
def session_info_silver():
    """Pass the bronze session info through unchanged."""
    return dlt.read("bronze_session_info")

@dlt.table(
    name="silver_weather",
    comment="Cleaned weather data",
    table_properties={"quality": "silver"},
)
def weather_silver():
    """Pass the bronze weather data through unchanged."""
    return dlt.read("bronze_weather_data")

In [0]:
# ----------------------
# Gold Tables
# ----------------------

@dlt.table(
    name="gold_race_results",
    comment="Aggregated race results for analytics",
    table_properties={"quality": "gold"}
)
def race_results_gold():
    """Build per-driver, per-team, per-year aggregates of points and lap statistics.

    Lap times and session results are each aggregated to their own grain
    *before* being joined. Joining the lap-level rows directly to the result
    rows and then summing ``Points`` would count each result's points once per
    matching lap, inflating ``TotalPoints``.

    Returns:
        A DataFrame with columns ``DriverId``, ``TeamName``, ``year``,
        ``TotalPoints`` (sum of ``Points``), ``AvgLapTime_ms`` (mean of
        ``LapTime_ms``) and ``TotalLaps`` (max of ``LapNumber``), containing
        only driver/year pairs present in both silver tables.
    """
    join_keys = ["DriverId", "year"]

    # Lap statistics at driver/year grain.
    lap_stats = (
        dlt.read("silver_lap_times")
        .groupBy(*join_keys)
        .agg(
            F.avg("LapTime_ms").alias("AvgLapTime_ms"),
            F.max("LapNumber").alias("TotalLaps"),
        )
    )

    # Points at driver/team/year grain.
    # NOTE(review): assumes TeamName and Points are columns of
    # silver_session_results (not of silver_lap_times) — confirm against the schema.
    points = (
        dlt.read("silver_session_results")
        .groupBy("DriverId", "TeamName", "year")
        .agg(F.sum("Points").alias("TotalPoints"))
    )

    # Inner join keeps only driver/year pairs present on both sides; the
    # explicit select preserves the original output column order.
    return points.join(lap_stats, join_keys, "inner").select(
        "DriverId",
        "TeamName",
        "year",
        "TotalPoints",
        "AvgLapTime_ms",
        "TotalLaps",
    )