In [0]:
import logging
from datetime import datetime
import os

# Create logs folder if it doesn't exist
os.makedirs("logs", exist_ok=True)

# Generate timestamped log filename.
# The timestamp has minute resolution, so runs started within the same minute
# share one log file (FileHandler opens it in append mode by default).
timestamp = datetime.now().strftime("%Y%m%d_%H%M")
log_file = f"logs/run_{timestamp}.log"

# Configure logging.
# basicConfig() silently does nothing when the root logger already has handlers,
# which is the case when this cell is re-run or when the hosting environment has
# pre-configured logging. force=True (Python 3.8+) removes and closes the existing
# root handlers first, so the console and file handlers below are always installed.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
    handlers=[
        logging.StreamHandler(),            # Console output
        logging.FileHandler(log_file)       # File output
    ],
    force=True,
)

# Example logging
logging.info("ETL run started")
logging.info("Cluster/runtime info: Databricks cluster XYZ")  # Replace with actual info if available
logging.info("Configuration values: CSV paths, output paths, etc.")


In [0]:
import os, random, numpy as np
import hashlib
import json

# 1. Fix random seeds
# NOTE: Python reads PYTHONHASHSEED when the interpreter starts, so assigning it
# here does not change string hashing in the already-running kernel; it is only
# inherited by processes launched afterwards.
os.environ.update({'PYTHONHASHSEED': '0'})
# Seed the stdlib and NumPy global generators with the same value.
for _seed_fn in (random.seed, np.random.seed):
    _seed_fn(0)
logging.info("Random seeds fixed")

# 2. Capture environment
# Run pip through the kernel's own interpreter so the freeze reflects the
# environment this notebook actually executes in (a bare `pip` on the shell PATH
# may belong to a different installation). check=True raises CalledProcessError
# if pip fails, so the success message below is never logged for a failed capture.
import subprocess
import sys

with open("requirements.txt", "w") as requirements_out:
    subprocess.run(
        [sys.executable, "-m", "pip", "freeze"],
        stdout=requirements_out,
        check=True,
    )
logging.info("Environment captured in requirements.txt")

# 3. Compute SHA-256 hashes for CSV files
def hash_file(file_path, chunk_size=1024 * 1024):
    """Return the SHA-256 hex digest of the file at ``file_path``.

    The file is opened in binary mode and fed to the hash in pieces of
    ``chunk_size`` bytes, so large files are never held in memory in full.

    Args:
        file_path: Path of the file to hash.
        chunk_size: Number of bytes read per iteration (default 1 MiB).
            Must be a positive integer.

    Returns:
        The SHA-256 digest as a lowercase hexadecimal string.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
        OSError: If the file cannot be opened or read.
    """
    if chunk_size <= 0:
        # read(0) returns b"" immediately, which would silently hash nothing.
        raise ValueError("chunk_size must be a positive integer")

    digest = hashlib.sha256()
    with open(file_path, "rb") as f:
        # iter(callable, sentinel) keeps reading until read() returns b"" at EOF.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

# Map each input CSV's file name to the SHA-256 digest of its contents.
data_hashes = {
    csv_name: hash_file(f"data/Restaurant Orders/{csv_name}")
    for csv_name in ("menu_items.csv", "order_details.csv")
}

# Persist the digests as indented JSON next to the notebook.
with open("data_hashes.json", "w") as hashes_out:
    hashes_out.write(json.dumps(data_hashes, indent=4))

logging.info("SHA-256 hashes computed and saved to data_hashes.json")


In [0]:
import pandas as pd

# 1. Load CSVs
menu = pd.read_csv("data/Restaurant Orders/menu_items.csv")
orders = pd.read_csv("data/Restaurant Orders/order_details.csv")
logging.info("CSV files loaded into DataFrames")

