<a href="https://colab.research.google.com/github/Horcrux-Saniya/Mini-Lakehouse/blob/main/03_model_star.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

3.1 Imports, paths, and helpers

In [1]:
# Day 3 — Robust setup + loader (replaces Cells 1 & 2)
from pathlib import Path
import pandas as pd

# ---- (Colab) Mount Drive safely ----
try:
    from google.colab import drive
    drive.mount('/content/drive', force_remount=False)
except Exception as e:
    print("Drive mount note:", e)

# ---- Locate project base robustly ----
def find_base():
    cands = [Path('/content/drive/MyDrive/mini-lakehouse'),
             Path('/content/drive/My Drive/mini-lakehouse')]
    for p in cands:
        if p.exists():
            return p
    matches = list(Path('/content/drive').glob('**/mini-lakehouse'))
    return matches[0] if matches else cands[0]

BASE = find_base()
DATA = BASE / 'data'
RAW_DIR = DATA / 'raw'
STAGING_DIR = DATA / 'staging'
CURATED_DIR = DATA / 'curated'
DOCS_DIR = BASE / 'docs'
for d in (RAW_DIR, STAGING_DIR, CURATED_DIR, DOCS_DIR):
    d.mkdir(parents=True, exist_ok=True)

print("BASE ->", BASE)
print("RAW_DIR ->", RAW_DIR)
print("STAGING_DIR ->", STAGING_DIR)

# ---- Helpers to load any supported file ----
def load_any(path: Path) -> pd.DataFrame:
    suf = path.suffix.lower()
    if suf == '.parquet':
        return pd.read_parquet(path)
    if suf == '.csv':
        return pd.read_csv(path)
    if suf in ('.xlsx', '.xls'):
        xls = pd.ExcelFile(path)
        sheet = 'Orders' if 'Orders' in xls.sheet_names else xls.sheet_names[0]
        return pd.read_excel(xls, sheet_name=sheet)
    raise ValueError(f"Unsupported file: {path}")

def minimal_clean(df: pd.DataFrame) -> pd.DataFrame:
    # light coercions so Day 3 continues smoothly
    for c in ['Order Date','Order_Date','OrderDate','Ship Date','Ship_Date','ShipDate']:
        if c in df.columns:
            df[c] = pd.to_datetime(df[c], errors='coerce')
    for c in ['Sales','Quantity','Discount','Profit']:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors='coerce')
    return df

# ---- Try STAGING first ----
parqs = sorted(STAGING_DIR.glob('*.parquet'))
csvs  = sorted(STAGING_DIR.glob('*.csv'))

if parqs or csvs:
    src = parqs[0] if parqs else csvs[0]
    df  = load_any(src)
    print(f"Loaded cleaned file from STAGING: {src.name} -> rows={len(df):,}, cols={df.shape[1]}")
else:
    # ---- Fallback to RAW; if empty, search whole Drive for Superstore files ----
    raw_candidates = sorted(RAW_DIR.glob('*.*'))
    if not raw_candidates:
        patterns = ['**/GlobalSuperstore.*', '**/Superstore*.*']
        found = []
        for pat in patterns:
            found += list(Path('/content/drive').glob(pat))
        found = [p for p in found if p.suffix.lower() in ('.csv','.xlsx','.xls','.parquet')]
        if not found:
            raise FileNotFoundError(
                "No cleaned file in /data/staging and no raw file found. "
                "Place GlobalSuperstore/Superstore CSV/XLSX in /mini-lakehouse/data/raw and rerun."
            )
        raw = sorted(found)[0]
        print("Found RAW outside project at:", raw)
    else:
        raw = raw_candidates[0]
        print("Using RAW at:", raw)

    df = minimal_clean(load_any(raw))
    out = STAGING_DIR / 'GlobalSuperstore_clean.parquet'
    df.to_parquet(out, index=False)
    print(f"Staged -> {out.name} | rows={len(df):,}, cols={df.shape[1]}")

# Peek
df.head(2)


Mounted at /content/drive
BASE -> /content/drive/MyDrive/mini-lakehouse
RAW_DIR -> /content/drive/MyDrive/mini-lakehouse/data/raw
STAGING_DIR -> /content/drive/MyDrive/mini-lakehouse/data/staging
Loaded cleaned file from STAGING: superstore_clean_20251107_082303.parquet -> rows=51,290, cols=24


