In [2]:
# Cell 1: Definitions & setup logic
# ────────────────────────────────────────────────────
import sys
from pathlib import Path

# Compute project paths
nb_dir   = Path.cwd().resolve()    # …/notebooks
repo_dir = nb_dir.parent           # repo root

# Make sure our repo root is on the import path *before* any pipeline imports
sys.path.insert(0, str(repo_dir))

# Now it will find pipeline.utils.duckdb_wrapper
from pipeline.utils.duckdb_wrapper import DuckDBWrapper
from pipeline.datasets import *

# Helper: initialize a DuckDBWrapper with optional reset logic
def initialize(duckdb_path: Path, delete_on_disk: bool, reset_catalog: bool):
    """
    1) Closes any existing `con`
    2) Deletes on‐disk files (duckdb + WAL) if delete_on_disk=True
    3) Connects a fresh DuckDBWrapper at duckdb_path
    4) Drops all tables/views in‐session if reset_catalog=True
    Returns the new `con`.
    """
    global con

    # 1) close old connection
    try:
        con.con.close()
        del con
    except Exception:
        pass

    # 2) delete on-disk files if asked
    if delete_on_disk:
        wal_file = duckdb_path.with_suffix(duckdb_path.suffix + "-wal")
        for f in (duckdb_path, wal_file):
            if f.exists():
                f.unlink()

    # 3) connect
    duckdb_path.parent.mkdir(parents=True, exist_ok=True)
    con = DuckDBWrapper(duckdb_path)

    # 4) reset in-session catalog if asked
    if reset_catalog:
        rows = con.con.execute("""
            SELECT table_name, table_type
            FROM information_schema.tables
            WHERE table_schema = 'main'
        """).fetchall()
        for name, typ in rows:
            con.con.execute(f'DROP {typ} IF EXISTS "{name}" CASCADE')

    return con


In [3]:
# Cell 2: Configure & initialize

# 1) DuckDB file path (edit if you move it)
duckdb_file  = Path("../data/duckdb/test.duckdb")  

# 2) One toggle for a “fresh start”
FRESH_START  = True   # if True: deletes on‐disk + resets catalog; if False: leaves everything intact

# 3) Run initialization (passes the same flag twice)
con = initialize(
    duckdb_path    = duckdb_file,
    delete_on_disk = FRESH_START,
    reset_catalog  = FRESH_START,
)

# 4) Inspect
print(f"Connected to: {duckdb_file}")
print("Current tables/views:")
con.show_tables()

Connected to: ../data/duckdb/test.duckdb
Current tables/views:


In [4]:
BASE_PATH = "data/opendata/clean"

con.bulk_register_data(
    repo_root   = repo_dir,
    base_path   = BASE_PATH,
    table_names = SINGLE_FILE_ASSETS_NAMES,
    wildcard    = "*.parquet",
    as_table    = False,
)

con.bulk_register_partitioned_data(
    repo_root   = repo_dir,
    base_path   = BASE_PATH,
    table_names = PARTITIONED_ASSETS_NAMES,
    wildcard    = "year=*/month=*/*.parquet",
    as_table    = False,
    show_tables = True,                 # optional: prints a neat summary
)

