# 02_transform_silver

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, year, month, expr

# 1) Start Spark

In [None]:
# Obtain the notebook's Spark session under the application name
# "SilverTransform" (getOrCreate returns an existing session when there is one).
spark = (
    SparkSession.builder
    .appName("SilverTransform")
    .getOrCreate()
)

# 2) Read the bronze table

In [None]:
# Path of the bronze-layer Delta table that feeds this transform.
bronze_path = "dbfs:/FileStore/bronze/online_retail"

# Load the whole table; the Delta format is given directly to load().
bronze_df = spark.read.load(bronze_path, format="delta")

# 3) Clean & enrich

In [None]:
# Build the silver DataFrame from bronze: drop unwanted rows, parse the
# invoice date, then derive calendar columns and the per-row sales amount.
silver_df = (
    bronze_df
    # a) Keep only rows with a positive Quantity and a non-null CustomerID.
    .where((col("Quantity") > 0) & col("CustomerID").isNotNull())
    # b) Parse the InvoiceDate string with the "yyyy-MM-dd HH:mm:ss" pattern.
    #    NOTE(review): values that do not match the pattern presumably become
    #    null and flow into the derived columns below -- confirm upstream format.
    .withColumn(
        "InvoiceTimestamp",
        to_timestamp("InvoiceDate", "yyyy-MM-dd HH:mm:ss"),
    )
    # c) Date-only value plus year/month parts (the latter are used as
    #    partition columns when the table is written).
    .withColumn("InvoiceDateOnly", col("InvoiceTimestamp").cast("date"))
    .withColumn("InvoiceYear", year("InvoiceTimestamp"))
    .withColumn("InvoiceMonth", month("InvoiceTimestamp"))
    # d) Sales amount for the row: quantity times unit price.
    .withColumn("SalesAmount", col("Quantity") * col("UnitPrice"))
)

# 4) Sanity-check the transformed DataFrame

In [None]:
# Print the resulting schema so the derived columns and their types are visible.
print("Silver schema:")
silver_df.printSchema()

# count() is an action, so this line executes the transformation.
print(f"Silver-row count: {silver_df.count()}")

# Preview five rows of the key columns without truncating cell contents.
silver_df.select(
    [
        "InvoiceNo",
        "InvoiceTimestamp",
        "Quantity",
        "UnitPrice",
        "SalesAmount",
        "CustomerID",
        "Country",
        "InvoiceYear",
        "InvoiceMonth",
    ]
).show(n=5, truncate=False)

# 5) Write out as Delta (silver layer), partitioned by Year & Month


In [None]:
# Destination of the silver-layer Delta table.
silver_path = "dbfs:/FileStore/silver/online_retail"

# Overwrite whatever is at the destination, laying the data out in
# InvoiceYear / InvoiceMonth partitions.
silver_df.write.save(
    silver_path,
    format="delta",
    mode="overwrite",
    partitionBy=["InvoiceYear", "InvoiceMonth"],
)

# 6) Validate the write

In [None]:
# Read the table back from storage and report how many rows it holds.
valid_df = spark.read.load(silver_path, format="delta")
print(f"Silver Delta files at {silver_path} contain {valid_df.count()} rows")