Unnamed: 0,row_id,order_id,order_date,ship_date,ship_mode,customer_id,customer_name,segment,city,state,...,product_id,category,subcategory,product_name,sales,quantity,discount,profit,shipping_cost,order_priority
0,32298,CA-2012-124891,2012-07-31,2012-07-31,Same Day,RH-19495,Rick Hansen,Consumer,New York City,New York,...,TEC-AC-10003033,Technology,Accessories,Plantronics CS510 - Over-the-Head monaural Wir...,2309.65,7,0.0,762.1845,933.57,Critical
1,26341,IN-2013-77878,2013-02-05,2013-02-07,Second Class,JR-16210,Justin Ritter,Corporate,Wollongong,New South Wales,...,FUR-CH-10003950,Furniture,Chairs,"Novimex Executive Leather Armchair, Black",3709.395,9,0.1,-288.765,923.63,Critical


3.2 Load cleaned dataset from /data/staging (auto-detect)

In [2]:
# Cell 2B — Rescue Loader (handles alt paths, full-drive search, or manual upload)
from pathlib import Path
import pandas as pd

# Reuse vars if defined; else set minimal defaults
try:
    BASE
except NameError:
    BASE = Path('/content/drive/MyDrive/mini-lakehouse')
DATA = BASE / 'data'
STAGING_DIR = DATA / 'staging'
RAW_DIR = DATA / 'raw'
STAGING_DIR.mkdir(parents=True, exist_ok=True)
RAW_DIR.mkdir(parents=True, exist_ok=True)

ALT_RAW = Path('/content/drive/My Drive/mini-lakehouse/data/raw')  # space variant

def load_any(path: Path) -> pd.DataFrame:
    suf = path.suffix.lower()
    if suf == '.parquet':
        return pd.read_parquet(path)
    if suf == '.csv':
        return pd.read_csv(path)
    if suf in ('.xlsx', '.xls'):
        xls = pd.ExcelFile(path)
        sheet = 'Orders' if 'Orders' in xls.sheet_names else xls.sheet_names[0]
        return pd.read_excel(xls, sheet_name=sheet)
    raise ValueError(f"Unsupported file: {path}")

def minimal_clean(df: pd.DataFrame) -> pd.DataFrame:
    for c in ['Order Date','Order_Date','OrderDate','Ship Date','Ship_Date','ShipDate']:
        if c in df.columns:
            df[c] = pd.to_datetime(df[c], errors='coerce')
    for c in ['Sales','Quantity','Discount','Profit']:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors='coerce')
    return df

# 1) Check STAGING first
cands = []
cands += [p for p in sorted(STAGING_DIR.glob('*')) if p.suffix.lower() in ('.parquet', '.csv')]
if not cands:
    # 2) Check RAW in both path variants
    for d in [RAW_DIR, ALT_RAW]:
        if d.exists():
            cands += [p for p in sorted(d.glob('*')) if p.suffix.lower() in ('.csv','.xlsx','.xls','.parquet')]

# 3) Search entire Drive if still nothing
if not cands:
    found = []
    for pat in ['**/GlobalSuperstore.*', '**/Superstore*.*']:
        found += list(Path('/content/drive').glob(pat))
    cands = [p for p in found if p.suffix.lower() in ('.csv','.xlsx','.xls','.parquet')]

# 4) Final fallback — manual upload
if not cands:
    try:
        from google.colab import files
        print("No dataset found. Please upload your GlobalSuperstore/Superstore file (CSV/XLSX/Parquet)…")
        uploaded = files.upload()  # opens file picker
        for name, data in uploaded.items():
            dest = RAW_DIR / name
            with open(dest, 'wb') as f: f.write(data)
            cands.append(dest)
        print("Uploaded:", [p.name for p in cands])
    except Exception as e:
        raise FileNotFoundError("Could not locate or upload the dataset. Upload a file and re-run this cell.") from e

# Use first candidate, stage to /data/staging
src = sorted(cands)[0]
df = minimal_clean(load_any(src))
out = STAGING_DIR / 'GlobalSuperstore_clean.parquet'
df.to_parquet(out, index=False)
print(f"Staged -> {out} | rows={len(df):,}, cols={df.shape[1]} | source -> {src}")
df.head(2)

