# üóÑÔ∏è Notebook 03 ‚Äî PostgreSQL Loading & Relational Validation

## üéØ Objective of This Notebook

Notebook 03 marks a **major architectural transition** in the project.

Up to Notebook 02, all processing was performed in a **file-based analytical context** using standardized Parquet files. While that stage guaranteed **column-level correctness**, **unit consistency**, and **schema stability**, it deliberately avoided relational enforcement.

This notebook exists to answer a different question:

> ‚ÄúCan our standardized race data survive strict relational rules, repeated execution, and real analytical joins ‚Äî without breaking?‚Äù

Notebook 03 is where the data pipeline becomes **structurally real**.

---

## üß© Context from Notebook 02

From Notebook 02, we now have:

‚úÖ Fully standardized Parquet data  
‚úÖ One directory per race  
‚úÖ One canonical schema for each dataset  
‚úÖ No ambiguity in column names or units  

Directory structure:
```
data/interim/standardized/
‚îî‚îÄ‚îÄ year=YYYY/
    ‚îî‚îÄ‚îÄ round=XX_<race_name>/
        ‚îú‚îÄ‚îÄ laps.parquet
        ‚îú‚îÄ‚îÄ results.parquet
        ‚îî‚îÄ‚îÄ track_status.parquet
```
Important guarantees from Notebook 02:

- üü¢ Lap times are in **milliseconds**
- üü¢ Lap grain is **one row per driver per lap**
- üü¢ Driver identifiers are stable *within a race*
- üü¢ No downstream notebook is allowed to ‚Äúfix‚Äù schema issues

Notebook 03 **must adapt to this data**, not reshape it.

---

## üêò Why PostgreSQL Is Introduced Here

PostgreSQL is introduced **only after standardization**, not before, because:

- üì¶ Parquet cannot enforce:
  - Primary keys
  - Foreign keys
  - Idempotent writes
- üîó Strategy analysis requires **trustworthy joins**
- üîÅ Re-runnable pipelines require **conflict handling**

PostgreSQL in this project is:

‚úÖ An analytical backbone  
‚ùå Not a transformation engine  
‚ùå Not a cleaning layer  

---

## üß± What This Notebook Will Do

This notebook will:

### 1Ô∏è‚É£ Establish a PostgreSQL connection
- Using environment-based configuration
- Fully portable across machines

### 2Ô∏è‚É£ Define strict relational schema
Including:

- **races**
  - One row per race
  - Identified by `race_id`
- **drivers**
  - One row per `(race_id, driver_code)`
- **laps**
  - One row per `(race_id, driver_code, lap_number)`

### 3Ô∏è‚É£ Enforce explicit data grain
- No implicit joins
- No inferred keys
- No tolerance for duplicates

### 4Ô∏è‚É£ Load standardized Parquet ‚Üí PostgreSQL
- One race at a time
- With defensive column selection
- Ignoring irrelevant columns safely

### 5Ô∏è‚É£ Guarantee idempotency üîÅ
- Safe kernel restarts
- Safe re-execution
- No duplicate inserts
- No manual truncation required

### 6Ô∏è‚É£ Validate relational integrity üß™
- Row counts
- Orphan detection
- Lap grain uniqueness

---

## üö´ What This Notebook Will NOT Do

This notebook explicitly does **not**:

‚ùå Compute cumulative lap times  
‚ùå Identify pit laps or out laps  
‚ùå Detect undercuts  
‚ùå Perform any race strategy logic  
‚ùå Load track status into PostgreSQL  

Those steps require **derived temporal context** and belong downstream.

---

## üö¶ Track Status: Intentional Deferral

Although `track_status.parquet` exists for every race:

- It is **event-based**, not lap-based
- It cannot be joined meaningfully without lap timelines
- Loading it prematurely caused grain violations during development

‚û°Ô∏è Therefore:
- Track status files are **validated for presence**
- Actual usage is deferred to a later notebook

This is a **design decision**, not a limitation.

---

## ‚úÖ Expected Outcome of Notebook 03

By the end of this notebook, we expect:

‚úî PostgreSQL populated with all races (2022‚Äì2024)  
‚úî Strict relational invariants enforced  
‚úî Fully idempotent execution  
‚úî Database safe for analytical feature computation  

Notebook 03 does **not produce insights** ‚Äî it produces **trust**.

Everything that follows depends on this foundation.



In [1]:
# ============================================================
# Notebook 03 ‚Äî PostgreSQL Loading & Relational Invariants
# Cell 1: Environment setup and execution preconditions
# ============================================================

