# Silver to Gold Data Pipeline

This notebook reads bronze data, cleans and standardizes it into the silver layer, and then aggregates the silver data into the gold layer (aggregations and business logic).

In [None]:
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession

# Obtain the Spark session for this notebook (getOrCreate reuses an
# existing session when one is already active).
spark = (
    SparkSession.builder
    .appName("SilverToGold")
    .getOrCreate()
)

In [None]:
def filter_and_clean_bronze_data(bronze_df):
    """Standardize raw bronze sales rows and drop the invalid ones.

    The projection parses ``date`` (pattern ``yyyy-MM-dd``) into
    ``transaction_date``, trims and upper-cases ``category``, trims
    ``product_name``, and passes ``transaction_id``, ``sales_amount``,
    ``quantity``, ``customer_id`` and ``ingestion_timestamp`` through
    unchanged.

    A row is kept only when ``sales_amount`` and ``quantity`` are both
    greater than zero and ``transaction_date``, ``category`` and
    ``product_name`` are all non-null.

    Args:
        bronze_df: DataFrame exposing the columns named above, including
            the raw ``date`` column.

    Returns:
        DataFrame holding the eight cleaned columns for the valid rows.
    """
    standardized_df = bronze_df.select(
        col("transaction_id"),
        to_date(col("date"), "yyyy-MM-dd").alias("transaction_date"),
        upper(trim(col("category"))).alias("category"),
        trim(col("product_name")).alias("product_name"),
        col("sales_amount"),
        col("quantity"),
        col("customer_id"),
        col("ingestion_timestamp"),
    )

    # Validate the parsed transaction_date instead of the raw date column.
    # The raw column is not part of the projection above, and a non-null
    # string that does not match the pattern parses to NULL (with ANSI mode
    # off), so checking the raw value would let rows with a NULL
    # transaction_date through.  A NULL raw date also parses to NULL, so
    # this single check covers both cases.
    is_valid_row = (
        (col("sales_amount") > 0)
        & (col("quantity") > 0)
        & col("transaction_date").isNotNull()
        & col("category").isNotNull()
        & col("product_name").isNotNull()
    )
    return standardized_df.filter(is_valid_row)

In [None]:
# Define the silver table - cleaned and standardized data
@dlt.table(
    name="silver_table",
    comment="Cleaned and standardized sales data"
)
@dlt.expect_or_fail("valid_sales_amount", "sales_amount > 0")
@dlt.expect_or_fail("valid_quantity", "quantity > 0")
@dlt.expect_or_fail("valid_date", "transaction_date IS NOT NULL")
@dlt.expect_or_fail("valid_category", "category IS NOT NULL")
@dlt.expect_or_fail("valid_product", "product_name IS NOT NULL")
def silver_table():
    """Build the silver table from ``bronze_table``.

    The bronze rows are passed through ``filter_and_clean_bronze_data`` and
    then extended with two columns:

    * ``total_amount`` - ``sales_amount`` multiplied by ``quantity``
    * ``processed_timestamp`` - ``current_timestamp()`` at evaluation

    The ``expect_or_fail`` decorators above declare the data-quality
    constraints attached to this table.

    Returns:
        The cleaned DataFrame with the two extra columns appended.
    """
    validated_rows = filter_and_clean_bronze_data(dlt.read("bronze_table"))

    # Derived column expression, named for readability.
    line_total = col("sales_amount") * col("quantity")

    return (
        validated_rows
        .withColumn("total_amount", line_total)
        .withColumn("processed_timestamp", current_timestamp())
    )


In [None]:
# Define the gold table - aggregated business metrics
@dlt.table(
    name="gold_table",
    comment="Aggregated sales metrics by category"
)
def gold_table():
    """Aggregate ``silver_table`` into one metrics row per ``category``.

    Output columns, in order: ``category``, ``total_transactions``,
    ``total_sales`` (rounded to 2 decimals), ``avg_transaction_value``
    (rounded to 2 decimals), ``total_quantity_sold``, ``unique_customers``,
    ``first_transaction_date``, ``last_transaction_date`` and
    ``calculation_timestamp``.

    Note: ``sum``, ``min``, ``max`` and ``round`` here are the
    ``pyspark.sql.functions`` versions brought in by the star import, not
    the Python builtins.

    Returns:
        DataFrame of per-category sales metrics.
    """
    per_category = dlt.read("silver_table").groupBy("category")

    # Rounding is applied directly to the aggregate expressions; the
    # resulting columns keep the same names and positions.
    category_metrics = [
        count("transaction_id").alias("total_transactions"),
        round(sum("total_amount"), 2).alias("total_sales"),
        round(avg("total_amount"), 2).alias("avg_transaction_value"),
        sum("quantity").alias("total_quantity_sold"),
        countDistinct("customer_id").alias("unique_customers"),
        min("transaction_date").alias("first_transaction_date"),
        max("transaction_date").alias("last_transaction_date"),
    ]

    summary_df = per_category.agg(*category_metrics)
    return summary_df.withColumn("calculation_timestamp", current_timestamp())

In [None]:
# Printed when this cell is evaluated. The table functions above are only
# declared in this notebook, not called directly, so this message signals
# that the definitions loaded rather than that any data was processed.
print("Silver to Gold pipeline setup complete.")