Staged -> /content/drive/MyDrive/mini-lakehouse/data/staging/GlobalSuperstore_clean.parquet | rows=51,290, cols=24 | source -> /content/drive/MyDrive/mini-lakehouse/data/staging/superstore_clean_20251107_082303.parquet


Unnamed: 0,row_id,order_id,order_date,ship_date,ship_mode,customer_id,customer_name,segment,city,state,...,product_id,category,subcategory,product_name,sales,quantity,discount,profit,shipping_cost,order_priority
0,32298,CA-2012-124891,2012-07-31,2012-07-31,Same Day,RH-19495,Rick Hansen,Consumer,New York City,New York,...,TEC-AC-10003033,Technology,Accessories,Plantronics CS510 - Over-the-Head monaural Wir...,2309.65,7,0.0,762.1845,933.57,Critical
1,26341,IN-2013-77878,2013-02-05,2013-02-07,Second Class,JR-16210,Justin Ritter,Corporate,Wollongong,New South Wales,...,FUR-CH-10003950,Furniture,Chairs,"Novimex Executive Leather Armchair, Black",3709.395,9,0.1,-288.765,923.63,Critical


In [7]:
# Cell 2C — tiny helpers for Cell 3+
import hashlib
import pandas as pd

def pick(df, candidates):
    """Return first column from `candidates` that exists in df; else raise."""
    for c in candidates:
        if c in df.columns:
            return c
    raise KeyError(f"None of the columns found: {candidates}")

def nstr(x):
    """Normalize to lowercase, trimmed string for stable hashing."""
    if pd.isna(x):
        return ''
    return str(x).strip().lower()

def skey(*values):
    """Deterministic 64-bit surrogate key via MD5 (first 16 hex -> int)."""
    s = '||'.join(nstr(v) for v in values)
    h = hashlib.md5(s.encode('utf-8')).hexdigest()[:16]
    return int(h, 16)

def date_sk(ts):
    """YYYYMMDD integer from a datetime-like."""
    return int(pd.to_datetime(ts).strftime('%Y%m%d'))

3.3 Column bindings (works across common Global Superstore variants)

In [11]:
# Cell 3 — Column bindings (case-insensitive, tolerant to lowercase headers)
import re
import pandas as pd

def _ncol(s: str) -> str:
    # normalize: lowercase + remove non-alphanumerics (spaces/underscores/hyphens ignored)
    return re.sub(r'[^0-9a-z]', '', s.lower())

_norm_map = { _ncol(c): c for c in df.columns }

def pick_ci(df, candidates, required=True, create_name=None):
    """Pick first existing column from candidates (case/space insensitive).
       If not found and required=False, create an empty column with create_name."""
    for cand in candidates:
        k = _ncol(cand)
        if k in _norm_map:
            return _norm_map[k]
    if required:
        raise KeyError(f"None of the columns found: {candidates}")
    name = create_name or (candidates[0] if candidates else 'missing_col')
    # Create an empty column so downstream code works
    df[name] = pd.NA
    _norm_map[_ncol(name)] = name
    return name

COL = {}
# ids & dates
COL['row_id']      = pick_ci(df, ['Row ID','Row_ID','RowId','row_id'])
COL['order_id']    = pick_ci(df, ['Order ID','Order_ID','OrderId','order_id'])
COL['order_date']  = pick_ci(df, ['Order Date','Order_Date','OrderDate','order_date'])
COL['ship_date']   = pick_ci(df, ['Ship Date','Ship_Date','ShipDate','ship_date'])
COL['ship_mode']   = pick_ci(df, ['Ship Mode','Ship_Mode','ShipMode','ship_mode'])

# customer
COL['cust_id']     = pick_ci(df, ['Customer ID','Customer_ID','CustomerId','customer_id'])
COL['cust_name']   = pick_ci(df, ['Customer Name','Customer_Name','CustomerName','customer_name'])
COL['segment']     = pick_ci(df, ['Segment','segment'])

