In [0]:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.types import *

# Expected layout of a raw order record. Every field is nullable; quantity and
# price are declared as strings here, while order_timestamp is a timestamp.
# NOTE(review): no visible cell passes this schema to a reader — confirm whether
# it is still needed.
schema = (
    StructType()
    .add("order_id", StringType(), True)
    .add("customer_id", StringType(), True)
    .add("product_id", StringType(), True)
    .add("quantity", StringType(), True)
    .add("price", StringType(), True)
    .add("order_timestamp", TimestampType(), True)
    .add("status", StringType(), True)
)


In [0]:
# Databricks Notebook: bronze_orders_load
# Language: Python
from pyspark.sql.functions import current_timestamp, lit, rand, round, col, input_file_name, to_timestamp, upper
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType


# COMMAND ----------
# DBTITLE 1,Configuration
# Landing location of the raw JSON order files and the target bronze table.
raw_data_landing_path = "abfss://rawdata@adlsexternalfororders.dfs.core.windows.net/orders/"
bronze_table_name = "ordercatalog.bronze_schema.bronze_orders_raw"

# Checkpoint locations for Structured Streaming.
# These are derived from `raw_data_landing_path`; the previous code referenced
# an undefined name (`raw_data_path`), which raised NameError on a fresh kernel
# and left `silver_checkpoint_path` undefined for the silver cell.
# NOTE(review): the checkpoints live underneath the landing directory itself —
# confirm the file discovery does not pick up files under `_checkpoint/`.
bronze_checkpoint_path = f"{raw_data_landing_path}_checkpoint/bronze_orders/"
silver_checkpoint_path = f"{raw_data_landing_path}_checkpoint/silver_orders/"

print(f"Starting Bronze stream to load into {bronze_table_name}...")

# Incrementally read the JSON files with the "cloudFiles" source and tag each
# row with its ingestion time and originating file.
bronze_stream_df = spark.readStream \
  .format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", f"{bronze_checkpoint_path}/schema") \
  .load(raw_data_landing_path) \
  .withColumn("ingestion_timestamp", current_timestamp()) \
  .withColumn("source_file", input_file_name())

# Write the stream to the bronze Delta table in append mode
bronze_query = bronze_stream_df.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", f"{bronze_checkpoint_path}/data") \
  .option("mergeSchema", "true") \
  .trigger(availableNow=True).toTable(bronze_table_name)

# Block until the availableNow run has processed everything that was pending
# and the query has stopped, so the count below reflects this run.
bronze_query.awaitTermination()
print(f"Bronze stream finished processing. Total records in Bronze: {spark.read.table(bronze_table_name).count()}")



In [0]:

# Three-part name of the silver Delta table that receives the upserts.
silver_table_name = "ordercatalog.silver_schema.silver_orders_processed"
print(
    "Starting Silver stream to process data from Bronze and upsert into "
    f"{silver_table_name}..."
)
from delta.tables import DeltaTable

def upsert_to_silver(microBatchOutputDF, batchId):
    """Clean one bronze micro-batch and upsert it into the silver Delta table.

    Written to be used as a ``foreachBatch`` sink. The batch is projected to the
    silver columns (with casts and an upper-cased status), reduced to one row
    per ``order_id`` (the row with the latest ``order_timestamp``), and then
    either used to create the silver table (if it does not exist yet) or merged
    into it on ``order_id``.

    Args:
        microBatchOutputDF: DataFrame holding the rows of the current micro-batch.
        batchId: Identifier of the micro-batch; only used in the log messages.

    Returns:
        None. The effect is the write/merge against ``silver_table_name``.

    Relies on the notebook-level names ``spark`` and ``silver_table_name`` and on
    the pyspark functions/types imported in earlier cells.
    """
    # Imported locally so this function does not depend on another cell having
    # imported these two names.
    from pyspark.sql import Window
    from pyspark.sql.functions import row_number

    # Data Cleaning and Transformation for the current micro-batch
    df_silver_batch = microBatchOutputDF.select(
        col("order_id").cast(StringType()).alias("order_id"),
        col("customer_id").cast(StringType()).alias("customer_id"),
        col("product_id").cast(StringType()).alias("product_id"),
        col("quantity").cast(IntegerType()).alias("quantity"),
        col("price").cast(DoubleType()).alias("price"),
        # Ensure order_timestamp is correctly cast if it might be string in raw
        col("order_timestamp").cast(TimestampType()).alias("order_timestamp"),
        upper(col("status")).alias("order_status"),  # Standardize status to uppercase
        current_timestamp().alias("processing_timestamp")  # Silver layer processing timestamp
    )

    # Deduplicate the micro-batch before merging: keep exactly one row per
    # order_id, the one with the latest order_timestamp (nulls ranked last).
    # A window + row_number is used instead of orderBy().dropDuplicates(),
    # because dropDuplicates does not guarantee that the first row of the
    # preceding sort is the one it keeps.
    # Rows sharing the same order_id AND order_timestamp are tie-broken arbitrarily.
    latest_first = Window.partitionBy("order_id").orderBy(
        col("order_timestamp").desc_nulls_last()
    )
    df_silver_batch_deduped = (
        df_silver_batch
        .withColumn("_row_rank", row_number().over(latest_first))
        .filter(col("_row_rank") == 1)
        .drop("_row_rank")
    )

    # Perform MERGE (UPSERT) operation
    # The merge key is 'order_id' as it uniquely identifies an order.
    merge_key = "order_id"

    # Check if silver table exists
    if not spark.catalog.tableExists(silver_table_name):
        # Create the silver table if it doesn't exist (initial run)
        df_silver_batch_deduped.write \
            .format("delta") \
            .mode("overwrite") \
            .option("overwriteSchema", "true") \
            .partitionBy("order_status").saveAsTable(silver_table_name)
        print(f"Batch {batchId}: Created silver table {silver_table_name} and loaded initial data.")
    else:
        # If table exists, perform a MERGE
        deltaTable = DeltaTable.forName(spark, silver_table_name)

        deltaTable.alias("target") \
            .merge(
                df_silver_batch_deduped.alias("source"),
                f"target.{merge_key} = source.{merge_key}"
            ) \
            .whenMatchedUpdate(set = {  # Update every non-key column of existing orders
                "customer_id": "source.customer_id",
                "product_id": "source.product_id",
                "quantity": "source.quantity",
                "price": "source.price",
                "order_timestamp": "source.order_timestamp",
                "order_status": "source.order_status",
                "processing_timestamp": "source.processing_timestamp"
            }) \
            .whenNotMatchedInsertAll() \
            .execute()
        print(f"Batch {batchId}: Performed upsert on silver table {silver_table_name}.")

