In [0]:
import logging
import os
import sys
from datetime import datetime

# Ensure logs folder exists in repo
os.makedirs("logs", exist_ok=True)

# Timestamped log file name (minute resolution: two runs started within the
# same minute share one file, because FileHandler opens in append mode).
run_ts = datetime.now().strftime("%Y%m%d_%H%M")
log_path = f"logs/run_{run_ts}.log"

# Configure logging (console + file)
logger = logging.getLogger("etl_logger")
logger.setLevel(logging.INFO)

# Re-running this cell must not stack duplicate handlers. Each old handler is
# closed as well as detached: emptying the list alone would leave the previous
# run's log file open for as long as the kernel lives.
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)
    stale_handler.close()

fmt = logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")

# Console output goes to stdout so it appears in the cell output.
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.INFO)
ch.setFormatter(fmt)

# Explicit UTF-8: the run-end message in this notebook contains a check-mark
# emoji, which a platform-default encoding such as cp1252 cannot represent.
fh = logging.FileHandler(log_path, encoding="utf-8")
fh.setLevel(logging.INFO)
fh.setFormatter(fmt)

logger.addHandler(ch)
logger.addHandler(fh)

logger.info("========== ETL RUN START ==========")
logger.info(f"Log file: {log_path}")
logger.info(f"Python: {sys.version.split()[0]}")


In [0]:
# Report the PySpark version when the package is importable. Any failure is
# logged instead of raised so the notebook also runs on a plain Jupyter kernel.
try:
    import pyspark
    spark_version = pyspark.__version__
    logger.info(f"PySpark version: {spark_version}")
except Exception as import_err:
    logger.info(f"PySpark not available (likely local Jupyter). Details: {import_err}")


# Report workspace and cluster identifiers when a Databricks notebook context
# can be reached; outside Databricks the lookup fails and is only logged.
try:
    ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    workspace_id = ctx.workspaceId().get()
    logger.info(f"Workspace: {workspace_id}")
    cluster_id = ctx.clusterId().get()
    logger.info(f"Cluster ID: {cluster_id}")
except Exception:
    logger.info("Databricks context not available (ok if running outside Databricks).")


In [0]:
import glob
# Display up to three run logs with the latest timestamps; the zero-padded
# YYYYmmdd_HHMM suffix makes the lexical sort chronological.
sorted(glob.glob("logs/run_*.log"))[-3:]

In [0]:
import os
import random
import numpy as np

# Single source of truth for every seed below, so the logged values cannot
# drift away from the values actually applied.
SEED = 0

# NOTE: hash randomization is decided when the interpreter starts, so assigning
# PYTHONHASHSEED here cannot change str/bytes hashing inside this running
# kernel. It is still exported so child Python processes inherit it.
os.environ["PYTHONHASHSEED"] = str(SEED)
random.seed(SEED)
np.random.seed(SEED)

logger.info(
    f"Reproducibility: seeds set (random={SEED}, numpy={SEED}); "
    f"PYTHONHASHSEED={SEED} exported for child processes only"
)

In [0]:
%pip freeze > requirements.txt

In [0]:
import hashlib
import json
from pathlib import Path