# geography (create blanks if missing so pipeline still runs)
COL['country']     = pick_ci(df, ['Country','Country/Region','country','country_region'], required=False, create_name='country')
COL['region']      = pick_ci(df, ['Region','region'], required=False, create_name='region')
COL['state']       = pick_ci(df, ['State','State/Province','Province','state','state_province'], required=False, create_name='state')
COL['city']        = pick_ci(df, ['City','city'], required=False, create_name='city')
COL['postal']      = pick_ci(df, ['Postal Code','PostalCode','Zip','ZIP','zip','postal_code'], required=False, create_name='postal_code')

# product
COL['product_id']  = pick_ci(df, ['Product ID','Product_ID','ProductId','product_id'])
COL['product_nm']  = pick_ci(df, ['Product Name','Product_Name','ProductName','product_name'])
COL['category']    = pick_ci(df, ['Category','category'])
COL['subcat']      = pick_ci(df, ['Sub-Category','Sub Category','SubCategory','sub_category','subcategory'])

# measures
COL['sales']       = pick_ci(df, ['Sales','sales'])
COL['qty']         = pick_ci(df, ['Quantity','quantity'])
COL['discount']    = pick_ci(df, ['Discount','discount'])
COL['profit']      = pick_ci(df, ['Profit','profit'])

# ensure dates are datetime
df[COL['order_date']] = pd.to_datetime(df[COL['order_date']], errors='coerce')
df[COL['ship_date']]  = pd.to_datetime(df[COL['ship_date']],  errors='coerce')

print("Bound columns ->")
for k,v in COL.items(): print(f"  {k:12s} -> {v}")

Bound columns ->
  row_id       -> row_id
  order_id     -> order_id
  order_date   -> order_date
  ship_date    -> ship_date
  ship_mode    -> ship_mode
  cust_id      -> customer_id
  cust_name    -> customer_name
  segment      -> segment
  country      -> country
  region       -> region
  state        -> state
  city         -> city
  postal       -> postal_code
  product_id   -> product_id
  product_nm   -> product_name
  category     -> category
  subcat       -> subcategory
  sales        -> sales
  qty          -> quantity
  discount     -> discount
  profit       -> profit


In [12]:
list(df.columns)

['row_id',
 'order_id',
 'order_date',
 'ship_date',
 'ship_mode',
 'customer_id',
 'customer_name',
 'segment',
 'city',
 'state',
 'country',
 'postal_code',
 'market',
 'region',
 'product_id',
 'category',
 'subcategory',
 'product_name',
 'sales',
 'quantity',
 'discount',
 'profit',
 'shipping_cost',
 'order_priority']

3.4: Build Dimensions

In [13]:
# ---- dim_customer ----
dim_customer = (
    df[[COL['cust_id'], COL['cust_name'], COL['segment']]]
      .drop_duplicates()
      .rename(columns={
          COL['cust_id']: 'customer_id',
          COL['cust_name']: 'customer_name',
          COL['segment']: 'segment'
      })
      .assign(customer_sk=lambda d: d['customer_id'].apply(lambda x: skey(x)))
      [['customer_sk', 'customer_id', 'customer_name', 'segment']]
      .sort_values('customer_sk')
      .reset_index(drop=True)
)

# ---- dim_product ----
dim_product = (
    df[[COL['product_id'], COL['product_nm'], COL['category'], COL['subcat']]]
      .drop_duplicates()
      .rename(columns={
          COL['product_id']: 'product_id',
          COL['product_nm']: 'product_name',
          COL['category']: 'category',
          COL['subcat']: 'sub_category'
      })
      .assign(product_sk=lambda d: d['product_id'].apply(lambda x: skey(x)))
      [['product_sk', 'product_id', 'product_name', 'category', 'sub_category']]
      .sort_values('product_sk')
      .reset_index(drop=True)
)

# ---- dim_geography ----
dim_geography = (
    df[[COL['country'], COL['region'], COL['state'], COL['city'], COL['postal']]]
      .drop_duplicates()
      .rename(columns={
          COL['country']: 'country',
          COL['region']: 'region',
          COL['state']: 'state',
          COL['city']: 'city',
          COL['postal']: 'postal_code'
      })
)
# Normalize postal as string (some files store as float)
dim_geography['postal_code'] = dim_geography['postal_code'].astype(str).str.replace(r'\.0$', '', regex=True)

