# Load silver layer

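This notebook runs the ETL (Extract, Transform, Load) step that populates the `silver` schema tables from the `bronze` schema. Each section cleans and standardizes one bronze table and writes it to the matching silver table in overwrite mode, so every run replaces the previous load instead of appending to it. The final cell reports the total execution time.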

In [0]:
%python
import time

# Wall-clock start time for the whole notebook run; the last cell subtracts it
# from time.time() to report the total execution time.
start_time = time.time()

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

#### Insert data into `sales_project.silver.crm_cust_info`

In [0]:
# Deduplicate customers (keep the most recently created record per cst_id), trim
# name fields, and expand gender / marital-status codes to readable labels.
src = spark.table("sales_project.bronze.crm_cust_info")
target_table = "sales_project.silver.crm_cust_info"

# Newest record first within each raw cst_id; the row numbered 1 is the one kept.
# NOTE(review): rows tied on cst_create_date make the surviving row arbitrary —
# add a tiebreaker column to orderBy if bronze has one.
w = Window.partitionBy("cst_id").orderBy(F.col("cst_create_date").desc())

# Normalize each code column once instead of re-evaluating upper(trim(...)) per branch.
gndr_code = F.upper(F.trim(F.col("cst_gndr")))
marital_code = F.upper(F.trim(F.col("cst_marital_status")))

df_silver = (
    src.where(F.col("cst_id").isNotNull())
    .select(
        F.expr("try_cast(cst_id as int)").alias("cst_id"),
        "cst_key",
        F.trim(F.col("cst_firstname")).alias("cst_firstname"),
        F.trim(F.col("cst_lastname")).alias("cst_lastname"),

        F.when(gndr_code == "M", "Male")
         .when(gndr_code == "F", "Female")
         .otherwise("n/a")
         .alias("cst_gndr"),

        F.when(marital_code == "M", "Married")
         .when(marital_code == "S", "Single")
         .otherwise("n/a")
         .alias("cst_marital_status"),

        "cst_create_date",
        F.current_timestamp().alias("dwh_create_date"),

        F.row_number().over(w).alias("row_number")
    )
    .where(F.col("row_number") == 1)
    # try_cast yields NULL for ids that are not valid ints, so re-check the key
    # after the cast; the first filter only catches ids already NULL in bronze.
    .where(F.col("cst_id").isNotNull())
    .drop("row_number")
)

df_silver.write.mode("overwrite").saveAsTable(target_table)

# Count the persisted table: df_silver is not cached, so df_silver.count() would
# re-read bronze and re-run the window sort just to produce this number.
print(f"Inserted rows: {spark.table(target_table).count()}")


Inserted rows: 18484


#### Insert data into `sales_project.silver.crm_prd_info`

In [0]:
# Split the bronze prd_key into a category id and a product key, default missing
# costs to 0, expand product-line codes, and derive each version's end date from
# the start date of the next version of the same product.
src = spark.table("sales_project.bronze.crm_prd_info")
target_table = "sales_project.silver.crm_prd_info"

# Versions ordered by start date so lead() returns the next version's start.
# NOTE(review): this partitions by the raw bronze prd_key (category prefix
# included), not by the derived key selected below — confirm that is intended.
w = Window.partitionBy("prd_key").orderBy("prd_start_dt")

# Normalize the product-line code once instead of once per branch.
prd_line_code = F.upper(F.trim(F.col("prd_line")))

df_silver = (
    src.select(
        F.expr("try_cast(prd_id as int)").alias("prd_id"),

        # First 5 characters of prd_key, with '-' replaced by '_'.
        F.regexp_replace(
            F.substring(F.col("prd_key"), 1, 5), "-", "_"
        ).alias("cat_id"),

        # Everything from position 7 to the end. SQL substring(str, pos) runs to the
        # end of the string; F.substring with a Column length is only accepted by
        # newer PySpark versions.
        F.expr("substring(prd_key, 7)").alias("prd_key"),
        "prd_nm",
        F.coalesce(F.col("prd_cost"), F.lit(0)).cast("int").alias("prd_cost"),

        F.when(prd_line_code == "R", "Road")
         .when(prd_line_code == "T", "Touring")
         .when(prd_line_code == "S", "Other Sales")
         .when(prd_line_code == "M", "Mountain")
         .otherwise("n/a")
         .alias("prd_line"),

        "prd_start_dt",
        # One day before the next version starts; NULL when there is no next version.
        (F.lead("prd_start_dt").over(w) - F.expr("INTERVAL 1 DAY")).alias("prd_end_dt"),
        F.current_timestamp().alias("dwh_create_date")
    )
)

df_silver.write.mode("overwrite").saveAsTable(target_table)

# Count the persisted table rather than recomputing the uncached window plan.
print(f"Inserted rows: {spark.table(target_table).count()}")

Inserted rows: 397


#### Insert data into `sales_project.silver.crm_sales_details`

In [0]:
# Clean sales lines: parse yyyyMMdd-encoded dates, repair sales amounts that are
# missing, non-positive or inconsistent with quantity * |price|, and back-fill
# missing / non-positive prices.
src = spark.table("sales_project.bronze.crm_sales_details")
target_table = "sales_project.silver.crm_sales_details"


def _yyyymmdd_to_date(col_name):
    """Return a DATE column parsed from a yyyyMMdd-encoded column, aliased to col_name.

    Negative values, and values whose string form is not exactly 8 characters
    (e.g. 0), become NULL instead of being parsed.
    """
    raw = F.col(col_name)
    raw_str = raw.cast("string")
    return (
        F.when((raw < 0) | (F.length(raw_str) != 8), F.lit(None))
         .otherwise(F.to_date(raw_str, "yyyyMMdd"))
         .alias(col_name)
    )


# Sales value implied by the line's quantity and absolute unit price.
expected_sales = F.col("sls_quantity") * F.abs(F.col("sls_price"))

df_silver = (
    src.select(
        "sls_ord_num",
        "sls_prd_key",
        F.expr("try_cast(sls_cust_id as int)").alias("sls_cust_id"),

        _yyyymmdd_to_date("sls_order_dt"),
        _yyyymmdd_to_date("sls_ship_dt"),
        _yyyymmdd_to_date("sls_due_dt"),

        # Recompute sales when missing, non-positive, or not equal to qty * |price|.
        F.when(
            F.col("sls_sales").isNull()
            | (F.col("sls_sales") <= 0)
            | (F.col("sls_sales") != expected_sales),
            expected_sales
        ).otherwise(F.col("sls_sales"))
         .cast("int")
         .alias("sls_sales"),

        F.col("sls_quantity").cast("int").alias("sls_quantity"),

        # Back-fill price from the *bronze* sls_sales (columns in this select resolve
        # against src, not the repaired value above); nullif avoids division by 0.
        F.when(
            F.col("sls_price").isNull() | (F.col("sls_price") <= 0),
            F.col("sls_sales") / F.expr("nullif(sls_quantity, 0)")
        ).otherwise(F.col("sls_price"))
         .cast("int")
         .alias("sls_price"),

        F.current_timestamp().alias("dwh_create_date")
    )
)

df_silver.write.mode("overwrite").saveAsTable(target_table)

# Count the persisted table rather than re-reading bronze via df_silver.count().
print(f"Inserted rows: {spark.table(target_table).count()}")


Inserted rows: 60398


#### Insert data into `sales_project.silver.erp_cust_az_12`

In [0]:
# Strip a leading 'NAS' prefix from customer ids, null out birth dates in the
# future, and normalize gender values to 'Female' / 'Male' / 'n/a'.
src = spark.table("sales_project.bronze.erp_cust_az_12")
target_table = "sales_project.silver.erp_cust_az_12"

# Normalize the gender code once instead of once per branch.
gen_code = F.upper(F.trim(F.col("gen")))

df_silver = (
    src.select(
        # substring(cid, 4) keeps everything after the 3-character prefix; the SQL
        # form avoids passing a Column length to F.substring, which only newer
        # PySpark versions accept.
        F.when(F.col("cid").like("NAS%"), F.expr("substring(cid, 4)"))
         .otherwise(F.col("cid"))
         .alias("cid"),

        # A NULL bdate makes the comparison NULL, falls through to otherwise() and
        # stays NULL, so only future dates need an explicit branch.
        F.when(F.col("bdate") > F.current_date(), F.lit(None))
         .otherwise(F.col("bdate"))
         .alias("bdate"),

        F.when(gen_code.isin("F", "FEMALE"), "Female")
         .when(gen_code.isin("M", "MALE"), "Male")
         .otherwise("n/a")
         .alias("gen"),

        F.current_timestamp().alias("dwh_create_date")
    )
)

df_silver.write.mode("overwrite").saveAsTable(target_table)

# Count the persisted table rather than recomputing the lineage via df_silver.count().
print(f"Inserted rows: {spark.table(target_table).count()}")

Inserted rows: 18484


#### Insert data into `sales_project.silver.erp_loc_a_101`

In [0]:
# Remove dashes from customer ids and map country codes / spellings to full names;
# NULL or blank countries become 'n/a', anything else is kept (trimmed).
src = spark.table("sales_project.bronze.erp_loc_a_101")
target_table = "sales_project.silver.erp_loc_a_101"

# Trim / upper-case the country once instead of per branch.
cntry_trimmed = F.trim(F.col("cntry"))
cntry_code = F.upper(cntry_trimmed)

df_silver = (
    src.select(
        F.regexp_replace(F.col("cid"), "-", "").alias("cid"),

        F.when(cntry_code.isin("USA", "UNITED STATES", "US"), "United States")
         .when(cntry_code == "DE", "Germany")
         # A NULL cntry makes the comparisons above NULL (not true), so it reaches here.
         .when(F.col("cntry").isNull() | (F.length(cntry_trimmed) == 0), "n/a")
         .otherwise(cntry_trimmed)
         .alias("cntry"),

        F.current_timestamp().alias("dwh_create_date")
    )
)

df_silver.write.mode("overwrite").saveAsTable(target_table)

# Count the persisted table rather than recomputing the lineage via df_silver.count().
print(f"Inserted rows: {spark.table(target_table).count()}")


Inserted rows: 18484


#### Insert data into `sales_project.silver.erp_px_cat_g_1_v_2`

In [0]:
# Copy the product-category table to silver unchanged, adding a load timestamp.
src = spark.table("sales_project.bronze.erp_px_cat_g_1_v_2")
target_table = "sales_project.silver.erp_px_cat_g_1_v_2"

df_silver = (
    src.select(
        "id",
        "cat",
        "subcat",
        "maintenance",
        F.current_timestamp().alias("dwh_create_date")
    )
)

df_silver.write.mode("overwrite").saveAsTable(target_table)

# Count the persisted table rather than re-reading bronze via df_silver.count().
print(f"Inserted rows: {spark.table(target_table).count()}")

Inserted rows: 37


In [0]:
# Total wall-clock time since start_time was recorded in the first code cell,
# rounded to two decimals for display.
end_time = time.time()
duration = end_time - start_time
print(f"Execution time: {round(duration,2)} seconds")

Execution time: 22.12 seconds