# Stream the bronze Delta table so each run only sees rows not yet processed.
silver_stream_df = spark.readStream.format("delta").table(bronze_table_name)

# Hand every micro-batch to upsert_to_silver, which performs the MERGE.
silver_query = (
    silver_stream_df.writeStream
    .foreachBatch(upsert_to_silver)
    .outputMode("update")
    .option("checkpointLocation", f"{silver_checkpoint_path}/data")
    .trigger(availableNow=True)
    .start()
)

# Block until the triggered run has finished before reporting the row count.
silver_query.awaitTermination()
print(f"Silver stream finished processing. Total records in Silver: {spark.read.table(silver_table_name).count()}")

# COMMAND ----------
# DBTITLE 2,4. Gold Layer: Aggregate Results (Batch Processing)
# The gold layer aggregates the latest state of data from the silver layer
# and is typically refreshed on a schedule (e.g., daily, hourly).

In [0]:
gold_table_name = "ordercatalog.gold_schema.gold_orders_daily_summary"
print(f"Aggregating data for Gold layer from {silver_table_name} into {gold_table_name}...")

from pyspark.sql.functions import sum, count, avg, date_trunc, col, round

# Batch snapshot of the silver table as it stands right now.
df_silver_current = spark.read.table(silver_table_name)

# One output row per (day of order_timestamp, order_status) with order count,
# total quantity, revenue (quantity * price) and the mean price rounded to 2 dp.
df_gold = (
    df_silver_current
    .groupBy(
        date_trunc("day", col("order_timestamp")).alias("order_date"),
        col("order_status"),
    )
    .agg(
        count(col("order_id")).alias("total_orders"),
        sum(col("quantity")).alias("total_quantity"),
        sum(col("quantity") * col("price")).alias("total_revenue"),
        round(avg(col("price")), 2).alias("average_price_per_item"),
    )
    .orderBy("order_date", "order_status")
)

# Replace the whole gold table (data and schema) on every run, partitioned by
# order_date, so a re-run reflects the current silver state.
gold_writer = (
    df_gold.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy("order_date")
)
gold_writer.saveAsTable(gold_table_name)

print(f"Gold layer aggregation complete. Total records in Gold: {spark.read.table(gold_table_name).count()}")

# COMMAND ----------
# DBTITLE 2,5. Verification and Exploration
# Verify data in each layer: for every table show a 10-row sample, then its row count.
# Each entry is (header line, sample query, count prefix, table name).
layer_checks = [
    (
        "\n--- Bronze Layer Sample ---",
        f"SELECT * FROM {bronze_table_name} LIMIT 10",
        "Bronze Count: ",
        bronze_table_name,
    ),
    (
        "\n--- Silver Layer Sample ---",
        f"SELECT * FROM {silver_table_name} LIMIT 10",
        "Silver Count: ",
        silver_table_name,
    ),
    (
        "\n--- Gold Layer Sample ---",
        f"SELECT * FROM {gold_table_name} ORDER BY order_date DESC, order_status LIMIT 10",
        "Gold Count: ",
        gold_table_name,
    ),
]

for header_line, sample_query, count_prefix, layer_table in layer_checks:
    print(header_line)
    spark.sql(sample_query).display()
    print(f"{count_prefix}{spark.read.table(layer_table).count()}")