dim_geography['geography_sk'] = dim_geography.apply(
    lambda r: skey(r['country'], r['region'], r['state'], r['city'], r['postal_code']), axis=1
)
dim_geography = dim_geography[['geography_sk', 'country', 'region', 'state', 'city', 'postal_code']]\
                 .sort_values('geography_sk').reset_index(drop=True)

# ---- dim_calendar ----
min_d = df[COL['order_date']].min().date()
max_d = df[COL['order_date']].max().date()
cal = pd.date_range(min_d, max_d, freq='D')
dim_calendar = pd.DataFrame({'date': cal})
dim_calendar['date_sk'] = dim_calendar['date'].dt.strftime('%Y%m%d').astype(int)
dim_calendar['year'] = dim_calendar['date'].dt.year
dim_calendar['quarter'] = dim_calendar['date'].dt.quarter
dim_calendar['month'] = dim_calendar['date'].dt.month
dim_calendar['month_name'] = dim_calendar['date'].dt.month_name()
dim_calendar['week_of_year'] = dim_calendar['date'].dt.isocalendar().week.astype(int)
dim_calendar['day_of_month'] = dim_calendar['date'].dt.day
dim_calendar['day_name'] = dim_calendar['date'].dt.day_name()
dim_calendar['is_weekend'] = dim_calendar['day_name'].isin(['Saturday', 'Sunday']).astype(bool)
dim_calendar = dim_calendar[['date_sk','date','year','quarter','month','month_name',
                             'week_of_year','day_of_month','day_name','is_weekend']]

print("Dims ->",
      {"customer": len(dim_customer), "product": len(dim_product),
       "geography": len(dim_geography), "calendar": len(dim_calendar)})

Dims -> {'customer': 1590, 'product': 10768, 'geography': 3847, 'calendar': 1461}


3.5: Build fact_orders (line-item grain with surrogate keys)

In [14]:
fact_orders = df.rename(columns={
    COL['row_id']: 'row_id',
    COL['order_id']: 'order_id',
    COL['order_date']: 'order_date',
    COL['ship_date']: 'ship_date',
    COL['ship_mode']: 'ship_mode',
    COL['cust_id']: 'customer_id',
    COL['product_id']: 'product_id',
    COL['country']: 'country',
    COL['region']: 'region',
    COL['state']: 'state',
    COL['city']: 'city',
    COL['postal']: 'postal_code',
    COL['sales']: 'sales',
    COL['qty']: 'quantity',
    COL['discount']: 'discount',
    COL['profit']: 'profit'
})[[
    'row_id','order_id','order_date','ship_date','ship_mode',
    'customer_id','product_id','country','region','state','city','postal_code',
    'sales','quantity','discount','profit'
]]

# keys (deterministic)
fact_orders['customer_sk'] = fact_orders['customer_id'].apply(skey)
fact_orders['product_sk']  = fact_orders['product_id'].apply(skey)

# Normalize postal to string before geo key
fact_orders['postal_code'] = fact_orders['postal_code'].astype(str).str.replace(r'\.0$', '', regex=True)
fact_orders['geography_sk'] = fact_orders.apply(
    lambda r: skey(r['country'], r['region'], r['state'], r['city'], r['postal_code']), axis=1
)

# date surrogate keys
fact_orders['order_date_sk'] = fact_orders['order_date'].dt.strftime('%Y%m%d').astype(int)
fact_orders['ship_date_sk']  = fact_orders['ship_date'].dt.strftime('%Y%m%d').astype('Int64')  # allow NA

# Metrics types
fact_orders['quantity'] = pd.to_numeric(fact_orders['quantity'], errors='coerce').fillna(0).astype(int)
for c in ['sales','discount','profit']:
    fact_orders[c] = pd.to_numeric(fact_orders[c], errors='coerce').fillna(0.0).astype(float)

# Final projection (degenerate dims + FKs + metrics)
fact_orders = fact_orders[[
    'row_id','order_id','ship_mode',
    'customer_sk','product_sk','geography_sk','order_date_sk','ship_date_sk',
    'sales','quantity','discount','profit'
]].sort_values(['order_date_sk','order_id','row_id']).reset_index(drop=True)

print("Fact rows:", len(fact_orders))
fact_orders.head(3)

Fact rows: 51290


