In [2]:
import duckdb, pandas as pd, pyarrow
# Record the DuckDB version in the notebook output for reproducibility.
print(f"duckdb: {duckdb.__version__}")


duckdb: 1.4.3


### Day 1 pipeline

Update the filename to match the CSV you downloaded (for the diabetes dataset it is usually `diabetic_data.csv`).

In [5]:
from pathlib import Path
import duckdb

# All paths are resolved relative to the project root (one level above notebooks/).
root = Path("..")  # because notebook is inside notebooks/
raw_csv = root / "data" / "raw" / "diabetic_data.csv"   # change if your file name differs
db_path = root / "data" / "warehouse" / "day1.duckdb"

# Fail fast with an actionable message before the warehouse directory and
# database file are created, instead of surfacing a DuckDB I/O error later.
if not raw_csv.is_file():
    raise FileNotFoundError(
        f"Raw CSV not found: {raw_csv.resolve()} "
        "(update raw_csv to match the file you downloaded)"
    )

db_path.parent.mkdir(parents=True, exist_ok=True)

# Opens (or creates) the DuckDB database file; `con` is reused by the cells below.
con = duckdb.connect(str(db_path))
print("Using DB:", db_path)
print("Using CSV:", raw_csv)

# BRONZE: raw as-is
# Every column is loaded as VARCHAR (ALL_VARCHAR=1) and an ingest timestamp is
# appended; the CSV path is passed as a bound parameter rather than interpolated.
con.execute("""
CREATE OR REPLACE TABLE bronze_diabetes AS
SELECT *, current_timestamp AS ingest_ts
FROM read_csv_auto(?, ALL_VARCHAR=1)
""", [str(raw_csv)])

print("bronze rows:", con.execute("SELECT COUNT(*) FROM bronze_diabetes").fetchone()[0])

# SILVER: convert '?' to NULL in all columns 
def qident(name: str) -> str:
    """Return ``name`` quoted as a SQL identifier.

    The name is wrapped in double quotes and every embedded double quote is
    doubled, so the result can be interpolated into a SQL statement as a
    column or table name.
    """
    escaped = name.replace('"', '""')
    return f'"{escaped}"'

# Column names of the bronze table: the name is the second field of each
# row returned by PRAGMA table_info.
table_info_rows = con.execute("PRAGMA table_info('bronze_diabetes')").fetchall()
cols = [name for _cid, name, *_rest in table_info_rows]

# One NULLIF projection per source column; ingest_ts is skipped here because
# it is passed through unchanged in the SELECT below.
null_if_parts = []
for col in cols:
    if col == "ingest_ts":
        continue
    quoted = qident(col)
    null_if_parts.append(f"NULLIF({quoted}, '?') AS {quoted}")
exprs = ",\n  ".join(null_if_parts)

con.execute(f"""
CREATE OR REPLACE TABLE silver_diabetes AS
SELECT
  {exprs},
  ingest_ts
FROM bronze_diabetes
""")

silver_rows = con.execute("SELECT COUNT(*) FROM silver_diabetes").fetchone()[0]
print("silver rows:", silver_rows)


# SILVER (typed): cast the identifier and length-of-stay columns and derive the
# binary 30-day readmission label.
# A missing "readmitted" value (NULL after the '?' -> NULL step) must stay NULL
# in the label: mapping it to 0 would silently fabricate negative examples and
# make a downstream "label IS NULL" data-quality check impossible to fail.
con.execute("""
CREATE OR REPLACE TABLE silver_diabetes_typed AS
SELECT
  CAST("encounter_id" AS BIGINT) AS encounter_id,
  CAST("patient_nbr" AS BIGINT) AS patient_nbr,
  CAST("time_in_hospital" AS INTEGER) AS time_in_hospital,
  "readmitted" AS readmitted,
  CASE
    WHEN "readmitted" IS NULL THEN NULL
    WHEN "readmitted" = '<30' THEN 1
    ELSE 0
  END AS y_readmit_30
FROM silver_diabetes
""")

