# F1 Data Engineering Project - Bronze Layer
## Raw Data Ingestion with PySpark

This notebook ingests raw F1 data from CSV files into the Bronze layer using **PySpark DataFrame API**.

**Features:**
- Explicit PySpark `StructType` schemas for every source file
- DataFrame API for data loading
- snake_case column naming
- Idempotent loads: each run overwrites the target table
- Metadata columns (`ingestion_timestamp`, `source_file`) for lineage

**Data Sources:**
- circuits.csv, constructors.csv, drivers.csv, races.csv, results.csv
- qualifying.csv, lap_times.csv, pit_stops.csv
- driver_standings.csv, constructor_standings.csv
- status.csv, seasons.csv, constructor_results.csv, sprint_results.csv

## Configuration

In [0]:
# CONFIGURATION

# Unity Catalog target: tables are written to <CATALOG_NAME>.<BRONZE_SCHEMA>
CATALOG_NAME, BRONZE_SCHEMA = "f1_dev", "bronze"

# Azure Storage account and container holding the raw CSV files
STORAGE_ACCOUNT, CONTAINER = "databricksf1projectdl", "bronze"

# Root URI under which the source CSV files are located
SOURCE_PATH = f"abfss://{CONTAINER}@{STORAGE_ACCOUNT}.dfs.core.windows.net"

# Echo the resolved source and target so the run output is self-describing
print(
    f"Source Path: {SOURCE_PATH}",
    f"Target: {CATALOG_NAME}.{BRONZE_SCHEMA}",
    sep="\n",
)

Source Path: abfss://bronze@databricksf1projectdl.dfs.core.windows.net
Target: f1_dev.bronze


## Setup Unity Catalog

In [0]:
# Run the session bootstrap statements in order:
#   1. switch to the target catalog
#   2. create the Bronze schema when it does not exist yet
#   3. make the Bronze schema the current schema
for _bootstrap_sql in (
    f"USE CATALOG {CATALOG_NAME}",
    f"CREATE SCHEMA IF NOT EXISTS {CATALOG_NAME}.{BRONZE_SCHEMA}",
    f"USE SCHEMA {BRONZE_SCHEMA}",
):
    spark.sql(_bootstrap_sql)

print(f"Using: {CATALOG_NAME}.{BRONZE_SCHEMA}")

Using: f1_dev.bronze


## Import Libraries

In [0]:
from pyspark.sql.functions import current_timestamp, lit, input_file_name
from pyspark.sql.types import (
    StructType, StructField, 
    IntegerType, LongType, StringType, 
    DoubleType, FloatType, DateType, TimestampType
)

## Define PySpark Schemas (StructType)

In [0]:
# ==================== SOURCE TABLE SCHEMAS ====================


def _nullable_struct(*columns):
    """Build a StructType from (column_name, data_type) pairs; every field is nullable."""
    return StructType([StructField(name, data_type, True) for name, data_type in columns])


# Circuits
circuits_schema = _nullable_struct(
    ("circuit_id", IntegerType()),
    ("circuit_ref", StringType()),
    ("name", StringType()),
    ("location", StringType()),
    ("country", StringType()),
    ("lat", DoubleType()),
    ("lng", DoubleType()),
    ("alt", IntegerType()),
    ("url", StringType()),
)

# Constructors
constructors_schema = _nullable_struct(
    ("constructor_id", IntegerType()),
    ("constructor_ref", StringType()),
    ("name", StringType()),
    ("nationality", StringType()),
    ("url", StringType()),
)

# Drivers
drivers_schema = _nullable_struct(
    ("driver_id", IntegerType()),
    ("driver_ref", StringType()),
    ("number", StringType()),
    ("code", StringType()),
    ("forename", StringType()),
    ("surname", StringType()),
    ("dob", StringType()),
    ("nationality", StringType()),
    ("url", StringType()),
)

# Races (all date/time columns are kept as strings)
races_schema = _nullable_struct(
    ("race_id", IntegerType()),
    ("year", IntegerType()),
    ("round", IntegerType()),
    ("circuit_id", IntegerType()),
    ("name", StringType()),
    ("date", StringType()),
    ("time", StringType()),
    ("url", StringType()),
    ("fp1_date", StringType()),
    ("fp1_time", StringType()),
    ("fp2_date", StringType()),
    ("fp2_time", StringType()),
    ("fp3_date", StringType()),
    ("fp3_time", StringType()),
    ("quali_date", StringType()),
    ("quali_time", StringType()),
    ("sprint_date", StringType()),
    ("sprint_time", StringType()),
)