Unnamed: 0,row_id,order_id,ship_mode,customer_sk,product_sk,geography_sk,order_date_sk,ship_date_sk,sales,quantity,discount,profit
0,42433,AG-2011-2040,Standard Class,16608705407162062935,3811477438047827761,16109843290194333852,20110101,20110106,408.3,2,0.0,106.14
1,48883,HU-2011-1220,Second Class,13709656682537906091,2621979939612564538,168510249784381628,20110101,20110105,66.12,4,0.0,29.64
2,22253,IN-2011-47883,Standard Class,13014808880476421252,3698256773002908816,8608906302066371679,20110101,20110108,120.366,3,0.1,36.036


3.6: Persist to DuckDB and Parquet (/data/curated)

In [16]:
# Cell 6A — install & import DuckDB (run once)
try:
    import duckdb
except ModuleNotFoundError:
    !pip -q install duckdb
    import duckdb

print("duckdb version:", duckdb.__version__)

duckdb version: 1.3.2


In [17]:
# Write Parquet (columnar files)
dim_customer.to_parquet(CURATED_DIR / 'dim_customer.parquet', index=False)
dim_product.to_parquet(CURATED_DIR / 'dim_product.parquet', index=False)
dim_geography.to_parquet(CURATED_DIR / 'dim_geography.parquet', index=False)
dim_calendar.to_parquet(CURATED_DIR / 'dim_calendar.parquet', index=False)
fact_orders.to_parquet(CURATED_DIR / 'fact_orders.parquet', index=False)

print("Parquet written to /data/curated")

# Write DuckDB warehouse
db_path = BASE / 'warehouse.duckdb'
con = duckdb.connect(str(db_path))

# Register and create/replace tables
con.register('df_dim_customer', dim_customer)
con.register('df_dim_product', dim_product)
con.register('df_dim_geography', dim_geography)
con.register('df_dim_calendar', dim_calendar)
con.register('df_fact_orders', fact_orders)

con.execute("CREATE OR REPLACE TABLE dim_customer  AS SELECT * FROM df_dim_customer;")
con.execute("CREATE OR REPLACE TABLE dim_product   AS SELECT * FROM df_dim_product;")
con.execute("CREATE OR REPLACE TABLE dim_geography AS SELECT * FROM df_dim_geography;")
con.execute("CREATE OR REPLACE TABLE dim_calendar  AS SELECT * FROM df_dim_calendar;")
con.execute("CREATE OR REPLACE TABLE fact_orders   AS SELECT * FROM df_fact_orders;")

# Optional stats (for faster queries)
con.execute("ANALYZE;")

print(f"DuckDB written at: {db_path}")

Parquet written to /data/curated
DuckDB written at: /content/drive/MyDrive/mini-lakehouse/warehouse.duckdb


3.7: Data Dictionary (CSV + Markdown)

In [18]:
# Simple data dictionary generator
tables = {
    'dim_customer': dim_customer,
    'dim_product': dim_product,
    'dim_geography': dim_geography,
    'dim_calendar': dim_calendar,
    'fact_orders': fact_orders
}

col_desc = {
    # Common keys
    'customer_sk': 'Surrogate key for customer (deterministic hash of customer_id)',
    'product_sk': 'Surrogate key for product (deterministic hash of product_id)',
    'geography_sk': 'Surrogate key for geography (hash of country/region/state/city/postal)',
    'date_sk': 'Date surrogate key in YYYYMMDD',
    'order_date_sk': 'Order date as YYYYMMDD (FK to dim_calendar)',
    'ship_date_sk': 'Ship date as YYYYMMDD (FK to dim_calendar)',

    # Business keys/attrs
    'customer_id': 'Source customer ID',
    'customer_name': 'Customer full name',
    'segment': 'Customer segment',
    'product_id': 'Source product ID',
    'product_name': 'Product name/title',
    'category': 'Product category',
    'sub_category': 'Product sub-category',
    'country': 'Country',
    'region': 'Region',
    'state': 'State/Province',
    'city': 'City',
    'postal_code': 'Postal/ZIP code',
    'date': 'Calendar date',
    'year': 'Calendar year',
    'quarter': 'Quarter number (1–4)',
    'month': 'Month number (1–12)',
    'month_name': 'Full month name',
    'week_of_year': 'ISO week number',
    'day_of_month': 'Day of month',
    'day_name': 'Day name',
    'is_weekend': 'True if Saturday/Sunday',

    # Fact metrics
    'row_id': 'Line-level identifier from source',
    'order_id': 'Order identifier (degenerate dimension)',
    'ship_mode': 'Shipping mode (degenerate dimension)',
    'sales': 'Sales amount for the line',
    'quantity': 'Units sold for the line',
    'discount': 'Discount value for the line',
    'profit': 'Profit for the line'
}