# 2. Basic cleaning
# Trim surrounding whitespace and lower-case the menu categories.
menu = menu.assign(category=menu['category'].str.strip().str.lower())
# Parse the order date and time columns into datetime values.
orders = orders.assign(
    order_date=pd.to_datetime(orders['order_date']),
    order_time=pd.to_datetime(orders['order_time']),
)
logging.info("Data cleaned: categories standardized, dates converted")

# 3. Merge tables
# Default (inner) join: each order line is paired with its menu entry.
combined = pd.merge(orders, menu, left_on='item_id', right_on='menu_item_id')
logging.info("Tables merged on item_id")

# 4. Select useful columns
# Take an explicit copy: `tidy` gets a new column below, and assigning into a
# plain column slice of `combined` triggers pandas' SettingWithCopyWarning.
tidy = combined[['order_id', 'order_date', 'order_time', 'item_name', 'category', 'price']].copy()

# 5. Compute metrics
# Top 5 items by quantity (each row of `tidy` is one ordered item)
top_items = tidy['item_name'].value_counts().head(5)
logging.info(f"Top 5 items: {top_items.to_dict()}")

# Revenue by category, highest first
revenue_by_category = tidy.groupby('category')['price'].sum().sort_values(ascending=False)
logging.info(f"Revenue by category: {revenue_by_category.to_dict()}")

# Busiest hour of day: number of order lines per hour, busiest first
tidy['hour'] = tidy['order_time'].dt.hour
orders_by_hour = tidy.groupby('hour').size().sort_values(ascending=False)
logging.info(f"Busiest hour(s): {orders_by_hour.to_dict()}")

# 6. Save results
output_file = f"/FileStore/tables/etl_output/metrics_{timestamp}.csv"

# Stack the three metric Series into one long table with columns
# (metric, key, value) so they fit in a single CSV file.
metrics = pd.concat(
    [
        series.rename_axis("key").reset_index(name="value").assign(metric=metric_name)
        for metric_name, series in (
            ("top_items", top_items),
            ("revenue_by_category", revenue_by_category),
            ("orders_by_hour", orders_by_hour),
        )
    ],
    ignore_index=True,
)[["metric", "key", "value"]]

# Actually write the file before reporting success.
# NOTE(review): the path is used as a local filesystem path exactly as given;
# if it is meant to land on DBFS it presumably needs a "/dbfs" prefix — confirm.
os.makedirs(os.path.dirname(output_file), exist_ok=True)
metrics.to_csv(output_file, index=False)

logging.info(f"Metrics saved to {output_file}")

# 7. Assert tests
# The tidy table must contain rows ...
assert not tidy.empty, "ETL output is empty"
# ... and every expected column; the first missing one is reported.
expected_columns = ['order_id', 'order_date', 'order_time', 'item_name', 'category', 'price', 'hour']
missing_columns = [name for name in expected_columns if name not in tidy.columns]
assert not missing_columns, f"Missing column: {missing_columns[0]}"
logging.info("ETL assertions passed")


In [0]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if one exists, otherwise start one.
spark = SparkSession.builder.getOrCreate()

# Read CSVs (first row is the header; column types are inferred from the data)
menu_spark = spark.read.csv("data/Restaurant Orders/menu_items.csv", header=True, inferSchema=True)
orders_spark = spark.read.csv("data/Restaurant Orders/order_details.csv", header=True, inferSchema=True)

# Example metric: top item by quantity
from pyspark.sql.functions import col, count
# Rows per item_id, ordered so the most frequently ordered item comes first.
top_item_spark = orders_spark.groupBy("item_id").agg(count("*").alias("quantity")).orderBy(col("quantity").desc())

# groupBy/agg/orderBy only build a lazy plan; first() is an action that makes
# Spark run the aggregation, so the message below is true when it is logged.
# first() returns None when the DataFrame has no rows.
top_item_row = top_item_spark.first()
logging.info(f"Top item (PySpark): {top_item_row.asDict() if top_item_row is not None else None}")

logging.info("PySpark metric computed")
