# Kaggle Data Ingestion

In [None]:
import sqlite3
import pandas as pd
from tqdm.auto import tqdm
import urllib.request
import os
import zipfile
import warnings

def download_and_extract(lakehouse_schema, url):
    """Fetch a zip archive, unpack it, and make sure the Lakehouse schema exists.

    The archive is saved as ``<lakehouse_schema>.zip`` in the current working
    directory and extracted into a directory named ``<lakehouse_schema>``.

    Args:
        lakehouse_schema: Name used for the local zip file, the extraction
            directory, and the Spark schema.
        url: Download URL of the zip archive.
    """
    archive_path = f"{lakehouse_schema}.zip"

    # Save the archive locally; an existing file with the same name is overwritten.
    urllib.request.urlretrieve(url, archive_path)

    # Unpack everything into a directory named after the schema.
    with zipfile.ZipFile(archive_path, mode="r") as archive:
        archive.extractall(path=lakehouse_schema)

    # `spark` is assumed to be the notebook-provided SparkSession.
    # Backticks quote the schema name as a single identifier.
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS `{lakehouse_schema}`")

# Per-table type fixes for columns that contain placeholder strings and need a
# numeric cast. Key: (lakehouse_schema, table). Value: (position of the first
# column to cast, string tokens that stand for "missing" in those columns).
_NUMERIC_FIXES = {
    ("TheHistoryofBaseball", "pitching"): (6, ("",)),
    ("TheHistoryofBaseball", "pitching_postseason"): (5, ("", "-")),
}


def _quote_sqlite_identifier(name):
    """Return *name* as a double-quoted SQLite identifier, doubling embedded quotes."""
    return '"' + str(name).replace('"', '""') + '"'


def _fix_types(lakehouse_schema, table, df):
    """Apply the ``_NUMERIC_FIXES`` cast registered for this table, if any; return *df*."""
    fix = _NUMERIC_FIXES.get((lakehouse_schema, table))
    if fix is None:
        return df

    first_col, missing_tokens = fix
    for col in df.columns[first_col:]:
        values = df[col]
        for token in missing_tokens:
            values = values.replace(token, None)
        # Assign whole columns rather than `df.iloc[:, n:] = ...`. Since pandas 2.0,
        # iloc assignment writes into the existing (object) columns in place and
        # keeps their dtype, which silently discards the Float64 cast.
        df[col] = values.astype("Float64")
    return df


def _write_table(df, full_table):
    """Overwrite the Lakehouse table *full_table* (schema-qualified) with *df*."""
    with warnings.catch_warnings():
        # Silence PySpark's warning about falling back from the Arrow conversion path.
        warnings.filterwarnings(
            "ignore",
            category=UserWarning,
            message=r".*createDataFrame attempted Arrow optimization.*",
        )
        (spark
            .createDataFrame(df)
            .write
            .mode("overwrite")
            .option("overwriteSchema", "true")
            .saveAsTable(full_table))


def ingest(lakehouse_schema, url):
    """Download a zipped SQLite dataset and copy each of its tables into the Lakehouse.

    The archive is fetched and unpacked by ``download_and_extract`` and must
    contain ``database.sqlite``. Every user table in that database is read into
    pandas and written to ``<lakehouse_schema>.<table>``. Tables that already
    exist in the Spark catalog are skipped, so a re-run only fills in what is
    missing.

    Args:
        lakehouse_schema: Target schema name; also the local extraction folder.
        url: Download URL of the zip archive.

    Raises:
        FileNotFoundError: If the extracted archive has no ``database.sqlite``.
    """
    download_and_extract(lakehouse_schema, url)

    db_path = f"{lakehouse_schema}/database.sqlite"
    # sqlite3.connect would silently create an empty database for a missing path.
    if not os.path.isfile(db_path):
        raise FileNotFoundError(f"{db_path} not found in the extracted archive")

    conn = sqlite3.connect(db_path)
    try:
        names = pd.read_sql_query(
            "SELECT name FROM sqlite_master WHERE type='table';", conn
        )["name"]
        # Names beginning with "sqlite_" are reserved for SQLite's own internal
        # tables (e.g. sqlite_sequence); they are not dataset content.
        tables = [name for name in names if not name.lower().startswith("sqlite_")]

        for table in (bar := tqdm(tables)):
            bar.set_description(f"Ingesting table {lakehouse_schema}.{table}")

            full_table = f"`{lakehouse_schema}`.`{table}`"

            # Skip tables written by an earlier run.
            if spark.catalog.tableExists(full_table):
                continue

            # Quote the name so tables with spaces or keyword names still load.
            df = pd.read_sql_query(
                f"SELECT * FROM {_quote_sqlite_identifier(table)};", conn
            )
            _write_table(_fix_types(lakehouse_schema, table, df), full_table)
    finally:
        # Release the database even if a read or a Spark write fails mid-way.
        conn.close()

# Ingest Football, Baseball and Pesticide

In [None]:
# Each entry is (Lakehouse schema, Kaggle download URL); ingest() downloads the
# archive and loads the tables of its database.sqlite into that schema.
sources = [
    ("WorldSoccerDataBase", "https://www.kaggle.com/api/v1/datasets/download/sashchernuh/european-football"),
    ("TheHistoryofBaseball", "https://www.kaggle.com/api/v1/datasets/download/seanlahman/the-history-of-baseball"),
    ("Pesticide", "https://www.kaggle.com/api/v1/datasets/download/usdeptofag/pesticide-data-program-2015"),
]

bar = tqdm(sources)
for lakehouse_schema, url in bar:
    bar.set_description(f"Ingesting {lakehouse_schema}")
    ingest(lakehouse_schema, url)

# Student Math Scores