rows = []
for tname, tdf in tables.items():
    for c in tdf.columns:
        rows.append({
            'table': tname,
            'column': c,
            'dtype': str(tdf.dtypes[c]),
            'description': col_desc.get(c, '')
        })

dd_df = pd.DataFrame(rows).sort_values(['table','column']).reset_index(drop=True)

# Save CSV + Markdown
csv_path = DOCS_DIR / 'data_dictionary.csv'
md_path  = DOCS_DIR / 'data_dictionary.md'
dd_df.to_csv(csv_path, index=False)

md = ["# Data Dictionary\n"]
for tname in tables.keys():
    md.append(f"## {tname}\n")
    sub = dd_df[dd_df['table']==tname][['column','dtype','description']]
    md.append(sub.to_markdown(index=False))
    md.append("\n")
md_text = "\n".join(md)
md_path.write_text(md_text, encoding='utf-8')

print("Wrote:", csv_path, "and", md_path)
dd_df.head(10)

Wrote: /content/drive/MyDrive/mini-lakehouse/docs/data_dictionary.csv and /content/drive/MyDrive/mini-lakehouse/docs/data_dictionary.md


Unnamed: 0,table,column,dtype,description
0,dim_calendar,date,datetime64[ns],Calendar date
1,dim_calendar,date_sk,int64,Date surrogate key in YYYYMMDD
2,dim_calendar,day_name,object,Day name
3,dim_calendar,day_of_month,int32,Day of month
4,dim_calendar,is_weekend,bool,True if Saturday/Sunday
5,dim_calendar,month,int32,Month number (1–12)
6,dim_calendar,month_name,object,Full month name
7,dim_calendar,quarter,int32,Quarter number (1–4)
8,dim_calendar,week_of_year,int64,ISO week number
9,dim_calendar,year,int32,Calendar year


3.8: Quick sanity checks

In [19]:
con = duckdb.connect(str(BASE / 'warehouse.duckdb'))

print("Row counts:")
for t in ['dim_customer','dim_product','dim_geography','dim_calendar','fact_orders']:
    cnt = con.execute(f"SELECT COUNT(*) FROM {t}").fetchone()[0]
    print(f"  {t}: {cnt:,}")

print("\nSample fact joined to dims:")
q = """
SELECT f.order_id, f.sales, f.quantity, f.profit,
       dc.customer_name, dp.product_name, dg.city, dg.state, d.month_name
FROM fact_orders f
JOIN dim_customer dc  ON f.customer_sk = dc.customer_sk
JOIN dim_product  dp  ON f.product_sk  = dp.product_sk
JOIN dim_geography dg ON f.geography_sk = dg.geography_sk
JOIN dim_calendar d   ON f.order_date_sk = d.date_sk
LIMIT 5;
"""
con.execute(q).df()

Row counts:
  dim_customer: 1,590
  dim_product: 10,768
  dim_geography: 3,847
  dim_calendar: 1,461
  fact_orders: 51,290

Sample fact joined to dims:


Unnamed: 0,order_id,sales,quantity,profit,customer_name,product_name,city,state,month_name
0,AG-2011-2040,408.3,2,106.14,Toby Braunhardt,"Tenex Lockers, Blue",Constantine,Constantine,January
1,HU-2011-1220,66.12,4,29.64,Annie Thurman,"Tenex Box, Single Width",Budapest,Budapest,January
2,IN-2011-47883,120.366,3,36.036,Joseph Holt,"Acme Trimmer, High Speed",Wagga Wagga,New South Wales,January
3,IN-2011-47883,55.242,2,15.342,Joseph Holt,"Eaton Computer Printout Paper, 8.5 x 11",Wagga Wagga,New South Wales,January
4,IN-2011-47883,113.67,5,37.77,Joseph Holt,"Eldon Light Bulb, Duo Pack",Wagga Wagga,New South Wales,January