# ------------------------------------------------------------
# Resolve project root dynamically (Jupyter-safe)
# ------------------------------------------------------------
import sys
from pathlib import Path

def find_project_root(start_path: Path) -> Path:
    current = start_path.resolve()
    for parent in [current] + list(current.parents):
        if (parent / "src").exists():
            return parent
    raise RuntimeError(
        "Could not locate project root. Ensure notebook is inside the project."
    )

PROJECT_ROOT = find_project_root(Path.cwd())
sys.path.insert(0, str(PROJECT_ROOT))

# ------------------------------------------------------------
# Standard library and third-party imports
# ------------------------------------------------------------
import os
import json
import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine, inspect

# ------------------------------------------------------------
# Project imports
# ------------------------------------------------------------
from src.config import Config
from src.logging_config import setup_logging
from src.utils import ensure_dir

# ------------------------------------------------------------
# Initialize logging
# ------------------------------------------------------------
logger, error_logger = setup_logging()
logger.info("Starting Notebook 03 ‚Äî PostgreSQL Loading & Relational Invariants")

# ------------------------------------------------------------
# Load environment variables (.env)
# ------------------------------------------------------------
load_dotenv(PROJECT_ROOT / ".env")

DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_NAME = os.getenv("DB_NAME")

missing_env = [
    var for var in [
        "DB_USER", "DB_PASSWORD", "DB_HOST", "DB_PORT", "DB_NAME"
    ] if os.getenv(var) is None
]

if missing_env:
    raise EnvironmentError(
        f"Missing required database environment variables: {missing_env}"
    )

logger.info("Database environment variables loaded successfully")