View 'nfl_pbp_2024' created for files at '/home/christiandata/nfl-analytics-pipeline/data/opendata/clean/nfl_pbp_2024/*.parquet'.
View 'nfl_pbp_ten_years' created for files at '/home/christiandata/nfl-analytics-pipeline/data/opendata/clean/nfl_pbp_ten_years/*.parquet'.
Skipping partitioned : no .parquet files found with pattern => /home/christiandata/nfl-analytics-pipeline/data/opendata/clean/year=*/month=*/*.parquet


In [None]:
query = f"""
SELECT 
    week,
    penalty_type,
    COUNT(*) AS total_penalties
FROM nfl_pbp_2024
WHERE penalty_type IS NOT NULL
GROUP BY 
    week,
    penalty_type
ORDER BY 
    week ASC


"""

result = con.run_query(query)

print(result)


In [None]:
#If you want a better looking table, set show_results=True. I'd recomend capping the limit at about 50 rows


query = f"""

SELECT 
    week,
    penalty_type,
    COUNT(*) AS total_penalties
FROM nfl_pbp_2024
WHERE penalty_type IS NOT NULL
GROUP BY 
    week,
    penalty_type
ORDER BY 
    week ASC

LIMIT 20

"""

result = con.run_query(query,show_results=True)


In [5]:
#export one off query queries into parquet, CSV, or JSON
query = f"""

SELECT 
    week,
    penalty_type,
    COUNT(*) AS total_penalties
FROM nfl_pbp_2024
WHERE penalty_type IS NOT NULL
GROUP BY 
    week,
    penalty_type
ORDER BY 
    week ASC

"""

result = con.run_query(query)

print(result)

repo_root = Path.cwd().resolve().parents[0]  # Adjust to locate the repo root
base_path = repo_root / "data/exports"
file_name = "penalty_totals"
file_type= "csv"
# Export the query result to CSV
con.export(result, file_type=file_type, base_path=base_path, file_name=file_name)

shape: (576, 3)
┌──────┬───────────────────────────────┬─────────────────┐
│ week ┆ penalty_type                  ┆ total_penalties │
│ ---  ┆ ---                           ┆ ---             │
│ i32  ┆ str                           ┆ i64             │
╞══════╪═══════════════════════════════╪═════════════════╡
│ 1    ┆ Delay of Game                 ┆ 13              │
│ 1    ┆ Illegal Contact               ┆ 2               │
│ 1    ┆ Neutral Zone Infraction       ┆ 2               │
│ 1    ┆ Kick Catch Interference       ┆ 1               │
│ 1    ┆ Illegal Shift                 ┆ 3               │
│ …    ┆ …                             ┆ …               │
│ 22   ┆ Offensive Holding             ┆ 3               │
│ 22   ┆ Illegal Block Above the Waist ┆ 1               │
│ 22   ┆ False Start                   ┆ 3               │
│ 22   ┆ Illegal Use of Hands          ┆ 1               │
│ 22   ┆ Unsportsmanlike Conduct       ┆ 1               │
└──────┴───────────────────────────────┴

In [None]:
# Execute and export all .sql files in the notebooks/sql folder
sql_dir    = nb_dir / "sql"                  # notebooks/sql/*.sql
export_dir = repo_dir / "data" / "exports"   # where we write .parquet
export_dir.mkdir(parents=True, exist_ok=True)

for sql_path in sorted(sql_dir.glob("*.sql")):
    sql_text  = sql_path.read_text()
    out_name  = sql_path.stem                # e.g. rides, new, test
    print(f"▶︎ Running {sql_path.name}")

    result_df = con.run_query(sql_text)      # Polars DataFrame

    con.export(
        result     = result_df,
        file_type  = "parquet",
        base_path  = export_dir,
        file_name  = out_name,
    )

print(f"✓ Done — results saved under {export_dir}")

In [None]:
#Optional:register your new outputs into duckdb
result_paths  = sorted(export_dir.glob("*.parquet"))
view_names    = [p.stem for p in result_paths]          # "new", "rides", "test", …

con.register_data_view(
    paths        = result_paths,
    table_names  = view_names
)

# optional: confirm they’re there
con.show_tables()

In [None]:
#Close the notebook before using Harlequin with the created duckdb file
con.con.close()

In [None]:
# ---------------------------------------------------------------------
# Convert every notebooks/sql/*.sql into dbt‑compatible models
# ---------------------------------------------------------------------
import subprocess
import sys
from pathlib import Path

DEBUG = False                  # set True to enable convertdbtsql.py "debug" mode

# 1) locate repo root
nb_dir   = Path.cwd().resolve()           # .../notebooks
repo_dir = nb_dir.parent                  # repo root

# 2) build all required paths
sql_dir   = nb_dir / "sql"                # source folder (already exists)
dest_dir  = repo_dir / "transformations" / "dbt" / "models"
tool_path = repo_dir / "tools" / "convertdbtsql.py"

# 3) sanity checks
for p in (sql_dir, tool_path):
    if not p.exists():
        raise FileNotFoundError(p)

dest_dir.mkdir(parents=True, exist_ok=True)

# 4) assemble argv and run
argv = [
    sys.executable,                       # the current Python interpreter
    str(tool_path),
    str(dest_dir),
    str(sql_dir),
]
if DEBUG:
    argv.append("debug")

print("▶︎ Converting SQL files for dbt …")
subprocess.run(argv, check=True)
print(f"✓ Done — rewritten models are in {dest_dir}")
