In [0]:
# Gold Modeling: Star Schema and Aggregations
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, count, avg, year, month, to_date, when
from delta.tables import DeltaTable
import pyspark.sql.utils  # For error handling

spark = SparkSession.builder.appName("GoldModel").getOrCreate()

# Set the catalog explicitly so unqualified names resolve under `workspace`.
spark.sql("USE CATALOG workspace")

# Debug: show which catalog/schema the session is currently pointed at.
print("Current Catalog:")
display(spark.sql("SELECT current_catalog()"))
print("Current Schema:")
display(spark.sql("SELECT current_schema()"))

# Create the gold schema if it doesn't exist. Every later cell writes into
# workspace.gold_retail, so a failure here must stop the notebook instead of
# surfacing later as a confusing "schema not found" error on the first write.
try:
    spark.sql("CREATE SCHEMA IF NOT EXISTS workspace.gold_retail COMMENT 'Gold layer for modeled retail data'")
    print("Schema workspace.gold_retail created or already exists.")
except pyspark.sql.utils.AnalysisException as e:
    print(f"Error creating schema: {e}")
    # If this is a permissions issue (e.g. missing CREATE SCHEMA privilege),
    # check with a workspace admin; re-raise so the run stops here.
    raise

In [0]:
# Load Silver from catalog tables
pos_sales = spark.read.table("workspace.silver.pos_sales")
customers = spark.read.table("workspace.silver.customers")
products = spark.read.table("workspace.silver.products")
promotions = spark.read.table("workspace.silver.promotions")
stores = spark.read.table("workspace.silver.stores")

# Fact Sales: enrich each POS line with dimension attributes and derive
# discount_amount / net_sales. Left joins keep every sale even when a matching
# dimension row is missing (dimension columns are then null).
# NOTE(review): if any silver dimension has duplicate keys, these joins fan out
# sales rows — confirm customer_id / product_id / promotion_id / store_id are
# unique upstream.
# NOTE(review): the discount is applied whenever promotion_id matches; the
# promotion's start_date/end_date are not checked against transaction_date.
fact_sales = (
    pos_sales
    .join(customers, "customer_id", "left")
    .join(products, "product_id", "left")
    .join(promotions, "promotion_id", "left")
    .join(stores, "store_id", "left")
    # No matched promotion (or a null discount_percent) -> zero discount.
    .withColumn(
        "discount_amount",
        when(
            col("discount_percent").isNotNull(),
            col("total_amount") * (col("discount_percent") / 100),
        ).otherwise(0),
    )
    .withColumn("net_sales", col("total_amount") - col("discount_amount"))
    .select(
        col("transaction_id"), col("transaction_date"), col("product_id"), col("customer_id"), col("store_id"), col("promotion_id"),
        col("quantity"), col("unit_price"), col("total_amount"), col("discount_amount"), col("net_sales"),
        col("loyalty_status"), col("category"), col("brand"), col("promotion_type"), col("store_name"),
    )
)

# Dimension Tables (simplified projections of the silver tables)
dim_customers = customers.select("customer_id", "first_name", "last_name", "email", "loyalty_status", "signup_date")
dim_products = products.select("product_id", "product_name", "category", "brand", "base_price")
dim_stores = stores.select("store_id", "store_name", "location", "store_type", "opened_date")
dim_promotions = promotions.select("promotion_id", "promotion_type", "discount_percent", "start_date", "end_date")

# Date dimension: one row per distinct calendar day that has sales.
# Built from pos_sales rather than fact_sales: the left joins above keep every
# pos_sales row, so the set of dates is identical, and this avoids re-executing
# the four-way join (fact_sales is not cached) when dim_date is written.
dim_date = (
    pos_sales
    .select(to_date("transaction_date").alias("date"))
    .distinct()
    .withColumn("year", year("date"))
    .withColumn("month", month("date"))
)



In [0]:
# Aggregated View: Sales by Category and Loyalty (per calendar year/month).
# Note: `sum` here is pyspark.sql.functions.sum (imported in the first cell),
# which shadows Python's builtin sum in this notebook.
agg_sales = (
    fact_sales
    .groupBy(
        "category",
        "loyalty_status",
        year("transaction_date").alias("year"),
        month("transaction_date").alias("month"),
    )
    .agg(
        sum("net_sales").alias("total_net_sales"),
        sum("quantity").alias("total_quantity"),
        avg("discount_amount").alias("avg_discount"),
    )
)

GOLD_SCHEMA = "workspace.gold_retail"


def write_gold_table(df, table_name, partition_cols=None):
    """Create or fully replace the Delta table ``GOLD_SCHEMA.table_name`` from ``df``.

    ``saveAsTable`` in overwrite mode creates the table when it is missing, so no
    separate ``DeltaTable.createIfNotExists`` step is needed. ``overwriteSchema``
    lets a rerun replace the table even after the derived columns or the
    partitioning change; without it, such a rerun fails with a schema mismatch.

    Args:
        df: DataFrame to persist.
        table_name: Unqualified table name inside ``GOLD_SCHEMA``.
        partition_cols: Optional list of column names to partition by.
    """
    writer = (
        df.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
    )
    if partition_cols:
        writer = writer.partitionBy(*partition_cols)
    writer.saveAsTable(f"{GOLD_SCHEMA}.{table_name}")


# (table name, DataFrame, partition columns) — written in this order.
# NOTE(review): if transaction_date is a timestamp rather than a date, partitioning
# on it creates one partition per distinct timestamp — confirm its type, or
# partition on a derived date column instead.
gold_tables = [
    ("fact_sales", fact_sales, ["transaction_date"]),
    ("agg_sales_by_category_loyalty", agg_sales, None),
    ("dim_customers", dim_customers, None),
    ("dim_products", dim_products, None),
    ("dim_stores", dim_stores, None),
    ("dim_promotions", dim_promotions, None),
    ("dim_date", dim_date, None),
]
for gold_name, gold_df, gold_partitions in gold_tables:
    write_gold_table(gold_df, gold_name, gold_partitions)

# Optimize key tables
# - Z-Ordering: clusters fact rows by customer_id (a common join key) to reduce scan times.
# - Compaction: merges small files in the aggregate table for better read efficiency.
DeltaTable.forName(spark, f"{GOLD_SCHEMA}.fact_sales").optimize().executeZOrderBy("customer_id")
DeltaTable.forName(spark, f"{GOLD_SCHEMA}.agg_sales_by_category_loyalty").optimize().executeCompaction()

print("Gold tables created successfully.")