In [None]:
# Unpack the StudentMathScore archive (CSV files) and create its Lakehouse schema.
download_and_extract(
    lakehouse_schema="StudentMathScore",
    url="https://www.kaggle.com/api/v1/datasets/download/loganhenslee/studentmathscores",
)

StatementMeta(, 59c73f21-f49c-4d7f-b823-b664b809c853, -1, Cancelled, , Cancelled)

In [None]:
# Load each extracted CSV and write it to the StudentMathScore schema,
# replacing any previous version of the table.
bar = tqdm([
    ("finrev_fed_17", "StudentMathScore/FINREV_FED_17.csv"),
    ("finrev_fed_key_17", "StudentMathScore/FINREV_FED_KEY_17.csv"),
    ("ndecoreexcel_math_grade8", "StudentMathScore/NDECoreExcel_Math_Grade 8.csv"),
])
for table, path in bar:
    bar.set_description(f"Table: {table}")

    # Header cells may carry stray surrounding whitespace; trim it.
    df = pd.read_csv(path).rename(columns=str.strip)

    # write to Lakehouse
    spark.createDataFrame(df).write.mode("overwrite").option(
        "overwriteSchema", "true"
    ).saveAsTable(f"`StudentMathScore`.`{table}`")

StatementMeta(, 59c73f21-f49c-4d7f-b823-b664b809c853, -1, Cancelled, , Cancelled)

# Greater Manchester Crime

In [None]:
# Unpack the Greater Manchester Crime archive (a SQL dump) and create its Lakehouse schema.
download_and_extract(
    lakehouse_schema="GreaterManchesterCrime",
    url="https://www.kaggle.com/api/v1/datasets/download/pratikbu/great-manchester-crime",
)

StatementMeta(, 59c73f21-f49c-4d7f-b823-b664b809c853, -1, Cancelled, , Cancelled)

In [None]:
import re
import csv
from io import StringIO
import pandas as pd

# --- config ---
# Dump file to parse, the table whose INSERTs we want, and its column order.
sql_path = "GreaterManchesterCrime/GreaterManchesterCrime.sql"
table = "GreaterManchesterCrime"
columns = [
    "CrimeID", "CrimeTS", "Location", "LSOA", "Type", "Outcome", "theGeom",
]

# 1) Load the whole SQL dump into memory as text.
with open(sql_path, encoding="utf-8") as f:
    sql_text = f.read()

# 2) Match `INSERT INTO <table> (<cols>) VALUES (<values>);` statements.
# Group 1 is the text between the VALUES parentheses; DOTALL lets a statement
# span several lines, and the lazy `.+?` stops at the first `)` followed by `;`.
insert_re = re.compile(
    rf"INSERT\s+INTO\s+{re.escape(table)}\s*\([^)]+\)\s*VALUES\s*\((.+?)\)\s*;",
    flags=re.DOTALL | re.IGNORECASE,
)

# Helper: rewrite ST_GeomFromText('<wkt>', <srid>) calls to just '<wkt>'.
def normalize_geom(expr: str) -> str:
    """Strip the ST_GeomFromText wrapper from geometry literals in *expr*.

    Every ``ST_GeomFromText('<wkt>', <srid>)`` (matched case-insensitively,
    with any whitespace around the arguments) is replaced by the quoted WKT
    ``'<wkt>'``; the numeric SRID is dropped. All other text is unchanged.
    """
    geom_call = re.compile(
        r"ST_GeomFromText\(\s*'([^']+)'\s*,\s*\d+\s*\)",
        re.IGNORECASE,
    )
    return geom_call.sub(r"'\1'", expr)

# Parse the VALUES (...) text of each matched INSERT into one list of column strings.
rows = []
for m in insert_re.finditer(sql_text):
    # Turn geometry constructors into plain quoted WKT so every value is a literal.
    values_blob = normalize_geom(m.group(1).strip())

    # Use CSV parsing with ' as the quote character: commas inside quoted
    # strings are kept, and csv's default doublequote handling unescapes ''.
    reader = csv.reader(StringIO(values_blob), delimiter=',', quotechar="'", skipinitialspace=True)
    # An empty blob yields no CSV row; fall back to [] so the length check
    # below reports it instead of a bare StopIteration escaping.
    tokens = [t.strip() for t in next(reader, [])]

    # Sanity check length
    if len(tokens) != len(columns):
        raise ValueError(f"Expected {len(columns)} values, got {len(tokens)}: {tokens}")

    rows.append(tokens)

# Fail fast with a clear message rather than building and writing an empty table.
if not rows:
    raise ValueError(f"No INSERT INTO {table} statements found in {sql_path}")

# 3) Build DataFrame and do light typing
df = pd.DataFrame(rows, columns=columns)
# Timestamps that cannot be parsed become NaT instead of raising.
df["CrimeTS"] = pd.to_datetime(df["CrimeTS"], errors="coerce")

# Extract lon/lat from WKT 'POINT(lon lat)'; rows without a match get NaN.
# The float64 cast keeps both columns numeric even when no row matches. A list
# of all-None values would give object columns that Spark may fail to type.
coords = (
    df["theGeom"]
    .fillna("")
    .str.extract(r"POINT\(\s*(?P<lon>[+-]?\d+(?:\.\d+)?)\s+(?P<lat>[+-]?\d+(?:\.\d+)?)\s*\)")
    .astype("float64")
)
df["lon"] = coords["lon"]
df["lat"] = coords["lat"]

# write to Lakehouse
(spark
    .createDataFrame(df)
    .write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("`GreaterManchesterCrime`.`greatermanchestercrime`"))

StatementMeta(, 59c73f21-f49c-4d7f-b823-b664b809c853, -1, Cancelled, , Cancelled)