# Seasons
seasons_schema = _nullable_struct(
    ("year", IntegerType()),
    ("url", StringType()),
)

# Status
status_schema = _nullable_struct(
    ("status_id", IntegerType()),
    ("status", StringType()),
)

# Results
results_schema = _nullable_struct(
    ("result_id", IntegerType()),
    ("race_id", IntegerType()),
    ("driver_id", IntegerType()),
    ("constructor_id", IntegerType()),
    ("number", StringType()),
    ("grid", IntegerType()),
    ("position", StringType()),
    ("position_text", StringType()),
    ("position_order", IntegerType()),
    ("points", DoubleType()),
    ("laps", IntegerType()),
    ("time", StringType()),
    ("milliseconds", StringType()),
    ("fastest_lap", StringType()),
    ("rank", StringType()),
    ("fastest_lap_time", StringType()),
    ("fastest_lap_speed", StringType()),
    ("status_id", IntegerType()),
)

# Qualifying
qualifying_schema = _nullable_struct(
    ("qualify_id", IntegerType()),
    ("race_id", IntegerType()),
    ("driver_id", IntegerType()),
    ("constructor_id", IntegerType()),
    ("number", IntegerType()),
    ("position", IntegerType()),
    ("q1", StringType()),
    ("q2", StringType()),
    ("q3", StringType()),
)

# Lap Times
lap_times_schema = _nullable_struct(
    ("race_id", IntegerType()),
    ("driver_id", IntegerType()),
    ("lap", IntegerType()),
    ("position", IntegerType()),
    ("time", StringType()),
    ("milliseconds", IntegerType()),
)

# Pit Stops
pit_stops_schema = _nullable_struct(
    ("race_id", IntegerType()),
    ("driver_id", IntegerType()),
    ("stop", IntegerType()),
    ("lap", IntegerType()),
    ("time", StringType()),
    ("duration", StringType()),
    ("milliseconds", IntegerType()),
)

# Driver Standings
driver_standings_schema = _nullable_struct(
    ("driver_standings_id", IntegerType()),
    ("race_id", IntegerType()),
    ("driver_id", IntegerType()),
    ("points", DoubleType()),
    ("position", IntegerType()),
    ("position_text", StringType()),
    ("wins", IntegerType()),
)

# Constructor Standings
constructor_standings_schema = _nullable_struct(
    ("constructor_standings_id", IntegerType()),
    ("race_id", IntegerType()),
    ("constructor_id", IntegerType()),
    ("points", DoubleType()),
    ("position", IntegerType()),
    ("position_text", StringType()),
    ("wins", IntegerType()),
)

# Constructor Results
constructor_results_schema = _nullable_struct(
    ("constructor_results_id", IntegerType()),
    ("race_id", IntegerType()),
    ("constructor_id", IntegerType()),
    ("points", DoubleType()),
    ("status", StringType()),
)

# Sprint Results
sprint_results_schema = _nullable_struct(
    ("result_id", IntegerType()),
    ("race_id", IntegerType()),
    ("driver_id", IntegerType()),
    ("constructor_id", IntegerType()),
    ("number", IntegerType()),
    ("grid", IntegerType()),
    ("position", StringType()),
    ("position_text", StringType()),
    ("position_order", IntegerType()),
    ("points", DoubleType()),
    ("laps", IntegerType()),
    ("time", StringType()),
    ("milliseconds", StringType()),
    ("fastest_lap", StringType()),
    ("fastest_lap_time", StringType()),
    ("status_id", IntegerType()),
)

print("All schemas defined")

All schemas defined


## Define Table Configurations

In [0]:
# Ingestion registry: table name -> {"file": source CSV name, "schema": read schema}.
# Every source file is named "<table>.csv", so the file name is derived from the table name.
TABLE_CONFIGS = {
    name: {"file": f"{name}.csv", "schema": table_schema}
    for name, table_schema in (
        ("circuits", circuits_schema),
        ("constructors", constructors_schema),
        ("drivers", drivers_schema),
        ("races", races_schema),
        ("seasons", seasons_schema),
        ("status", status_schema),
        ("results", results_schema),
        ("qualifying", qualifying_schema),
        ("lap_times", lap_times_schema),
        ("pit_stops", pit_stops_schema),
        ("driver_standings", driver_standings_schema),
        ("constructor_standings", constructor_standings_schema),
        ("constructor_results", constructor_results_schema),
        ("sprint_results", sprint_results_schema),
    )
}

print(f"Configured {len(TABLE_CONFIGS)} tables for ingestion")

