## Silver Layer – Data Cleaning & Standardization

### Reading from the bronze Delta table

In [0]:
# Load the bronze-layer data stored in Delta format at the volume path below
# and preview it before any cleaning is applied.
bronze_path = "/Volumes/amazon_sales/bronze/streaming/Data/"
bronze_df = spark.read.load(bronze_path, format="delta")
bronze_df.display()

### Data Transformation

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DoubleType

# Cast the bronze columns to the types used by the silver table:
#   * the three date columns are parsed with the yyyy-MM-dd pattern,
#   * quantity becomes an integer,
#   * the remaining numeric columns become doubles.
date_columns = ["order_date", "ship_date", "delivery_date"]
double_columns = ["unit_price", "discount", "shipping_cost", "total_sales"]

# Build every replacement expression first, keeping the original column order,
# then apply them in a single projection.
typed_columns = {name: F.to_date(name, "yyyy-MM-dd") for name in date_columns}
typed_columns["quantity"] = F.col("quantity").cast(IntegerType())
typed_columns.update(
    {name: F.col(name).cast(DoubleType()) for name in double_columns}
)

silver_df = bronze_df.withColumns(typed_columns)

In [0]:
# Standardize the text columns: every listed column is trimmed, and all of them
# except product_name and brand are additionally passed through initcap.
# NOTE(review): initcap lower-cases the rest of each word, so an abbreviation
# such as "USA" would become "Usa" - confirm this is acceptable for
# state / country / payment_method values.
text_columns = [
    "customer_name",
    "product_name",
    "brand",
    "category",
    "sub_category",
    "order_status",
    "payment_method",
    "city",
    "state",
    "country",
]
trim_only_columns = {"product_name", "brand"}


def _clean_text(name):
    """Return the trimmed column, title-cased unless it is a trim-only column."""
    trimmed = F.trim(F.col(name))
    if name in trim_only_columns:
        return trimmed
    return F.initcap(trimmed)


silver_df = silver_df.withColumns({name: _clean_text(name) for name in text_columns})

In [0]:
# Add three derived columns:
#   * delivery_days      - days from order_date to delivery_date
#   * ship_days          - days from order_date to ship_date
#   * total_sales_recalc - quantity * unit_price * (1 - discount), rounded to 2 decimals
# NOTE(review): the formula treats discount as a fraction (e.g. 0.10), not a
# percentage (e.g. 10) - confirm against the source data.
order_date_col = F.col("order_date")
discounted_amount = (
    F.col("quantity") * F.col("unit_price") * (F.lit(1) - F.col("discount"))
)

silver_df = silver_df.withColumns(
    {
        "delivery_days": F.datediff(F.col("delivery_date"), order_date_col),
        "ship_days": F.datediff(F.col("ship_date"), order_date_col),
        "total_sales_recalc": F.round(discounted_amount, 2),
    }
)

In [0]:
# Drop duplicate orders so that only one row per order_id remains.
# NOTE(review): no ordering is specified, so when several rows share an
# order_id the surviving row is not controlled here - confirm that any of the
# duplicates is acceptable.
silver_df = silver_df.drop_duplicates(subset=["order_id"])

In [0]:
# Stamp the rows with the current timestamp of this silver load, then write the
# result as a Delta table in overwrite mode.
silver_df = silver_df.withColumn("_silver_timestamp", F.current_timestamp())

silver_df.write.saveAsTable(
    "amazon_sales.silver.amazon_sales_table",
    format="delta",
    mode="overwrite",
)