# ------------------------------------------------------------
# Create PostgreSQL engine (no connection yet)
# ------------------------------------------------------------
DATABASE_URL = (
    f"postgresql://{DB_USER}:{DB_PASSWORD}"
    f"@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)

engine = create_engine(DATABASE_URL)

logger.info("PostgreSQL engine constructed")

# ------------------------------------------------------------
# Resolve standardized Parquet input directory
# ------------------------------------------------------------
STANDARDIZED_DATA_DIR = Config.DATA_DIR / "interim" / "standardized"

if not STANDARDIZED_DATA_DIR.exists():
    raise FileNotFoundError(
        "Standardized Parquet directory does not exist. "
        "Notebook 02 must be executed successfully before Notebook 03."
    )

standardized_files = list(STANDARDIZED_DATA_DIR.rglob("*.parquet"))

if not standardized_files:
    raise FileNotFoundError(
        "No standardized Parquet files found. "
        "Notebook 02 did not produce usable outputs."
    )

logger.info(
    f"Discovered {len(standardized_files)} standardized Parquet files"
)

# ------------------------------------------------------------
# Inspect database state (read-only)
# ------------------------------------------------------------
inspector = inspect(engine)
existing_tables = inspector.get_table_names()

logger.info(f"Existing tables in database: {existing_tables}")

# ------------------------------------------------------------
# Declare execution scope
# ------------------------------------------------------------
print("""
============================================================
Notebook 03 ‚Äî PostgreSQL Loading & Relational Invariants
------------------------------------------------------------
This notebook will:
  - Load standardized Parquet into PostgreSQL
  - Define relational schemas and constraints
  - Create derived invariants (cumulative time, pit markers)
  - Prepare data for deterministic undercut detection

This notebook will not:
  - Call external APIs
  - Perform semantic standardization
  - Detect undercut events
  - Produce analytical conclusions

Notebook 02 outputs are treated as authoritative.
============================================================
""")


2025-12-17 15:27:11,925 | INFO | src.logging_config | Starting Notebook 03 ‚Äî PostgreSQL Loading & Relational Invariants
2025-12-17 15:27:11,930 | INFO | src.logging_config | Database environment variables loaded successfully
2025-12-17 15:27:12,089 | INFO | src.logging_config | PostgreSQL engine constructed
2025-12-17 15:27:12,132 | INFO | src.logging_config | Discovered 204 standardized Parquet files
2025-12-17 15:27:12,293 | INFO | src.logging_config | Existing tables in database: ['track_status', 'laps', 'drivers', 'races']



Notebook 03 ‚Äî PostgreSQL Loading & Relational Invariants
------------------------------------------------------------
This notebook will:
  - Load standardized Parquet into PostgreSQL
  - Define relational schemas and constraints
  - Create derived invariants (cumulative time, pit markers)
  - Prepare data for deterministic undercut detection

This notebook will not:
  - Call external APIs
  - Perform semantic standardization
  - Detect undercut events
  - Produce analytical conclusions

Notebook 02 outputs are treated as authoritative.



In [2]:
# ============================================================
# Notebook 03 ‚Äî PostgreSQL Loading & Relational Invariants
# Cell 2: Relational schema definition (corrected)
# ============================================================

from sqlalchemy import (
    Table, Column, Integer, BigInteger, String,
    Boolean, MetaData, ForeignKey, ForeignKeyConstraint,
    Index, PrimaryKeyConstraint, CheckConstraint
)

logger.info("Defining relational schema with explicit identity and grain")

metadata = MetaData()

# ------------------------------------------------------------
# RACES
# One row per race
# race_id = "{season}_{round}"
# ------------------------------------------------------------
races = Table(
    "races",
    metadata,
    Column("race_id", String, primary_key=True),
    Column("season", Integer, nullable=False),
    Column("round", Integer, nullable=False),

    CheckConstraint("season >= 1950", name="ck_races_valid_season"),
    CheckConstraint("round > 0", name="ck_races_valid_round"),
)

# ------------------------------------------------------------
# DRIVERS
# One row per driver per race (race-scoped identity)
# driver_id = "{race_id}_{driver_code}"
# ------------------------------------------------------------
drivers = Table(
    "drivers",
    metadata,
    Column("race_id", String, ForeignKey("races.race_id"), nullable=False),
    Column("driver_code", String, nullable=False),
    Column("driver_number", Integer, nullable=False),

    PrimaryKeyConstraint(
        "race_id",
        "driver_code",
        name="pk_drivers"
    ),

    CheckConstraint(
        "driver_number > 0",
        name="ck_drivers_valid_number"
    ),
)

# ------------------------------------------------------------
# LAPS
# Core fact table
# One row per (race, driver, lap)
# ------------------------------------------------------------
laps = Table(
    "laps",
    metadata,
    Column("race_id", String, ForeignKey("races.race_id"), nullable=False),
    Column("driver_code", String, nullable=False),
    Column("lap_number", Integer, nullable=False),

    # Standardized fields
    Column("lap_time_ms", BigInteger, nullable=True),
    Column("tyre_compound", String, nullable=True),

    # Relational invariants (computed later)
    Column("cumulative_time_ms", BigInteger, nullable=True),
    Column("is_pit_lap", Boolean, nullable=True),
    Column("is_out_lap", Boolean, nullable=True),

    PrimaryKeyConstraint(
        "race_id",
        "driver_code",
        "lap_number",
        name="pk_laps"
    ),

    ForeignKeyConstraint(
        ["race_id", "driver_code"],
        ["drivers.race_id", "drivers.driver_code"],
        name="fk_laps_drivers"
    ),

    CheckConstraint("lap_number > 0", name="ck_laps_valid_lap"),
)

Index("idx_laps_race_driver", laps.c.race_id, laps.c.driver_code)
Index("idx_laps_race_lap", laps.c.race_id, laps.c.lap_number)

# ------------------------------------------------------------
# TRACK STATUS
# One row per lap per race
# ------------------------------------------------------------
track_status = Table(
    "track_status",
    metadata,
    Column("race_id", String, ForeignKey("races.race_id"), nullable=False),
    Column("lap_number", Integer, nullable=False),
    Column("track_status", String, nullable=False),

    PrimaryKeyConstraint(
        "race_id",
        "lap_number",
        name="pk_track_status"
    ),

    CheckConstraint("lap_number > 0", name="ck_track_status_valid_lap"),
)

Index("idx_track_status_race_lap", track_status.c.race_id, track_status.c.lap_number)

logger.info("Relational schema defined with explicit keys and constraints")


2025-12-17 15:27:12,331 | INFO | src.logging_config | Defining relational schema with explicit identity and grain
2025-12-17 15:27:12,360 | INFO | src.logging_config | Relational schema defined with explicit keys and constraints


In [3]:
# ============================================================
# Notebook 03 ‚Äî PostgreSQL Loading & Relational Invariants
# Cell 3: Execute DDL (idempotent & safe)
# ============================================================

from sqlalchemy import inspect

logger.info("Executing DDL to ensure PostgreSQL schema exists")

inspector = inspect(engine)
existing_tables = set(inspector.get_table_names())

expected_tables = {"races", "drivers", "laps", "track_status"}

# ------------------------------------------------------------
# Case 1: Database is empty ‚Üí create schema
# ------------------------------------------------------------
if not existing_tables:
    logger.info("Database is empty ‚Äî creating schema")
    metadata.create_all(engine)
    logger.info("Schema created successfully")

# ------------------------------------------------------------
# Case 2: Expected tables already exist ‚Üí skip creation
# ------------------------------------------------------------
elif existing_tables == expected_tables:
    logger.info(
        "Schema already exists with expected tables ‚Äî skipping DDL execution"
    )

# ------------------------------------------------------------
# Case 3: Unexpected tables present ‚Üí fail loudly
# ------------------------------------------------------------
else:
    raise RuntimeError(
        f"Database contains unexpected tables: {sorted(existing_tables)}. "
        f"Expected exactly: {sorted(expected_tables)}. "
        "Manual intervention required."
    )

# ------------------------------------------------------------
# Final verification
# ------------------------------------------------------------
final_tables = set(inspector.get_table_names())
missing = expected_tables - final_tables

if missing:
    raise RuntimeError(
        f"Schema verification failed. Missing tables: {missing}"
    )

logger.info(f"Verified schema tables: {sorted(final_tables)}")

print("""
============================================================
Notebook 03 ‚Äî Schema Ready
------------------------------------------------------------
‚Ä¢ Required tables exist
‚Ä¢ Primary and foreign keys enforced
‚Ä¢ Safe to proceed to data loading

This cell is idempotent and restart-safe.
============================================================
""")


2025-12-17 15:27:12,384 | INFO | src.logging_config | Executing DDL to ensure PostgreSQL schema exists
2025-12-17 15:27:12,390 | INFO | src.logging_config | Schema already exists with expected tables ‚Äî skipping DDL execution
2025-12-17 15:27:12,393 | INFO | src.logging_config | Verified schema tables: ['drivers', 'laps', 'races', 'track_status']



Notebook 03 ‚Äî Schema Ready
------------------------------------------------------------
‚Ä¢ Required tables exist
‚Ä¢ Primary and foreign keys enforced
‚Ä¢ Safe to proceed to data loading

This cell is idempotent and restart-safe.



In [4]:
# ============================================================
# Notebook 03 ‚Äî PostgreSQL Loading & Relational Invariants
# Cell 4: Standardized Parquet Schema Diagnostics
# ============================================================

import json
from collections import defaultdict
import pandas as pd

from src.config import BASE_DIR, Config
from src.logging_config import setup_logging

logger, _ = setup_logging()
logger.info("Starting standardized Parquet schema diagnostics")

# ------------------------------------------------------------
# Resolve paths using actual config contract
# ------------------------------------------------------------
PROJECT_ROOT = BASE_DIR
INTERIM_DATA_DIR = Config.DATA_DIR / "interim"
STANDARDIZED_SCHEMA_DIR = INTERIM_DATA_DIR / "standardized"
SCHEMA_OUTPUT_PATH = INTERIM_DATA_DIR / "standardized_schema_snapshot.json"

# ------------------------------------------------------------
# Validate directory exists
# ------------------------------------------------------------
if not STANDARDIZED_SCHEMA_DIR.exists():
    raise FileNotFoundError(
        f"Standardized data directory not found: {STANDARDIZED_SCHEMA_DIR}"
    )

schema_snapshot = defaultdict(dict)
file_count = 0

# ------------------------------------------------------------
# Inspect all standardized parquet files
# ------------------------------------------------------------
for parquet_path in STANDARDIZED_SCHEMA_DIR.rglob("*.parquet"):
    race_dir = parquet_path.parent
    table_name = parquet_path.stem

    season_folder = race_dir.parts[-2]   # e.g. year=2024
    race_folder = race_dir.name           # e.g. round=10_Spanish_Grand_Prix
    race_key = f"{season_folder}/{race_folder}"

    df = pd.read_parquet(parquet_path)

    schema_snapshot.setdefault(table_name, {})
    schema_snapshot[table_name][race_key] = {
        "columns": list(df.columns),
        "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
        "row_count": int(len(df)),
    }

    file_count += 1

logger.info(f"Inspected {file_count} standardized Parquet files")

# ------------------------------------------------------------
# Persist schema snapshot
# ------------------------------------------------------------
with open(SCHEMA_OUTPUT_PATH, "w", encoding="utf-8") as f:
    json.dump(schema_snapshot, f, indent=2)

logger.info(f"Schema snapshot written to {SCHEMA_OUTPUT_PATH}")

print("""
============================================================
Notebook 03 ‚Äî Standardized Parquet Diagnostics Complete
------------------------------------------------------------
‚Ä¢ All standardized Parquet files inspected
‚Ä¢ Exact schemas captured per table per race
‚Ä¢ No assumptions made about race identity
‚Ä¢ Schema snapshot persisted for loader design

Next step:
  - Inspect standardized_schema_snapshot.json
  - Decide authoritative race identity source
  - Design robust loading logic (Cell 5)
============================================================
""")


2025-12-17 15:27:12,420 | INFO | src.logging_config | Starting standardized Parquet schema diagnostics
2025-12-17 15:27:14,979 | INFO | src.logging_config | Inspected 204 standardized Parquet files
2025-12-17 15:27:15,029 | INFO | src.logging_config | Schema snapshot written to C:\Users\hersh\Desktop\f1_analysis_project\data\interim\standardized_schema_snapshot.json



Notebook 03 ‚Äî Standardized Parquet Diagnostics Complete
------------------------------------------------------------
‚Ä¢ All standardized Parquet files inspected
‚Ä¢ Exact schemas captured per table per race
‚Ä¢ No assumptions made about race identity
‚Ä¢ Schema snapshot persisted for loader design

Next step:
  - Inspect standardized_schema_snapshot.json
  - Decide authoritative race identity source
  - Design robust loading logic (Cell 5)



In [5]:
# ============================================================
# Notebook 03 ‚Äî PostgreSQL Loading & Relational Invariants
# Cell 5: Idempotent Standardized Parquet ‚Üí PostgreSQL Load
# ============================================================

import pandas as pd
from sqlalchemy import text
from src.logging_config import setup_logging
from src.config import Config

logger, _ = setup_logging()
logger.info("Starting standardized Parquet -> PostgreSQL load (idempotent)")

STANDARDIZED_DIR = Config.DATA_DIR / "interim" / "standardized"

with engine.begin() as conn:

    for year_dir in sorted(STANDARDIZED_DIR.glob("year=*")):
        season = int(year_dir.name.split("=")[1])

        for race_dir in sorted(year_dir.iterdir()):
            race_id = race_dir.name.split("_")[0].replace("round=", "")
            race_id = f"{season}_{race_id}"

            logger.info(f"Processing race {race_id}")

            # ------------------------------------------------
            # Skip race if already loaded
            # ------------------------------------------------
            race_exists = conn.execute(
                text("SELECT 1 FROM races WHERE race_id = :race_id"),
                {"race_id": race_id}
            ).first()

            if race_exists:
                logger.info(f"Race {race_id} already loaded ‚Äî skipping")
                continue

            # ------------------------------------------------
            # Load race metadata (authoritative from directory)
            # ------------------------------------------------
            round_number = int(race_dir.name.split("_")[0].split("=")[1])

            race_df = pd.DataFrame([{
                "race_id": race_id,
                "season": season,
                "round": round_number,
            }])

            race_df.to_sql(
                "races",
                conn,
                if_exists="append",
                index=False,
                method="multi",
            )

            # ------------------------------------------------
            # Drivers (from results.parquet)
            # ------------------------------------------------
            results_df = pd.read_parquet(race_dir / "results.parquet")

            drivers_df = (
                results_df[["Abbreviation", "DriverNumber"]]
                .drop_duplicates()
                .rename(columns={
                    "Abbreviation": "driver_code",
                    "DriverNumber": "driver_number",
                })
            )

            drivers_df["race_id"] = race_id

            drivers_df.to_sql(
                "drivers",
                conn,
                if_exists="append",
                index=False,
                method="multi",
            )

            # ------------------------------------------------
            # Laps
            # ------------------------------------------------
            laps_df = pd.read_parquet(race_dir / "laps.parquet")

            laps_df = laps_df.rename(columns={
                "Driver": "driver_code",
                "LapNumber": "lap_number",
            })

            laps_df["race_id"] = race_id

            laps_df = laps_df[
                [
                    "race_id",
                    "driver_code",
                    "lap_number",
                    "lap_time_ms",
                    "tyre_compound",
                ]
            ]

            logger.info(f"Laps columns: {list(laps_df.columns)}")

            laps_df.to_sql(
                "laps",
                conn,
                if_exists="append",
                index=False,
                method="multi",
            )

logger.info("All eligible standardized Parquet files loaded successfully")

print("""
============================================================
Notebook 03 ‚Äî Data Load Complete (Idempotent)
------------------------------------------------------------
‚Ä¢ Existing races safely skipped
‚Ä¢ No duplicate inserts possible
‚Ä¢ Relational invariants preserved
‚Ä¢ Track status intentionally deferred

Notebook 03 is now environment-agnostic.
============================================================
""")


2025-12-17 15:27:15,063 | INFO | src.logging_config | Starting standardized Parquet -> PostgreSQL load (idempotent)
2025-12-17 15:27:15,070 | INFO | src.logging_config | Processing race 2022_10
2025-12-17 15:27:15,077 | INFO | src.logging_config | Race 2022_10 already loaded ‚Äî skipping
2025-12-17 15:27:15,080 | INFO | src.logging_config | Processing race 2022_11
2025-12-17 15:27:15,084 | INFO | src.logging_config | Race 2022_11 already loaded ‚Äî skipping
2025-12-17 15:27:15,085 | INFO | src.logging_config | Processing race 2022_12
2025-12-17 15:27:15,089 | INFO | src.logging_config | Race 2022_12 already loaded ‚Äî skipping
2025-12-17 15:27:15,090 | INFO | src.logging_config | Processing race 2022_13
2025-12-17 15:27:15,094 | INFO | src.logging_config | Race 2022_13 already loaded ‚Äî skipping
2025-12-17 15:27:15,097 | INFO | src.logging_config | Processing race 2022_14
2025-12-17 15:27:15,100 | INFO | src.logging_config | Race 2022_14 already loaded ‚Äî skipping
2025-12-17 15:27:15


Notebook 03 ‚Äî Data Load Complete (Idempotent)
------------------------------------------------------------
‚Ä¢ Existing races safely skipped
‚Ä¢ No duplicate inserts possible
‚Ä¢ Relational invariants preserved
‚Ä¢ Track status intentionally deferred

Notebook 03 is now environment-agnostic.



In [6]:
# ============================================================
# Notebook 03 ‚Äî PostgreSQL Loading & Relational Invariants
# Cell 6: Load Validation & Pipeline Readiness
# ============================================================

from sqlalchemy import text
from src.logging_config import setup_logging
from src.config import Config

logger, _ = setup_logging()
logger.info(...)

# ------------------------------------------------------------
# Database-level validation
# ------------------------------------------------------------

with engine.connect() as conn:

    # --------------------------------------------------------
    # 1. Basic row counts (sanity, not analytics)
    # --------------------------------------------------------
    counts = conn.execute(text("""
        SELECT
            (SELECT COUNT(*) FROM races)   AS races,
            (SELECT COUNT(*) FROM drivers) AS drivers,
            (SELECT COUNT(*) FROM laps)    AS laps
    """)).mappings().one()

    logger.info(
        f"Row counts ‚Äî races={counts['races']}, "
        f"drivers={counts['drivers']}, "
        f"laps={counts['laps']}"
    )

    if counts["races"] == 0:
        raise RuntimeError("No races loaded ‚Äî pipeline is broken")

    if counts["drivers"] == 0:
        raise RuntimeError("No drivers loaded ‚Äî identity mapping failed")

    if counts["laps"] == 0:
        raise RuntimeError("No laps loaded ‚Äî core fact table empty")

    # --------------------------------------------------------
    # 2. Referential integrity: laps ‚Üí drivers
    # --------------------------------------------------------
    orphan_laps = conn.execute(text("""
        SELECT COUNT(*) AS orphan_count
        FROM laps l
        LEFT JOIN drivers d
          ON l.race_id = d.race_id
         AND l.driver_code = d.driver_code
        WHERE d.driver_code IS NULL
    """)).scalar()

    if orphan_laps > 0:
        raise RuntimeError(
            f"Found {orphan_laps} laps without matching drivers"
        )

    logger.info("No orphan laps detected")

    # --------------------------------------------------------
    # 3. Lap grain validation
    # --------------------------------------------------------
    duplicate_laps = conn.execute(text("""
        SELECT COUNT(*) FROM (
            SELECT race_id, driver_code, lap_number, COUNT(*) c
            FROM laps
            GROUP BY race_id, driver_code, lap_number
            HAVING COUNT(*) > 1
        ) t
    """)).scalar()

    if duplicate_laps > 0:
        raise RuntimeError(
            f"Duplicate lap rows detected: {duplicate_laps}"
        )

    logger.info("Lap grain verified (race_id, driver_code, lap_number)")

# ------------------------------------------------------------
# Filesystem-level validation for deferred datasets
# ------------------------------------------------------------

STANDARDIZED_DIR = Config.DATA_DIR / "interim" / "standardized"

missing_track_status = []

for year_dir in STANDARDIZED_DIR.glob("year=*"):
    for race_dir in year_dir.iterdir():
        if not (race_dir / "track_status.parquet").exists():
            missing_track_status.append(str(race_dir))

if missing_track_status:
    raise RuntimeError(
        "Standardized track_status.parquet missing for races:\n"
        + "\n".join(missing_track_status[:5])
        + ("\n..." if len(missing_track_status) > 5 else "")
    )

logger.info("Standardized track_status.parquet present for all races")

# ------------------------------------------------------------
# Final readiness confirmation
# ------------------------------------------------------------

print("""
============================================================
Notebook 03 ‚Äî Relational Validation Complete
------------------------------------------------------------
‚Ä¢ Core relational tables populated
‚Ä¢ Referential integrity enforced
‚Ä¢ Lap grain is correct and unique
‚Ä¢ Track status events available on disk
‚Ä¢ Database ready for derived feature computation

Next Notebook:
  - Compute cumulative lap times
  - Derive pit laps and out laps
  - Map track status events to lap-level flags
  - Enable deterministic undercut detection
============================================================
""")


2025-12-17 15:27:15,434 | INFO | src.logging_config | Ellipsis
2025-12-17 15:27:15,453 | INFO | src.logging_config | Row counts ‚Äî races=68, drivers=1359, laps=74605
2025-12-17 15:27:15,484 | INFO | src.logging_config | No orphan laps detected
2025-12-17 15:27:15,607 | INFO | src.logging_config | Lap grain verified (race_id, driver_code, lap_number)
2025-12-17 15:27:15,622 | INFO | src.logging_config | Standardized track_status.parquet present for all races



Notebook 03 ‚Äî Relational Validation Complete
------------------------------------------------------------
‚Ä¢ Core relational tables populated
‚Ä¢ Referential integrity enforced
‚Ä¢ Lap grain is correct and unique
‚Ä¢ Track status events available on disk
‚Ä¢ Database ready for derived feature computation

Next Notebook:
  - Compute cumulative lap times
  - Derive pit laps and out laps
  - Map track status events to lap-level flags
  - Enable deterministic undercut detection



# üß† Notebook 03 ‚Äî Conclusion, Error Log & Forward Plan

## üßæ High-Level Summary

Notebook 03 was **not a smooth execution notebook** ‚Äî and that is precisely why it is one of the most important ones.

What appeared initially as a simple ‚ÄúParquet ‚Üí PostgreSQL load‚Äù revealed:

- Hidden schema assumptions
- Grain mismatches
- Logging misconfigurations
- Idempotency pitfalls
- Incorrect mental models about track status data

Each of these was encountered, diagnosed, and resolved deliberately.

---

## üîç What We Built (Final State)

At completion, we achieved:

üìä **Relational Tables**
- `races`
- `drivers`
- `laps`

üîí **Guaranteed Invariants**
- No duplicate races
- No duplicate drivers per race
- No duplicate laps
- No orphan records

üîÅ **Idempotent Execution**
- Kernel restarts are safe
- Full notebook re-runs are safe
- Existing data is detected and skipped

---

## ‚ö†Ô∏è Errors & Problems Encountered (Chronological)

### ‚ùå Column Name Assumptions
- Expected `driver_code`, found `Driver` / `Abbreviation`
- Resolved by inspecting actual Parquet schemas

### ‚ùå NOT NULL Violations
- Missing `round` field in `races`
- Fixed by parsing race identity correctly

### ‚ùå Duplicate Primary Keys
- Occurred when rerunning inserts
- Fixed via `ON CONFLICT DO NOTHING`

### ‚ùå Track Status Grain Errors
- Attempted early loading caused null lap numbers
- Realized track status is **not lap-level**

### ‚ùå Logger Misuse
- `setup_logging()` returned `(logger, handler)`
- Incorrect unpacking caused runtime failures
- Fixed by explicit tuple handling

### ‚ùå Scope Errors in Logging
- Logging variables after conditional skips
- Fixed by removing unsafe debug lines

Each error refined the pipeline ‚Äî none were ignored.

---

## üß™ Validation Results (Cell 6)

Final checks confirmed:

üìà Row counts:
- 68 races
- 1,359 drivers
- 74,605 laps

üß¨ Integrity:
- No orphan laps
- Correct lap grain `(race_id, driver_code, lap_number)`

üìÇ Track status:
- Present for all races **on disk**
- Ready for downstream use

The database is now **analytically trustworthy**.

---

## üß≠ How Our Understanding Changed

### üîÅ Track Status Reframed
Initially treated as a fact table ‚Äî now understood as:

> A temporal overlay that must be mapped *after* lap timelines exist.

### üß± PostgreSQL Repositioned
Not a processing engine, but a **validated backbone**.

### üß† Scope Tightened
Notebook 03 ends **before any strategy logic** by design.

---

## üöÄ NEXT STEPS ‚Äî Notebook 04 (VERY IMPORTANT)

Notebook 04 is where **time finally enters the system**.

### üìå Notebook 04 ‚Äî Cumulative Lap Timelines & Temporal Mapping

#### 1Ô∏è‚É£ Compute cumulative race time per driver
- Using `laps.lap_time_ms`
- Ordered by `(race_id, driver_code, lap_number)`
- Produces:
  - `lap_start_time_ms`
  - `lap_end_time_ms`

This converts laps from **discrete rows** into a **continuous timeline**.

---

#### 2Ô∏è‚É£ Identify pit laps & out laps
Using:
- Lap time spikes
- Tyre compound changes
- Stint boundaries

Outputs:
- `is_pit_lap`
- `is_out_lap`
- `stint_id`

---

#### 3Ô∏è‚É£ Load & map track status events üö¶
- Read `track_status.parquet`
- Convert events to time ranges
- Overlay onto lap timelines

Produces:
- `is_green_lap`
- `is_sc_lap`
- `is_vsc_lap`

This step is **impossible before Notebook 04**.

---

#### 4Ô∏è‚É£ Filter analytically valid laps
- Exclude:
  - Safety Car laps
  - Red flag laps
  - Out laps
- Preserve only:
  - Competitive green-flag laps

---

#### 5Ô∏è‚É£ Prepare deterministic undercut inputs
At the end of Notebook 04, we will have:

- Comparable lap deltas
- Clean stint transitions
- Strategy-ready lap windows

‚ö†Ô∏è **No undercut detection yet**
That belongs to Notebook 05.

---

## üèÅ Final Reflection

Notebook 03 transformed the project from:

> ‚ÄúClean files on disk‚Äù

into:

> ‚ÄúA verified relational system that can survive scrutiny.‚Äù

Every mistake exposed a hidden assumption.
Every fix sharpened the architecture.

With this foundation complete, **strategy analysis can now begin ‚Äî safely, reproducibly, and without shortcuts**.


In [2]:
# List all files and directories inside the project root

from pathlib import Path

# Set your project root here:
project_root = Path(r"C:\Users\hersh\Desktop\f1_analysis_project")

print(f"üìÇ Listing all files under: {project_root}\n")

all_paths = list(project_root.rglob("*"))

for p in all_paths:
    if p.is_dir():
        print(f"[DIR ] {p}")
    else:
        print(f"[FILE] {p}")

print(f"\nTotal items found: {len(all_paths)}")


üìÇ Listing all files under: C:\Users\hersh\Desktop\f1_analysis_project

[FILE] C:\Users\hersh\Desktop\f1_analysis_project\.env
[DIR ] C:\Users\hersh\Desktop\f1_analysis_project\.git
[FILE] C:\Users\hersh\Desktop\f1_analysis_project\.gitignore
[DIR ] C:\Users\hersh\Desktop\f1_analysis_project\data
[DIR ] C:\Users\hersh\Desktop\f1_analysis_project\database
[DIR ] C:\Users\hersh\Desktop\f1_analysis_project\docs
[DIR ] C:\Users\hersh\Desktop\f1_analysis_project\logs
[DIR ] C:\Users\hersh\Desktop\f1_analysis_project\notebooks
[FILE] C:\Users\hersh\Desktop\f1_analysis_project\README.md
[FILE] C:\Users\hersh\Desktop\f1_analysis_project\requirements.txt
[DIR ] C:\Users\hersh\Desktop\f1_analysis_project\src
[DIR ] C:\Users\hersh\Desktop\f1_analysis_project\tests
[FILE] C:\Users\hersh\Desktop\f1_analysis_project\.git\COMMIT_EDITMSG
[FILE] C:\Users\hersh\Desktop\f1_analysis_project\.git\config
[FILE] C:\Users\hersh\Desktop\f1_analysis_project\.git\description
[FILE] C:\Users\hersh\Desktop\f1_an