In [0]:
# Root locations of the "sales-view" data set in ADLS Gen2 (abfss scheme).
# Both URIs point at the same storage account and differ only in the
# container: "bronze" is read by the cells below, "silver" is written by them.
bronze_base_path, silver_base_path = (
    "abfss://bronze@adfassignment07.dfs.core.windows.net/sales-view",
    "abfss://silver@adfassignment07.dfs.core.windows.net/sales-view",
)


In [0]:
%run /Workspace/Users/appujack799@gmail.com/azure_data_factory_assignment/src/bronze_to_silver/utils

In [0]:
from pyspark.sql.functions import col, split, when, to_date
import re

# ---------------------------------------------------------------------------
# Customer: bronze CSV -> silver Delta
# ---------------------------------------------------------------------------

# Read the raw customer CSV files. Only the header option is set, so no
# schema inference is requested and Spark loads every column as a string.
df_customer = spark.read.option("header", True).format("csv").load(f"{bronze_base_path}/customer/")
display(df_customer)

# Column-name normalisation is delegated to a helper that is not defined in
# this notebook (presumably supplied by the `%run .../utils` cell and
# converting headers to snake_case -- verify against that notebook).
df_customer = clean_and_snake_case_columns(df_customer)
df_customer.printSchema()
display(df_customer)

# Split "name" on a single space: token 0 -> first_name, token 1 -> last_name.
# A one-word name yields a null last_name; tokens beyond the second are ignored.
name_tokens = split(col("name"), " ")
df_customer = df_customer.withColumn("first_name", name_tokens.getItem(0)) \
                         .withColumn("last_name", name_tokens.getItem(1))

# "domain" is the text between "@" and the first "." of email_id
# (e.g. "user@gmail.com" -> "gmail").
df_customer = df_customer.withColumn("domain", split(col("email_id"), "@").getItem(1)) \
                         .withColumn("domain", split(col("domain"), r"\.").getItem(0))

# Collapse gender to a one-letter code. The comparison is exact, so any value
# other than the literal "male"/"female" (including null) becomes "O".
df_customer = df_customer.withColumn("gender", when(col("gender") == "male", "M")
                                               .when(col("gender") == "female", "F")
                                               .otherwise("O"))

# joining_date is split on a single space: the first token is parsed as a
# MM-dd-yyyy date, the second token is kept as-is in "time".
# The split expression is reused directly instead of being materialised as a
# temporary column and dropped afterwards; the resulting columns are the same.
joining_tokens = split(col("joining_date"), " ")
df_customer = df_customer.withColumn("date", to_date(joining_tokens.getItem(0), "MM-dd-yyyy")) \
                         .withColumn("time", joining_tokens.getItem(1))

# expenditure_status: spent < 200 -> "MINIMUM", spent >= 200 -> "MAXIMUM".
# Both branches are spelled out on purpose: a missing or non-numeric "spent"
# casts to null, and with a plain `.otherwise("MAXIMUM")` such rows were
# silently labelled as top spenders. Now they stay null (status unknown).
spent_amount = col("spent").cast("double")
df_customer = df_customer.withColumn(
    "expenditure_status",
    when(spent_amount < 200, "MINIMUM")
    .when(spent_amount >= 200, "MAXIMUM")
)

display(df_customer)

# Replace the silver customer table; overwriteSchema lets the Delta table
# schema change along with the data.
df_customer.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", True) \
    .save(f"{silver_base_path}/customer")


In [0]:
from pyspark.sql.functions import col, when

# ---------------------------------------------------------------------------
# Product: bronze CSV -> silver Delta
# ---------------------------------------------------------------------------

# Load the raw product CSV files, taking column names from the header row.
df_product = spark.read.option("header", True).format("csv").load(f"{bronze_base_path}/product")
display(df_product)

# Header normalisation is done by a helper defined outside this notebook
# (presumably in the utils notebook -- verify).
df_product = clean_and_snake_case_columns(df_product)

# Map category_id to a human-readable sub_category. There is deliberately no
# fallback branch: ids other than 1-4 leave sub_category null.
category_id = col("category_id")
sub_category_expr = (
    when(category_id == 1, "phone")
    .when(category_id == 2, "laptop")
    .when(category_id == 3, "playstation")
    .when(category_id == 4, "e-device")
)
df_product = df_product.withColumn("sub_category", sub_category_expr)

display(df_product)

# Replace the silver product table, allowing the schema to be rewritten.
(
    df_product.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", True)
    .save(f"{silver_base_path}/product")
)


In [0]:
from pyspark.sql.functions import col, split, regexp_extract, to_date, when

# ---------------------------------------------------------------------------
# Store: bronze CSV -> silver Delta
# ---------------------------------------------------------------------------

# Load the raw store CSV files, taking column names from the header row.
df_store = spark.read.option("header", True).csv(f"{bronze_base_path}/store")
df_store.display()

# Header normalisation is done by a helper defined outside this notebook
# (presumably in the utils notebook -- verify).
df_store = clean_and_snake_case_columns(df_store)

# store_category is the text between "@" and the first "." of email_address.
email_host = split(col("email_address"), "@").getItem(1)
df_store = df_store.withColumn("store_category", split(email_host, r"\.").getItem(0))

# Only strings shaped exactly like NN-NN-NNNN are handed to to_date; anything
# else (including null) becomes null instead of reaching the date parser.
date_pattern = r"^\d{2}-\d{2}-\d{4}$"

# created_at and updated_at receive the same treatment, in that order.
for date_column in ("created_at", "updated_at"):
    raw_value = col(date_column)
    has_expected_shape = regexp_extract(raw_value, date_pattern, 0) != ""
    df_store = df_store.withColumn(
        date_column,
        when(has_expected_shape, to_date(raw_value, "dd-MM-yyyy")).otherwise(None),
    )

df_store.display()

# Replace the silver store table, allowing the schema to be rewritten.
(
    df_store.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(f"{silver_base_path}/store")
)


Sales: minimal transformations are applied, and the data is moved from the bronze container to the silver container.

In [0]:
from pyspark.sql.functions import col, to_timestamp

# ---------------------------------------------------------------------------
# Sales: bronze CSV -> silver Delta (written under "customer_sales")
# ---------------------------------------------------------------------------

# Load the raw sales CSV files, taking column names from the header row.
df_sales = spark.read.option("header", True).csv(f"{bronze_base_path}/sales")
display(df_sales)

# Header normalisation is done by a helper defined outside this notebook
# (presumably in the utils notebook -- verify).
df_sales = clean_and_snake_case_columns(df_sales)
display(df_sales)

# Parse the ISO-8601 strings (e.g. 2021-03-04T10:15:30.000Z) into timestamps.
# The trailing zone designator is parsed with the offset pattern "XXX" rather
# than matched as a quoted literal 'Z': a quoted literal throws the zone
# information away, so the value would be interpreted in the Spark session
# time zone and be shifted whenever that zone is not UTC.
sales_timestamp_format = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"

df_sales = df_sales.withColumn(
    "order_date", to_timestamp(col("order_date"), sales_timestamp_format)
).withColumn(
    "ship_date", to_timestamp(col("ship_date"), sales_timestamp_format)
)

# Replace the silver sales table, allowing the schema to be rewritten.
df_sales.write.format("delta") \
    .mode("overwrite").option("overwriteSchema", "true") \
    .save(f"{silver_base_path}/customer_sales")