Configured 14 tables for ingestion


## Ingestion Function (PySpark)

In [0]:
def ingest_to_bronze(
    table_name: str,
    file_name: str,
    schema: StructType,
    source_path: str = SOURCE_PATH,
    catalog: str = CATALOG_NAME,
    database: str = BRONZE_SCHEMA
) -> dict:
    """
    Load one CSV file into a Delta table named ``catalog.database.table_name``.

    The file at ``source_path/file_name`` is read with the supplied schema,
    two lineage columns (``ingestion_timestamp`` and ``source_file``) are
    appended, and the result is saved in overwrite mode, so running the
    function again replaces the table contents.

    Args:
        table_name: Name of the target table.
        file_name: Name of the CSV file below ``source_path``.
        schema: StructType applied when reading the CSV.
        source_path: Base location of the source files.
        catalog: Catalog part of the target table name.
        database: Schema/database part of the target table name.

    Returns:
        dict with the keys ``table``, ``file``, ``records`` (row count read
        back from the written table) and ``columns`` (number of schema
        fields plus the two lineage columns).
    """
    target_table = f"{catalog}.{database}.{table_name}"
    csv_location = f"{source_path}/{file_name}"

    # Read: first line is a header, the literal \N marks a null value
    csv_reader = (
        spark.read.format("csv")
        .options(header="true", nullValue="\\N", emptyValue="")
        .schema(schema)
    )
    raw_df = csv_reader.load(csv_location)

    # Enrich: stamp every row with load time and originating file name
    stamped_df = raw_df.withColumn("ingestion_timestamp", current_timestamp())
    stamped_df = stamped_df.withColumn("source_file", lit(file_name))

    # Write: replace the table (and its schema) with the freshly read data
    (
        stamped_df.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(target_table)
    )

    # Report: count rows from the saved table rather than the input frame
    record_count = spark.table(target_table).count()
    column_count = len(schema.fields) + 2  # schema fields + 2 lineage columns

    return {
        "table": table_name,
        "file": file_name,
        "records": record_count,
        "columns": column_count,
    }

## Ingest All Tables

In [0]:
# Run header: title framed by separator lines, followed by the run parameters
banner = "=" * 70
print(banner)
print("BRONZE LAYER INGESTION - Unity Catalog (PySpark)")
print(banner)
print(f"Catalog: {CATALOG_NAME}")
print(f"Schema: {BRONZE_SCHEMA}")
print(f"Source: {SOURCE_PATH}")
print(banner)
print()

# One entry per configured table: ingestion statistics on success,
# or a record carrying an "error" key on failure
ingestion_results = []

# Ingest every configured table. A failure is recorded and reported, and the
# loop moves on so that one bad file does not block the remaining tables.
for table_name, config in TABLE_CONFIGS.items():
    try:
        result = ingest_to_bronze(
            table_name=table_name,
            file_name=config["file"],
            schema=config["schema"]
        )
        ingestion_results.append(result)
        print(f"[OK] {table_name}: {result['records']:,} records loaded")

    except Exception as e:
        # Keep only the first 100 characters of the message for the log line
        error_msg = str(e)[:100]
        print(f"[ERROR] {table_name}: {error_msg}")
        ingestion_results.append({
            "table": table_name,
            "file": config["file"],
            "records": 0,
            "error": error_msg
        })

# Final tally so a failed table cannot go unnoticed among the per-table lines
failed_tables = [entry["table"] for entry in ingestion_results if "error" in entry]
print()
print(banner)
print(
    f"Ingested {len(ingestion_results) - len(failed_tables)} "
    f"of {len(ingestion_results)} tables"
)
if failed_tables:
    print(f"Failed tables: {', '.join(failed_tables)}")

BRONZE LAYER INGESTION - Unity Catalog (PySpark)
Catalog: f1_dev
Schema: bronze
Source: abfss://bronze@databricksf1projectdl.dfs.core.windows.net

[OK] circuits: 77 records loaded
[OK] constructors: 212 records loaded
[OK] drivers: 864 records loaded
[OK] races: 1,149 records loaded
[OK] seasons: 76 records loaded
[OK] status: 139 records loaded
[OK] results: 27,238 records loaded
[OK] qualifying: 10,973 records loaded
[OK] lap_times: 615,738 records loaded
[OK] pit_stops: 12,192 records loaded
[OK] driver_standings: 35,361 records loaded
[OK] constructor_standings: 13,631 records loaded
[OK] constructor_results: 12,865 records loaded
[OK] sprint_results: 480 records loaded


