# Formula 1 (1950–2024) — GitHub ZIP → Unity Catalog (Delta)
**Goal:** Download the F1 dataset from a GitHub ZIP, store it in a Unity Catalog **Volume**, and auto-ingest each CSV into **Delta tables** for SQL/Genie.

**Stack:** Databricks Serverless · Unity Catalog · UC Volumes · Delta Lake · Python + Bash

Original dataset on Kaggle: [Formula 1 World Championship](https://www.kaggle.com/datasets/rohanrao/formula-1-world-championship-1950-2020)


## 1) Prerequisites & Parameters
- Permission to create/use a **UC catalog, schema, and volume**.
- A valid **ADLS Gen2** path for the catalog **MANAGED LOCATION**.
- Source ZIP: **GitHub** (no Kaggle login needed).

We’ll:
1) Create (or reuse) catalog/schema/volume  
2) Download & unzip into the Volume  
3) Ingest all CSVs (recursively) → **Delta tables** in Unity Catalog


## 2) Configuration (Python)

In [0]:
# --- Configuration: change these if needed ---

CATALOG = "_databricks_demos"          # UC catalog
SCHEMA  = "genie_data"                 # UC schema (aka database)
VOLUME  = "f1_data"                    # UC volume name

# Managed location for the new catalog (ADLS Gen2 URI)
# ⚠️ ATTENTION: replace the placeholders with your own container and storage account
CATALOG_MANAGED_LOCATION = "abfss://<your_container>@<your_storage>.dfs.core.windows.net/"

# Source ZIP (your GitHub archive)
ARCHIVE_URL = "https://github.com/ArnoldSouza/databricks-genie-teams-copilot-m365/raw/refs/heads/main/examples/data/archive.zip"

# Where files will be placed inside the UC Volume
DEST_DIR = f"/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}"
RAW_DIR  = f"{DEST_DIR}/raw"   # we’ll unzip here

print("Configured paths:")
print(f"  Catalog.Schema.Volume : {CATALOG}.{SCHEMA}.{VOLUME}")
print(f"  Volume root dir       : {DEST_DIR}")
print(f"  Raw unzip dir         : {RAW_DIR}")
print(f"  ZIP source            : {ARCHIVE_URL}")
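
Before creating the catalog, it can help to confirm that the managed-location placeholder was actually replaced. A minimal sketch, assuming a helper named `is_placeholder_free` (hypothetical, not part of the original notebook) that only checks the URI scheme and the absence of `<...>` markers:

```python
def is_placeholder_free(uri: str) -> bool:
    """Return True if uri uses the abfss:// scheme and has no <...> placeholder left.

    Hypothetical helper: it does not verify that the container or
    storage account actually exists.
    """
    return uri.startswith("abfss://") and "<" not in uri and ">" not in uri
```

In the notebook this could be used as `assert is_placeholder_free(CATALOG_MANAGED_LOCATION)` right after the configuration cell.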


## 3) Create UC Catalog, Schema, and Volume (Python)

In [0]:
# Create catalog, schema, and volume. Then set the current context.

spark.sql(f"""
  CREATE CATALOG IF NOT EXISTS {CATALOG}
  MANAGED LOCATION '{CATALOG_MANAGED_LOCATION}'
""")

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {CATALOG}.{SCHEMA}.{VOLUME}")

spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"USE {SCHEMA}")

display(spark.sql("SHOW VOLUMES"))
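
The cell above interpolates the configured names straight into SQL. That works for the defaults, but a name containing characters outside letters, digits, and underscores (a hyphen, for instance) needs backtick quoting. A small sketch, where `quote_ident` and `fq_name` are hypothetical helpers rather than anything the notebook defines:

```python
def quote_ident(name: str) -> str:
    """Wrap a SQL identifier in backticks, doubling any embedded backtick."""
    return "`" + name.replace("`", "``") + "`"


def fq_name(*parts: str) -> str:
    """Join quoted identifier parts into a dotted, fully qualified name."""
    return ".".join(quote_ident(p) for p in parts)
```

For example, `fq_name(CATALOG, SCHEMA, VOLUME)` could replace `{CATALOG}.{SCHEMA}.{VOLUME}` inside the `CREATE VOLUME` statement.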


## 4) Download & Unzip from GitHub (Bash)

In [0]:
%sh

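# --- Download the GitHub ZIP and unzip into the UC Volume ---
# Fail fast on errors, unset variables, and failed pipeline stages
set -euo pipefail

# %sh cannot see Python variables: keep these in sync with the Configuration cell
URL="https://github.com/ArnoldSouza/databricks-genie-teams-copilot-m365/raw/refs/heads/main/examples/data/archive.zip"
DEST_DIR="/Volumes/_databricks_demos/genie_data/f1_data"
RAW_DIR="${DEST_DIR}/raw"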

echo "Creating target dirs..."
mkdir -p "$RAW_DIR"

echo "Downloading ZIP..."
# Use a stable file name
ZIP_PATH="${RAW_DIR}/archive.zip"
wget -q -O "$ZIP_PATH" "$URL"

echo "Unzipping..."
# -o to overwrite; -d to target; -q for quieter output (remove -q if you want details)
unzip -o -q "$ZIP_PATH" -d "$RAW_DIR"

echo "Cleaning up ZIP..."
rm -f "$ZIP_PATH"

echo "Files extracted to $RAW_DIR:"
# Show a limited listing for readability
find "$RAW_DIR" -maxdepth 2 -type f | sed -n '1,200p'
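
The Bash cell repeats the URL and paths from the configuration cell by hand. As an alternative, the same download-and-unzip step can be sketched in Python so that it reuses `ARCHIVE_URL` and `RAW_DIR` directly. This is a sketch under assumptions: `download_and_extract` is a hypothetical helper, and it assumes the compute can write to the `/Volumes/...` path with ordinary Python file APIs. The ZIP is staged in a local temporary directory so that it never lands in the Volume.

```python
import os
import tempfile
import urllib.request
import zipfile


def download_and_extract(url: str, raw_dir: str) -> list:
    """Download a ZIP from url and extract it into raw_dir.

    Returns the list of member names found in the archive.
    """
    os.makedirs(raw_dir, exist_ok=True)
    # Stage the archive outside raw_dir; the temp dir is removed on exit.
    with tempfile.TemporaryDirectory() as tmp:
        zip_path = os.path.join(tmp, "archive.zip")
        urllib.request.urlretrieve(url, zip_path)
        with zipfile.ZipFile(zip_path) as zf:
            zf.extractall(raw_dir)  # existing files are overwritten
            return zf.namelist()
```

In the notebook, a call such as `download_and_extract(ARCHIVE_URL, RAW_DIR)` would stand in for the `%sh` cell above.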


## 5) Quick Audit (Bash)

In [0]:
%sh

RAW_DIR="/Volumes/_databricks_demos/genie_data/f1_data/raw"
echo "CSV files found (first 200 lines of listing):"
find "$RAW_DIR" -type f -name "*.csv" | sed -n '1,200p'
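
The same audit can be done from Python, which also makes file sizes easy to report. A minimal sketch, with `list_csvs` as a hypothetical helper name:

```python
import os


def list_csvs(root_dir: str) -> list:
    """Return sorted (relative_path, size_in_bytes) pairs for every CSV under root_dir."""
    found = []
    for root, _, files in os.walk(root_dir):
        for name in files:
            if name.lower().endswith(".csv"):  # case-insensitive extension match
                path = os.path.join(root, name)
                found.append((os.path.relpath(path, root_dir), os.path.getsize(path)))
    return sorted(found)
```

Calling `list_csvs(RAW_DIR)` and printing each pair gives a quick check that no file is empty before ingestion.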


## 6) Ingest All CSVs → Delta Tables (Python)

In [0]:
# Ingest all CSVs → Delta tables

import os, re
from pyspark.sql import functions as F

RAW_DIR = f"/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}/raw"

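def sanitize_table_name(path: str) -> str:
    """Turn a CSV path into a table name: lowercase file stem, runs of characters
    outside [a-z0-9_] collapsed to one underscore, outer underscores stripped."""
    base = os.path.splitext(os.path.basename(path))[0]
    base = re.sub(r"[^a-z0-9_]+", "_", base.lower()).strip("_")
    return base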

# Recursively collect CSVs
csv_paths = []
for root, _, files in os.walk(RAW_DIR):
    for f in files:
        if f.lower().endswith(".csv"):
            csv_paths.append(os.path.join(root, f))

if not csv_paths:
    raise FileNotFoundError(f"No CSV files found under {RAW_DIR}")

created = []
for path in sorted(csv_paths):
    tbl  = sanitize_table_name(path)
    fqtn = f"{CATALOG}.{SCHEMA}.{tbl}"

    df = (spark.read.format("csv")
          .option("header", "true")
          .option("inferSchema", "true")
          .option("multiLine", "true")
          .option("escape", '"')
          .load(path)
          .withColumn("_ingest_file", F.col("_metadata.file_path"))
          .withColumn("_ingest_ts",   F.current_timestamp())
          .withColumn("_source_modified", F.col("_metadata.file_modification_time"))
         )

    (df.write
       .format("delta")
       .mode("overwrite")
       .option("overwriteSchema", "true")
       .saveAsTable(fqtn))

    spark.sql(f"COMMENT ON TABLE {fqtn} IS 'F1 GitHub ZIP import; auto-generated'")

    # Count rows from the saved table; df.count() would re-read and re-parse the CSV
    created.append((fqtn, spark.table(fqtn).count()))

print("Created/overwritten tables:")
for fqtn, n in created:
    print(f"  - {fqtn} (rows: {n})")
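
Because the search is recursive and table names come from the file stem only, two CSVs in different folders (or with names that differ only in case or punctuation) map to the same table, and the later one silently overwrites the earlier. A sketch of a pre-flight check, where `find_name_collisions` is a hypothetical helper and `sanitize_table_name` is copied from the cell above:

```python
import os
import re
from collections import defaultdict


def sanitize_table_name(path: str) -> str:
    # Same logic as the ingestion cell above.
    base = os.path.splitext(os.path.basename(path))[0]
    return re.sub(r"[^a-z0-9_]+", "_", base.lower()).strip("_")


def find_name_collisions(paths) -> dict:
    """Map each table name that is produced by more than one path to those paths."""
    by_table = defaultdict(list)
    for p in paths:
        by_table[sanitize_table_name(p)].append(p)
    return {tbl: ps for tbl, ps in by_table.items() if len(ps) > 1}
```

Running `find_name_collisions(csv_paths)` before the ingestion loop and raising an error if the result is non-empty would surface the problem early.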


## 7) Test one table

In [0]:
%sql
select * from _databricks_demos.genie_data.circuits limit 10