In [0]:
# -------------------------------
# Logging setup (self-contained)
# -------------------------------

from pyspark.sql.types import *
from pyspark.sql.functions import current_timestamp
import uuid

# Delta location that log events are written to.
LOG_PATH = "/FileStore/project/logs/pipeline_logs"

# Layout of one log row. Fields are added with the builder API; each one is
# nullable, which is the default for both StructField and StructType.add.
log_schema = (
    StructType()
    .add("run_id", StringType())
    .add("pipeline_layer", StringType())
    .add("notebook_name", StringType())
    .add("event_type", StringType())        # START / END / ERROR / REJECTED
    .add("record_count", LongType())
    .add("status", StringType())            # RUNNING / SUCCESS / FAILED
    .add("error_message", StringType())
    .add("event_timestamp", TimestampType())
)

# Create the logging table if it does not exist.
#
# The log is stored at a path (LOG_PATH) and is never registered in the
# metastore, so a catalog tableExists() lookup cannot see it; guarding an
# "overwrite" write with such a lookup would truncate the log on every run.
# SaveMode "ignore" makes the write a no-op when data already exists at the
# target, so re-running this cell keeps previously written events.
spark.createDataFrame([], log_schema) \
    .write.format("delta") \
    .mode("ignore") \
    .save(LOG_PATH)

def log_event(layer, notebook, event_type, record_count, status, error_msg=None, run_id=None):
    """Append one event row to the Delta log stored at LOG_PATH.

    Args:
        layer: Value written to the ``pipeline_layer`` column.
        notebook: Value written to the ``notebook_name`` column.
        event_type: Value written to ``event_type``
            (START / END / ERROR / REJECTED).
        record_count: Record count for the event. Converted with ``int()``;
            ``None`` is stored as NULL instead of raising ``TypeError``.
        status: Value written to ``status`` (RUNNING / SUCCESS / FAILED).
        error_msg: Optional error text; stored as NULL when omitted.
        run_id: Optional identifier to store in ``run_id``. Pass the same
            value for every event of one run so its rows can be correlated.
            When omitted, a fresh UUID4 is generated for this call.

    Returns:
        None. The row is appended to the Delta location as a side effect.
    """
    # Without an explicit run_id each call gets its own UUID, so callers that
    # want START/END rows to share an id must supply one.
    event_run_id = str(uuid.uuid4()) if run_id is None else str(run_id)

    # record_count is a nullable LongType column; only coerce real values.
    count = None if record_count is None else int(record_count)

    row = [(
        event_run_id,
        layer,
        notebook,
        event_type,
        count,
        status,
        error_msg,          # may be None; the column is nullable
        None                # placeholder, replaced by current_timestamp() below
    )]

    # The timestamp is filled in by Spark rather than supplied from Python.
    df = spark.createDataFrame(row, schema=log_schema) \
              .withColumn("event_timestamp", current_timestamp())

    df.write.format("delta").mode("append").save(LOG_PATH)



In [0]:
# Task 6.1: Initialize Gold layer processing

notebook_name = "gold_aggregation"

# Record the START event before any data is read.
log_event(
    layer="gold",
    notebook=notebook_name,
    event_type="START",
    record_count=0,
    status="RUNNING",
)

# Source (Silver): Delta data produced by the upstream layer.
silver_sales_path = "/FileStore/project/silver/sales"

sales = spark.read.load(silver_sales_path, format="delta")


In [0]:
# Task 6.2: Ensure Gold schema exists in hive_metastore
# IF NOT EXISTS makes this statement safe to re-run.

spark.sql("\nCREATE SCHEMA IF NOT EXISTS hive_metastore.project_gold\n")


DataFrame[]

In [0]:
# Task 6.3: Create Daily Sales Summary

# Sum of correct_total per transaction_date. The dict form of agg() names the
# result column "sum(correct_total)", which is then renamed.
gold_daily_sales = (
    sales
    .groupBy("transaction_date")
    .agg({"correct_total": "sum"})
    .withColumnRenamed("sum(correct_total)", "daily_revenue")
)

# Replace the contents of the managed Delta table with the fresh summary.
gold_daily_sales.write.saveAsTable(
    "hive_metastore.project_gold.daily_sales",
    format="delta",
    mode="overwrite",
)


In [0]:
# Task 6.4: Create Monthly Revenue by Region

from pyspark.sql.functions import month, year

# Store lookup used to attach a region to each sale. Only the header option is
# set, so no schema inference is requested for the CSV columns.
stores = spark.read.option("header", True).csv(
    "/FileStore/project/source/stores.csv"
)

# month() returns only the month number (1-12). Grouping on it alone would
# merge the same month of different years into a single row, so the year is
# carried as a separate grouping column.
gold_monthly_region = (
    sales.join(stores, "store_id")
    .withColumn("year", year("transaction_date"))
    .withColumn("month", month("transaction_date"))
    .groupBy("year", "month", "region")
    .sum("correct_total")
    .withColumnRenamed("sum(correct_total)", "monthly_revenue")
)

# overwriteSchema lets this overwrite replace a table that was created with
# the previous column set (without "year") instead of failing on a schema
# mismatch.
gold_monthly_region.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("hive_metastore.project_gold.monthly_region")


In [0]:
# Task 6.5: Create Product Performance Metrics

# Product lookup supplying product_name and category for each sale.
products = spark.read.csv(
    "/FileStore/project/source/products.csv",
    header=True,
)

# Sum of correct_total per (product_name, category).
gold_product_performance = (
    sales.join(products, on="product_id")
    .groupBy("product_name", "category")
    .agg({"correct_total": "sum"})
    .withColumnRenamed("sum(correct_total)", "total_revenue")
)

# Replace the contents of the managed Delta table with the fresh metrics.
gold_product_performance.write.saveAsTable(
    "hive_metastore.project_gold.product_performance",
    format="delta",
    mode="overwrite",
)


In [0]:
# Task 6.6: Log successful Gold completion

# The row count of the daily sales summary is reported as the record count.
# count() is an action, so it evaluates gold_daily_sales when called.
log_event("gold", notebook_name, "END", gold_daily_sales.count(), "SUCCESS")