## Sample Data Preview

In [0]:
# Show the first five rows of the bronze circuits table
print("Sample Data: circuits")
circuits_preview = spark.table(f"{CATALOG_NAME}.{BRONZE_SCHEMA}.circuits").limit(5)
display(circuits_preview)

Sample Data: circuits


circuit_id,circuit_ref,name,location,country,lat,lng,alt,url,ingestion_timestamp,source_file
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10,http://en.wikipedia.org/wiki/Melbourne_Grand_Prix_Circuit,2025-12-29T17:53:18.393604Z,circuits.csv
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18,http://en.wikipedia.org/wiki/Sepang_International_Circuit,2025-12-29T17:53:18.393604Z,circuits.csv
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7,http://en.wikipedia.org/wiki/Bahrain_International_Circuit,2025-12-29T17:53:18.393604Z,circuits.csv
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109,http://en.wikipedia.org/wiki/Circuit_de_Barcelona-Catalunya,2025-12-29T17:53:18.393604Z,circuits.csv
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130,http://en.wikipedia.org/wiki/Istanbul_Park,2025-12-29T17:53:18.393604Z,circuits.csv


In [0]:
# Show the first five rows of the bronze drivers table
print("Sample Data: drivers")
drivers_preview = spark.table(f"{CATALOG_NAME}.{BRONZE_SCHEMA}.drivers").limit(5)
display(drivers_preview)

Sample Data: drivers


driver_id,driver_ref,number,code,forename,surname,dob,nationality,url,ingestion_timestamp,source_file
1,hamilton,44.0,HAM,Lewis,Hamilton,1985-01-07,British,http://en.wikipedia.org/wiki/Lewis_Hamilton,2025-12-29T17:53:25.104646Z,drivers.csv
2,heidfeld,,HEI,Nick,Heidfeld,1977-05-10,German,http://en.wikipedia.org/wiki/Nick_Heidfeld,2025-12-29T17:53:25.104646Z,drivers.csv
3,rosberg,6.0,ROS,Nico,Rosberg,1985-06-27,German,http://en.wikipedia.org/wiki/Nico_Rosberg,2025-12-29T17:53:25.104646Z,drivers.csv
4,alonso,14.0,ALO,Fernando,Alonso,1981-07-29,Spanish,http://en.wikipedia.org/wiki/Fernando_Alonso,2025-12-29T17:53:25.104646Z,drivers.csv
5,kovalainen,,KOV,Heikki,Kovalainen,1981-10-19,Finnish,http://en.wikipedia.org/wiki/Heikki_Kovalainen,2025-12-29T17:53:25.104646Z,drivers.csv


In [0]:
# Show the first five rows of the bronze results table
print("Sample Data: results")
results_preview = spark.table(f"{CATALOG_NAME}.{BRONZE_SCHEMA}.results").limit(5)
display(results_preview)

Sample Data: results


result_id,race_id,driver_id,constructor_id,number,grid,position,position_text,position_order,points,laps,time,milliseconds,fastest_lap,rank,fastest_lap_time,fastest_lap_speed,status_id,ingestion_timestamp,source_file
1,18,1,1,22,1,1,1,1,10.0,58,1:34:50.616,5690616,39,2,1:27.452,218.3,1,2025-12-29T17:53:36.587268Z,results.csv
2,18,2,2,3,5,2,2,2,8.0,58,+5.478,5696094,41,3,1:27.739,217.586,1,2025-12-29T17:53:36.587268Z,results.csv
3,18,3,3,7,7,3,3,3,6.0,58,+8.163,5698779,41,5,1:28.090,216.719,1,2025-12-29T17:53:36.587268Z,results.csv
4,18,4,4,5,11,4,4,4,5.0,58,+17.181,5707797,58,7,1:28.603,215.464,1,2025-12-29T17:53:36.587268Z,results.csv
5,18,5,1,23,3,5,5,5,4.0,58,+18.014,5708630,43,1,1:27.418,218.385,1,2025-12-29T17:53:36.587268Z,results.csv


## Schema Information

In [0]:
# Print the column names and types of the stored circuits table,
# including the two lineage columns added during ingestion
print("Schema: circuits")
circuits_table = spark.table(f"{CATALOG_NAME}.{BRONZE_SCHEMA}.circuits")
circuits_table.printSchema()

Schema: circuits
root
 |-- circuit_id: integer (nullable = true)
 |-- circuit_ref: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- alt: integer (nullable = true)
 |-- url: string (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = true)
 |-- source_file: string (nullable = true)

