In [0]:
# Databricks Notebook Example: Silver and Gold Data Processing

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, count, min, max, to_timestamp

# Initialize Spark Session
spark = SparkSession.builder.appName("SilverGoldPipeline").getOrCreate()

In [0]:
# Create the Bronze schema if it doesn't exist
spark.sql("CREATE SCHEMA IF NOT EXISTS bronze")

Out[6]: DataFrame[]

In [0]:
# Step 1: Create Bronze Table (Raw Data)
bronze_data = [
    ("TXN123", "CUST001", "2024-02-10T12:34:56Z", "100.5", "USD", "Completed"),
    ("TXN124", "CUST002", "2024-02-11T14:20:10Z", "250.0", "USD", "Pending"),
    ("TXN125", "CUST001", "2024-02-12T16:45:30Z", "300.0", "USD", "Completed"),
]

bronze_columns = [
    "transaction_id",
    "customer_id",
    "transaction_date",
    "amount",
    "currency",
    "payment_status",
]
bronze_df = spark.createDataFrame(bronze_data, bronze_columns)

# Write to Delta Bronze Table
bronze_df.write.format("delta").mode("overwrite").saveAsTable("bronze.transactions")

In [0]:
spark.sql("CREATE SCHEMA IF NOT EXISTS silver")

Out[8]: DataFrame[]

In [0]:
# Step 2: Create Silver Table (Cleaned & Structured Data)
silver_df = spark.sql(
    """
    SELECT
        transaction_id,
        customer_id,
        to_timestamp(transaction_date, 'yyyy-MM-dd\\'T\\'HH:mm:ssX') AS transaction_timestamp,
        CAST(amount AS DECIMAL(10,2)) AS amount,
        currency,
        payment_status
    FROM bronze.transactions
    WHERE payment_status = 'Completed'
    """
)

# Write to Delta Silver Table
silver_df.write.format("delta").mode("overwrite").saveAsTable("silver.transactions")

In [0]:
spark.sql("CREATE SCHEMA IF NOT EXISTS gold")

Out[10]: DataFrame[]

In [0]:
# Step 3: Create Gold Table (Aggregated Data for Reporting)
gold_df = silver_df.groupBy("customer_id").agg(
    sum("amount").alias("total_spent"),
    count("transaction_id").alias("transaction_count"),
    min("transaction_timestamp").alias("first_purchase"),
    max("transaction_timestamp").alias("last_purchase"),
)

# Write to Delta Gold Table
gold_df.write.format("delta").mode("overwrite").saveAsTable("gold.customer_spending")

In [0]:
# Display Gold Data
display(gold_df)

[0;31m---------------------------------------------------------------------------[0m
[0;31mIllegalArgumentException[0m                  Traceback (most recent call last)
File [0;32m<command-3480141376382282>:2[0m
[1;32m      1[0m [38;5;66;03m# Display Gold Data[39;00m
[0;32m----> 2[0m [43mdisplay[49m[43m([49m[43mgold_df[49m[43m)[49m

File [0;32m/databricks/python_shell/dbruntime/display.py:83[0m, in [0;36mDisplay.display[0;34m(self, input, *args, **kwargs)[0m
[1;32m     80[0m         [38;5;28;01mif[39;00m kwargs[38;5;241m.[39mget([38;5;124m'[39m[38;5;124mtrigger[39m[38;5;124m'[39m):
[1;32m     81[0m             [38;5;28;01mraise[39;00m [38;5;167;01mException[39;00m([38;5;124m'[39m[38;5;124mTriggers can only be set for streaming queries.[39m[38;5;124m'[39m)
[0;32m---> 83[0m         [38;5;28;43mself[39;49m[38;5;241;43m.[39;49m[43madd_custom_display_data[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mtable[39;49m[38;5;124;43m"