def sha256_file(path: str) -> str:
    """Return the hexadecimal SHA-256 digest of the file at ``path``.

    The file is opened in binary mode and consumed in 8192-byte blocks, so
    its contents are never held in memory all at once.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while True:
            block = stream.read(8192)
            if not block:
                # read() returns b"" only at end of file.
                break
            digest.update(block)
    return digest.hexdigest()

# Input files whose content digests are recorded for this run.
files = ["data/menu_items.csv", "data/order_details.csv"]
hashes = {}

for fp in files:
    data_file = Path(fp)
    if data_file.exists():
        # Keyed by bare file name rather than by the relative path.
        hashes[data_file.name] = sha256_file(fp)
    else:
        # Stop at the first absent input; nothing is written in that case.
        raise FileNotFoundError(f"Missing required file: {fp}")

# Persist the digests as an indented JSON manifest in the working directory.
manifest_text = json.dumps(hashes, indent=2)
with open("data_hashes.json", "w") as manifest:
    manifest.write(manifest_text)

logger.info("Wrote data_hashes.json with SHA-256 hashes:")
logger.info(hashes)


In [0]:
import os
# Confirm that the artifacts produced by the earlier cells are on disk.
for label, target in (
    ("requirements.txt", "requirements.txt"),
    ("data_hashes.json", "data_hashes.json"),
    ("logs folder", "logs"),
):
    print(f"{label} exists:", os.path.exists(target))

In [0]:
import pandas as pd
from datetime import datetime
import os

# Timestamp used in the output file names. It is taken again here, so it can
# differ by a minute from the one embedded in the log file name.
run_ts = datetime.now().strftime("%Y%m%d_%H%M")

# os.makedirs / open() / pandas go through the driver's local file API. On
# Databricks, DBFS is reachable from that API under the /dbfs mount; a bare
# "/FileStore/..." path is only a directory on the driver's root disk, and on
# an ordinary machine it usually cannot be created at all. When the mount is
# absent, fall back to the relative "outputs" folder this notebook also uses
# further down.
DBFS_MOUNT = "/dbfs"
if os.path.isdir(DBFS_MOUNT):
    output_dir = f"{DBFS_MOUNT}/FileStore/tables/etl_output"
else:
    output_dir = "outputs"
os.makedirs(output_dir, exist_ok=True)

logger.info(f"ETL output directory: {output_dir}")

In [0]:
def load_data(menu_path="data/menu_items.csv", orders_path="data/order_details.csv"):
    """Read the menu and order-detail CSV files into DataFrames.

    Args:
        menu_path: Location of the menu items CSV.
        orders_path: Location of the order details CSV.

    Returns:
        Tuple ``(menu, orders)`` holding the two frames exactly as parsed by
        ``pd.read_csv`` with default options.
    """
    sources = (("menu", menu_path), ("orders", orders_path))
    # Announce both paths before touching either file.
    for label, source in sources:
        logger.info(f"Loading {label} from {source}")
    menu, orders = (pd.read_csv(source) for _, source in sources)
    menu_rows, order_rows = len(menu), len(orders)
    logger.info(f"Loaded menu rows: {menu_rows} | orders rows: {order_rows}")
    return menu, orders

# Read both source tables from load_data's default paths under data/.
menu_df, orders_df = load_data()


In [0]:
def _strip_keep_missing(values: pd.Series) -> pd.Series:
    """Return ``values`` as whitespace-stripped text, leaving missing entries missing."""
    stripped = values.astype(str).str.strip()
    # astype(str) on its own turns NaN/None into the literal text "nan"/"None",
    # which isna() no longer detects and which groupby treats as a real value.
    return stripped.where(values.notna(), values)


def clean_data(menu: pd.DataFrame, orders: pd.DataFrame) -> tuple:
    """Normalize column names, text values, and key column types.

    Works on copies, so neither input frame is modified.

    Steps applied to both frames:
      * column names are stripped and lower-cased;
      * every object-dtype column is converted to stripped text, with missing
        values preserved as missing.

    Additionally, when the column exists:
      * ``menu["price"]`` is converted with ``pd.to_numeric`` (bad values -> NaN);
      * ``orders["order_date"]`` is converted with ``pd.to_datetime``
        (bad values -> NaT);
      * ``orders["order_time"]`` is kept as stripped text.

    Args:
        menu: Raw menu items table.
        orders: Raw order details table.

    Returns:
        Tuple ``(menu, orders)`` of cleaned copies.
    """
    # Normalize column names
    menu = menu.copy()
    orders = orders.copy()
    menu.columns = menu.columns.str.strip().str.lower()
    orders.columns = orders.columns.str.strip().str.lower()

    for frame in (menu, orders):
        for col in frame.select_dtypes(include="object").columns:
            frame[col] = _strip_keep_missing(frame[col])

    if "price" in menu.columns:
        menu["price"] = pd.to_numeric(menu["price"], errors="coerce")

    if "order_date" in orders.columns:
        orders["order_date"] = pd.to_datetime(orders["order_date"], errors="coerce")

    if "order_time" in orders.columns:
        # Kept as text rather than parsed to a time; this also covers the case
        # where the column was not object-dtype and so was skipped above.
        orders["order_time"] = _strip_keep_missing(orders["order_time"])

    return menu, orders

# Rebind both names to the cleaned frames returned by clean_data, which
# operates on copies of what it is given.
menu_df, orders_df = clean_data(menu_df, orders_df)
logger.info("Data cleaning complete")


In [0]:
def join_data(menu: pd.DataFrame, orders: pd.DataFrame):
    """Inner-join order rows to menu rows on ``item_id`` = ``menu_item_id``.

    Args:
        menu: Menu table; must contain a ``menu_item_id`` column.
        orders: Order details table; must contain an ``item_id`` column.

    Returns:
        The rows produced by the inner merge, carrying the columns of both
        inputs. Order rows whose ``item_id`` has no menu match are absent.

    Raises:
        AssertionError: If either key column is missing.
    """
    # Raised explicitly instead of via ``assert`` so the checks still run when
    # Python is started with -O; the exception type and messages are unchanged.
    if "menu_item_id" not in menu.columns:
        raise AssertionError("menu_items missing menu_item_id")
    if "item_id" not in orders.columns:
        raise AssertionError("order_details missing item_id")

    # An inner join discards unmatched order rows without any trace, so count
    # them first and surface the loss in the log.
    unmatched = int((~orders["item_id"].isin(menu["menu_item_id"])).sum())
    if unmatched:
        logger.warning(
            f"Order rows without a matching menu item (dropped by join): {unmatched}"
        )

    joined = orders.merge(
        menu,
        left_on="item_id",
        right_on="menu_item_id",
        how="inner"
    )

    logger.info(f"Joined rows: {len(joined)}")
    return joined

# Inner-join order rows to their menu rows; order rows without a menu match
# do not appear in joined_df.
joined_df = join_data(menu_df, orders_df)


In [0]:
def make_tidy(df: pd.DataFrame):
    """Project the joined table onto the reporting columns and add a timestamp.

    Of ``order_id``, ``order_date``, ``order_time``, ``item_name``,
    ``category``, ``price`` and ``quantity``, only the columns present in
    ``df`` are kept, in that order. An ``order_datetime`` column is added when
    ``order_date`` is available: built from date plus time text if
    ``order_time`` also exists, otherwise from the date alone. Values that
    cannot be parsed become NaT.

    Args:
        df: Joined orders/menu table.

    Returns:
        A new DataFrame; ``df`` itself is left unchanged.
    """
    wanted = ("order_id", "order_date", "order_time", "item_name", "category", "price", "quantity")
    keep_cols = [name for name in wanted if name in df.columns]
    tidy = df[keep_cols].copy()

    has_date = "order_date" in tidy.columns
    has_time = "order_time" in tidy.columns

    if has_date and has_time:
        # Glue the two parts together as text and parse the result once.
        stamp_text = tidy["order_date"].astype(str) + " " + tidy["order_time"].astype(str)
        tidy["order_datetime"] = pd.to_datetime(stamp_text, errors="coerce")
    elif has_date:
        tidy["order_datetime"] = pd.to_datetime(tidy["order_date"], errors="coerce")

    return tidy

tidy_df = make_tidy(joined_df)

# Record the resulting column list and row count in the run log.
for caption, detail in (("columns", list(tidy_df.columns)), ("rows", len(tidy_df))):
    logger.info(f"Tidy {caption}: {detail}")


In [0]:
def compute_metrics(tidy: pd.DataFrame):
    """Compute the three summary tables for the run.

    Args:
        tidy: Tidy order-line table. ``item_name`` is required. ``quantity``
            defaults to 1 per row when the column is absent. ``price``,
            ``category`` and ``order_datetime`` are optional and only affect
            the metrics that depend on them.

    Returns:
        Tuple ``(top_5_items, revenue_by_category, busiest_hour)``:
          * ``top_5_items`` - columns ``item_name`` and ``total_quantity``, the
            five largest summed quantities in descending order;
          * ``revenue_by_category`` - columns ``category`` and ``revenue``
            (quantity * price summed per category, descending), or ``None``
            when there is no ``category`` column or no non-null revenue;
          * ``busiest_hour`` - one row with ``hour`` and ``order_count``, or
            ``None`` when there is no ``order_datetime`` column.

    Raises:
        KeyError: If ``tidy`` has no ``item_name`` column.
    """
    # Always work on a private copy: every branch below adds a column, and
    # none of them may leak into the caller's frame.
    tidy = tidy.copy()

    if "quantity" not in tidy.columns:
        tidy["quantity"] = 1

    if "price" in tidy.columns:
        tidy["revenue"] = tidy["quantity"] * tidy["price"]
    else:
        tidy["revenue"] = None

    top_5_items = (
        tidy.groupby("item_name")["quantity"]
        .sum()
        .sort_values(ascending=False)
        .head(5)
        .reset_index()
        .rename(columns={"quantity": "total_quantity"})
    )

    revenue_by_category = None
    if "category" in tidy.columns and tidy["revenue"].notna().any():
        revenue_by_category = (
            tidy.groupby("category")["revenue"]
            .sum()
            .sort_values(ascending=False)
            .reset_index()
        )

    busiest_hour = None
    if "order_datetime" in tidy.columns:
        # Rows whose timestamp could not be parsed cannot be placed in an hour.
        hourly = tidy.dropna(subset=["order_datetime"]).copy()
        hourly["hour"] = hourly["order_datetime"].dt.hour
        # NOTE(review): size() counts rows (order lines), not distinct
        # order_id values, although the column is named order_count - confirm
        # which of the two is intended.
        busiest_hour = (
            hourly.groupby("hour")
            .size()
            .reset_index(name="order_count")
            .sort_values("order_count", ascending=False)
            .head(1)
        )

    return top_5_items, revenue_by_category, busiest_hour

top_5_items_df, revenue_by_category_df, busiest_hour_df = compute_metrics(tidy_df)

logger.info("Computed metrics")
logger.info(f"Top 5 items rows: {len(top_5_items_df)}")

# The remaining two metrics may be None; log a row count only for those present.
optional_metrics = (
    ("Revenue by category", revenue_by_category_df),
    ("Busiest hour", busiest_hour_df),
)
for metric_label, metric_df in optional_metrics:
    if metric_df is not None:
        logger.info(f"{metric_label} rows: {len(metric_df)}")


In [0]:
metrics_file = f"{output_dir}/metrics_{run_ts}.csv"
tidy_file = f"{output_dir}/tidy_{run_ts}.csv"

# Save tidy output
tidy_df.to_csv(tidy_file, index=False)

# Save metrics as one file made of titled sections.
# newline="" is what pandas asks for when to_csv receives an already-open text
# handle: without it, on platforms whose line separator is "\r\n" the row
# terminator is translated a second time and comes out as "\r\r\n".
# Section titles therefore end with os.linesep so they match the rows.
metric_sections = (
    ("TOP_5_ITEMS", top_5_items_df),
    ("REVENUE_BY_CATEGORY", revenue_by_category_df),
    ("BUSIEST_HOUR", busiest_hour_df),
)
with open(metrics_file, "w", encoding="utf-8", newline="") as f:
    for position, (title, section_df) in enumerate(metric_sections):
        if position:
            # Blank line between sections, none before the first title.
            f.write(os.linesep)
        f.write(title + os.linesep)
        # A metric that could not be computed leaves its section empty.
        if section_df is not None:
            section_df.to_csv(f, index=False)

logger.info(f"Saved tidy output: {tidy_file}")
logger.info(f"Saved metrics output: {metrics_file}")


In [0]:
# NOTE(review): this cell repeats the export done by the preceding cell and
# overwrites the same two files; it looks like a leftover duplicate.
metrics_file = f"{output_dir}/metrics_{run_ts}.csv"
tidy_file = f"{output_dir}/tidy_{run_ts}.csv"

# Save tidy output
tidy_df.to_csv(tidy_file, index=False)

# Save metrics (combine into one CSV with sections)
# newline="" is what pandas asks for when to_csv receives an already-open text
# handle: without it, on platforms whose line separator is "\r\n" the row
# terminator is translated a second time and comes out as "\r\r\n".
# Section titles therefore end with os.linesep so they match the rows.
metric_sections = (
    ("TOP_5_ITEMS", top_5_items_df),
    ("REVENUE_BY_CATEGORY", revenue_by_category_df),
    ("BUSIEST_HOUR", busiest_hour_df),
)
with open(metrics_file, "w", encoding="utf-8", newline="") as f:
    for position, (title, section_df) in enumerate(metric_sections):
        if position:
            # Blank line between sections, none before the first title.
            f.write(os.linesep)
        f.write(title + os.linesep)
        # A metric that could not be computed leaves its section empty.
        if section_df is not None:
            section_df.to_csv(f, index=False)

logger.info(f"Saved tidy output: {tidy_file}")
logger.info(f"Saved metrics output: {metrics_file}")


In [0]:
# Basic ETL validation tests
# Checks run in a fixed order and stop at the first failure.
assert not tidy_df.empty, "Tidy output is empty"
for required in ("order_id", "item_name", "price"):
    assert required in tidy_df.columns, f"Missing {required} in tidy output"

assert not top_5_items_df.empty, "Top 5 items metric is empty"
for required in ("item_name", "total_quantity"):
    assert required in top_5_items_df.columns, f"Top 5 missing {required}"

logger.info("All Part D assertions passed ✅")
logger.info("========== ETL RUN END ==========")


In [0]:
# NOTE(review): the "dbfs:/" URI form is meant for dbutils; open() and pandas
# would treat it as a relative local path. The next cell reassigns output_dir
# before anything is written with it.
output_dir = "dbfs:/FileStore/tables/etl_output"

# dbutils is only defined inside a Databricks runtime. Guard the call, as the
# environment-probe cell near the top of this notebook does, so that a run
# outside Databricks is not stopped by a NameError here.
try:
    dbutils.fs.mkdirs(output_dir)
except NameError:
    logger.info(
        "dbutils not available; skipped DBFS directory creation "
        "(ok if running outside Databricks)."
    )
else:
    logger.info(f"Ensured DBFS output dir exists: {output_dir}")

In [0]:
import os

# Point output_dir at a folder relative to the current working directory and
# create it if needed; the export cell that follows builds its file paths
# from this value.
output_dir = "outputs"
os.makedirs(output_dir, exist_ok=True)

logger.info(f"Output directory set to: {output_dir}")


In [0]:
metrics_file = f"{output_dir}/metrics_{run_ts}.csv"
tidy_file = f"{output_dir}/tidy_{run_ts}.csv"

# Save the tidy table as a plain CSV.
tidy_df.to_csv(tidy_file, index=False)

# Save the metrics as one file made of titled sections.
# newline="" is what pandas asks for when to_csv receives an already-open text
# handle: without it, on platforms whose line separator is "\r\n" the row
# terminator is translated a second time and comes out as "\r\r\n".
# Section titles therefore end with os.linesep so they match the rows.
metric_sections = (
    ("TOP_5_ITEMS", top_5_items_df),
    ("REVENUE_BY_CATEGORY", revenue_by_category_df),
    ("BUSIEST_HOUR", busiest_hour_df),
)
with open(metrics_file, "w", encoding="utf-8", newline="") as f:
    for position, (title, section_df) in enumerate(metric_sections):
        if position:
            # Blank line between sections, none before the first title.
            f.write(os.linesep)
        f.write(title + os.linesep)
        # A metric that could not be computed leaves its section empty.
        if section_df is not None:
            section_df.to_csv(f, index=False)

logger.info(f"Saved tidy output: {tidy_file}")
logger.info(f"Saved metrics output: {metrics_file}")


In [0]:
import glob
# Display the CSV files currently present in the relative "outputs" folder.
glob.glob("outputs/*.csv")

In [0]:
# Send the top-5 table through the logger as plain text (index column
# omitted) so it reaches every handler attached to the logger.
logger.info("Top 5 items preview:")
logger.info(top_5_items_df.head().to_string(index=False))