# GOLD: modeling table (one row per encounter)
# t0_date is derived, not read from the data: 2008-01-01 plus
# (encounter_id % 365) days.
# NOTE(review): DATE + INTERVAL may yield a TIMESTAMP in DuckDB -- confirm the
# column type if a plain DATE is required downstream.
gold_sql = """
CREATE OR REPLACE TABLE gold_diabetes_base AS
SELECT
  encounter_id,
  patient_nbr AS person_id,
  DATE '2008-01-01' + (encounter_id % 365) * INTERVAL 1 DAY AS t0_date,
  y_readmit_30 AS label,
  time_in_hospital,
  readmitted
FROM silver_diabetes_typed
"""
con.execute(gold_sql)

# Summary figures: total row count and the mean of the label column.
gold_rows = con.execute("SELECT COUNT(*) FROM gold_diabetes_base").fetchone()[0]
label_prevalence = con.execute(
    "SELECT AVG(CAST(label AS DOUBLE)) FROM gold_diabetes_base"
).fetchone()[0]

print("gold rows:", gold_rows)
print("label prevalence:", label_prevalence)


Using DB: ..\data\warehouse\day1.duckdb
Using CSV: ..\data\raw\diabetic_data.csv
bronze rows: 101766
silver rows: 101766
gold rows: 101766
label prevalence: 0.11159915885462728


#### After you run it, do these quick checks

In [6]:
# Row counts per layer, printed in pipeline order.
layer_tables = (
    ("bronze", "bronze_diabetes"),
    ("silver", "silver_diabetes"),
    ("gold", "gold_diabetes_base"),
)
for layer_name, table_name in layer_tables:
    layer_rows = con.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
    print(f"{layer_name}:", layer_rows)


bronze: 101766
silver: 101766
gold: 101766


#### Run the three real data-quality (DQ) checks (so we have evidence)

In [7]:
# Three data-quality checks on the gold table. Every result is printed first
# (as evidence) and asserted afterwards, so a violation stops the run instead
# of scrolling past unnoticed.

# 1) uniqueness: encounter_id must be unique in gold
# COUNT(DISTINCT ...) skips NULLs, so rows with a NULL encounter_id are also
# reported as duplicates here.
n_rows, n_distinct, n_dups = con.execute("""
SELECT
  COUNT(*) AS n_rows,
  COUNT(DISTINCT encounter_id) AS n_distinct,
  COUNT(*) - COUNT(DISTINCT encounter_id) AS n_dups
FROM gold_diabetes_base
""").fetchone()
print("Rows:", n_rows, "Distinct encounter_id:", n_distinct, "Duplicates:", n_dups)

# 2) label nulls
# COUNT(*) FILTER yields 0 (not NULL) on an empty table, unlike SUM(CASE ...).
n_label_null = con.execute("""
SELECT COUNT(*) FILTER (WHERE label IS NULL) AS n_label_null
FROM gold_diabetes_base
""").fetchone()[0]
print("Label nulls:", n_label_null)

# 3) time_in_hospital range
# A NULL value is counted as out of range: a bare "< 1 OR > 14" comparison
# evaluates to NULL for missing values and would let them pass silently.
n_bad = con.execute("""
SELECT COUNT(*) FILTER (
  WHERE time_in_hospital IS NULL
     OR time_in_hospital < 1
     OR time_in_hospital > 14
) AS n_out_of_range
FROM gold_diabetes_base
""").fetchone()[0]
print("time_in_hospital out of range:", n_bad)

# Enforce the checks after all evidence has been printed.
assert n_dups == 0, f"{n_dups} duplicate or NULL encounter_id rows in gold_diabetes_base"
assert n_label_null == 0, f"{n_label_null} rows with NULL label in gold_diabetes_base"
assert n_bad == 0, f"{n_bad} rows with time_in_hospital missing or outside 1..14"


Rows: 101766 Distinct encounter_id: 101766 Duplicates: 0
Label nulls: 0
time_in_hospital out of range: 0


In [8]:
# Close the DuckDB connection opened earlier in this notebook; `con` cannot be
# used for further queries after this cell runs.
con.close()
