# READ THE BRONZE DATA INTO DATAFRAMES

In [2]:
# Root of the Bronze zone in the ecommerce lakehouse. Every raw source below is a
# Parquet file directly under this folder, so the URI is defined once here.
BRONZE_PATH = "abfss://ecommercews@onelake.dfs.fabric.microsoft.com/ecommerce_lakehouse.Lakehouse/Files/Bronze"

customers_raw = spark.read.parquet(f"{BRONZE_PATH}/customers.parquet")
orders_raw = spark.read.parquet(f"{BRONZE_PATH}/orders.parquet")
payments_raw = spark.read.parquet(f"{BRONZE_PATH}/payments.parquet")
support_raw = spark.read.parquet(f"{BRONZE_PATH}/support_tickets.parquet")
web_raw = spark.read.parquet(f"{BRONZE_PATH}/web_activities.parquet")

StatementMeta(, a444d610-1011-4929-b1a4-f155978b5239, 4, Finished, Available, Finished)

# PREVIEW BRONZE DATA

In [4]:
# Preview the first five raw customer rows.
customers_preview = customers_raw.limit(5)
display(customers_preview)

StatementMeta(, 47dd7645-e554-4dd3-a9f7-df0c9006a008, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 136931ec-c1c5-4dd8-9eeb-21c644d8127e)

In [5]:
# Preview the first five raw order rows.
orders_preview = orders_raw.limit(5)
display(orders_preview)

StatementMeta(, 47dd7645-e554-4dd3-a9f7-df0c9006a008, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, f9a77483-c453-44df-a47e-ccf347cda797)

In [6]:
# Preview the first five raw payment rows.
payments_preview = payments_raw.limit(5)
display(payments_preview)

StatementMeta(, 47dd7645-e554-4dd3-a9f7-df0c9006a008, 8, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 157bd7c3-9b6a-4f07-a6e5-569ecaed8b6a)

In [7]:
# Preview the first five raw support-ticket rows.
support_preview = support_raw.limit(5)
display(support_preview)

StatementMeta(, 47dd7645-e554-4dd3-a9f7-df0c9006a008, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 7352317b-ad62-49e4-8fbc-582115e6ed8f)

In [8]:
# Preview the first five raw web-activity rows.
web_preview = web_raw.limit(5)
display(web_preview)

StatementMeta(, 47dd7645-e554-4dd3-a9f7-df0c9006a008, 10, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 9ec72094-b587-428d-9129-b6e56a4ed3ca)

# CREATE DELTA BRONZE TABLES

In [3]:
# Persist each raw DataFrame as a managed Delta table, replacing any previous load.
# Dict insertion order keeps the writes in the same sequence as before.
bronze_tables = {
    "customers": customers_raw,
    "orders": orders_raw,
    "payments": payments_raw,
    "support": support_raw,
    "web": web_raw,
}

for table_name, frame in bronze_tables.items():
    frame.write.format("delta").mode("overwrite").saveAsTable(table_name)


StatementMeta(, a444d610-1011-4929-b1a4-f155978b5239, 5, Finished, Available, Finished)

# DATA CLEANING - SILVER

## CUSTOMERS TABLE

In [5]:
# Re-inspect the raw customer rows before defining the Silver cleaning rules.
customers_preview = customers_raw.limit(5)
display(customers_preview)

StatementMeta(, a444d610-1011-4929-b1a4-f155978b5239, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 9a350d53-43a5-4ebf-bf57-067a1a28765a)

In [7]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Silver customers: normalise text fields, parse dob, and keep one row per customer_id.
customers_clean = (
    customers_raw
    .withColumn("email", lower(trim(col("email"))))
    .withColumn("name", initcap(trim(col("name"))))
    # Trim before matching so padded values such as " F " are still recognised.
    # Nulls and any unrecognised value fall through to "Other".
    .withColumn("gender", when(lower(trim(col("gender"))).isin("f", "female"), "Female")
                          .when(lower(trim(col("gender"))).isin("m", "male"), "Male")
                          .otherwise("Other"))
    # "/" -> "-" lets to_date's default parser read slash-separated year-first dates.
    # NOTE(review): assumes year-first values (e.g. 2020/01/31); other layouts may not parse.
    .withColumn("dob", to_date(regexp_replace(col("dob"), "/", "-")))
    .withColumn("location", initcap(trim(col("location"))))
    # Drop rows missing the key or email *before* de-duplicating. Otherwise the arbitrary
    # survivor chosen by dropDuplicates could be a null-email row that is then discarded,
    # losing a customer who also had a valid row.
    .dropna(subset=["customer_id", "email"])
    .dropDuplicates(["customer_id"])
)

display(customers_clean.limit(5))

customers_clean.write.format("delta").mode("overwrite").saveAsTable("silver_customers")

StatementMeta(, a444d610-1011-4929-b1a4-f155978b5239, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 904fab01-904e-472d-9562-d25a89599ce5)

## ORDERS TABLE

In [11]:
%%sql
-- Inspect the Bronze orders Delta table before cleaning.
SELECT *
FROM orders

StatementMeta(, a444d610-1011-4929-b1a4-f155978b5239, 13, Finished, Available, Finished)

<Spark SQL result set with 15 rows and 5 fields>

In [8]:
orders = spark.table("orders")

# Silver orders: parse mixed-format dates, validate amounts, and keep one row per order_id.
orders_clean = (
    orders
    # order_date arrives in several layouts, so the parse pattern is chosen from the value's shape.
    # Anything else falls back to ISO yyyy-MM-dd. Raw strings keep "\d" as a regex escape
    # without Python's invalid-escape-sequence warning; the pattern text is unchanged.
    .withColumn("order_date",
                when(col("order_date").rlike(r"^\d{4}/\d{2}/\d{2}$"), to_date(col("order_date"), "yyyy/MM/dd"))
                .when(col("order_date").rlike(r"^\d{2}-\d{2}-\d{4}$"), to_date(col("order_date"), "dd-MM-yyyy"))
                .when(col("order_date").rlike(r"^\d{8}$"), to_date(col("order_date"), "yyyyMMdd"))
                .otherwise(to_date(col("order_date"), "yyyy-MM-dd")))
    .withColumn("amount", col("amount").cast(DoubleType()))
    # Negative amounts are treated as invalid: the value is nulled but the row is kept.
    .withColumn("amount", when(col("amount") < 0, None).otherwise(col("amount")))
    .withColumn("status", initcap(trim(col("status"))))
    # Filter invalid rows before de-duplicating so an invalid duplicate is never the survivor.
    .dropna(subset=["customer_id", "order_date"])
    .dropDuplicates(["order_id"])
)

display(orders_clean.limit(5))

orders_clean.write.format("delta").mode("overwrite").saveAsTable("silver_orders")

StatementMeta(, a444d610-1011-4929-b1a4-f155978b5239, 10, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, fdda1675-9b4c-48c7-ab08-9ee7b5f99e2a)

## PAYMENTS TABLE

In [9]:
%%sql
-- Inspect the Bronze payments Delta table before cleaning.
SELECT *
FROM payments

StatementMeta(, a444d610-1011-4929-b1a4-f155978b5239, 11, Finished, Available, Finished)

<Spark SQL result set with 15 rows and 6 fields>

In [10]:
payments = spark.table("payments")

# Silver payments: parse dates, normalise method/status labels, and drop invalid amounts.
payments_clean = (
    payments
    # "/" -> "-" lets to_date's default parser read slash-separated year-first dates.
    .withColumn("payment_date", to_date(regexp_replace(col("payment_date"), "/", "-")))
    # Collapse every spelling of credit card ("creditcard", "Credit_Card", "credit card", ...)
    # into one label by comparing the lower-cased letters only. The previous
    # .replace({"creditcard": "Credit Card"}) ran *after* initcap had already produced
    # "Creditcard", so it never matched. Other methods are trimmed and title-cased.
    .withColumn(
        "payment_method",
        when(regexp_replace(lower(col("payment_method")), "[^a-z]", "") == "creditcard", "Credit Card")
        .otherwise(initcap(trim(col("payment_method"))))
    )
    .withColumn("payment_status", initcap(trim(col("payment_status"))))
    .withColumn("amount", col("amount").cast(DoubleType()))
    # Negative amounts are nulled here and then removed by the dropna below.
    .withColumn("amount", when(col("amount") < 0, None).otherwise(col("amount")))
    .dropna(subset=["customer_id", "payment_date", "amount"])
)

display(payments_clean.limit(5))

payments_clean.write.format("delta").mode("overwrite").saveAsTable("silver_payments")

StatementMeta(, a444d610-1011-4929-b1a4-f155978b5239, 12, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 98d770ec-1d8a-4aa4-a547-0ca9ba442ad1)

## SUPPORT TABLE

In [12]:
%%sql
-- Inspect the Bronze support-ticket Delta table before cleaning.
SELECT *
FROM support

StatementMeta(, a444d610-1011-4929-b1a4-f155978b5239, 14, Finished, Available, Finished)

<Spark SQL result set with 15 rows and 5 fields>

In [13]:
support = spark.table("support")


def normalize_label(column_name):
    """Trim and title-case a free-text label column, mapping "NA" (any case) and blanks to null.

    The placeholder check runs on the trimmed, upper-cased value *before* initcap.
    The previous `.replace({"NA": None, "": None})` ran after initcap had already turned
    "NA" into "Na", so NA placeholders were never nulled.

    Args:
        column_name: name of the string column to normalise.

    Returns:
        A Column expression that yields the cleaned label or null.
    """
    trimmed = trim(col(column_name))
    return when(upper(trimmed).isin("NA", ""), None).otherwise(initcap(trimmed))


# Silver support tickets: parse dates, clean label columns, and keep one row per ticket_id.
support_clean = (
    support
    # "/" -> "-" lets to_date's default parser read slash-separated year-first dates.
    .withColumn("ticket_date", to_date(regexp_replace(col("ticket_date"), "/", "-")))
    .withColumn("issue_type", normalize_label("issue_type"))
    .withColumn("resolution_status", normalize_label("resolution_status"))
    # Drop rows missing the key or date *before* de-duplicating so an invalid duplicate
    # can never be kept in place of a valid row with the same ticket_id.
    .dropna(subset=["customer_id", "ticket_date"])
    .dropDuplicates(["ticket_id"])
)

display(support_clean.limit(5))

support_clean.write.format("delta").mode("overwrite").saveAsTable("silver_support")


StatementMeta(, a444d610-1011-4929-b1a4-f155978b5239, 15, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, f48ca1f4-f27f-4ec4-95b3-a38da0d99084)

## WEB TABLE

In [14]:
%%sql
-- Inspect the Bronze web-activity Delta table before cleaning.
SELECT *
FROM web

StatementMeta(, a444d610-1011-4929-b1a4-f155978b5239, 16, Finished, Available, Finished)

<Spark SQL result set with 15 rows and 5 fields>

In [15]:
web = spark.table("web")

# Silver web activity: parse session dates, normalise page/device labels, one row per session_id.
web_clean = (
    web
    # NOTE(review): to_date keeps only the calendar date, so any time-of-day in session_time
    # is discarded. If the raw values carry a time component, to_timestamp is probably
    # intended. Switching changes the column type, though, and overwriting the existing
    # silver_web Delta table would then likely need .option("overwriteSchema", "true").
    .withColumn("session_time", to_date(regexp_replace(col("session_time"), "/", "-")))
    .withColumn("page_viewed", lower(trim(col("page_viewed"))))
    .withColumn("device_type", initcap(trim(col("device_type"))))
    # Drop incomplete rows *before* de-duplicating so a duplicate with null fields can
    # never be kept in place of a complete row with the same session_id.
    .dropna(subset=["customer_id", "session_time", "page_viewed"])
    .dropDuplicates(["session_id"])
)

display(web_clean.limit(5))

web_clean.write.format("delta").mode("overwrite").saveAsTable("silver_web")

StatementMeta(, a444d610-1011-4929-b1a4-f155978b5239, 17, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 545c31f7-4bdd-4cb1-a2e2-dd7d5516891c)

# CUSTOMER 360 JOIN - GOLD LAYER

In [18]:
from pyspark.sql import functions as F

# Silver inputs, each aliased so that same-named columns (e.g. `amount` in both orders
# and payments) can be told apart in the final projection.
cust = spark.table("silver_customers").alias("c")
orders = spark.table("silver_orders").alias("o")
payments = spark.table("silver_payments").alias("p")
support = spark.table("silver_support").alias("s")
web = spark.table("silver_web").alias("w")

# NOTE(review): every child table is left-joined on customer_id alone. A customer with
# k orders, m payments, t tickets and n sessions therefore yields max(k,1)*max(m,1)*
# max(t,1)*max(n,1) rows, and each order/payment amount repeats across those rows.
# Do not SUM order_amount or payment_amount from this table directly. Consider
# pre-aggregating per customer (or joining payments on an order key, if one exists).

# Output columns, grouped by source table, in final column order.
customer_cols = [F.col(f"c.{name}") for name in ("customer_id", "name", "email", "gender", "dob", "location")]
order_cols = [
    F.col("o.order_id"),
    F.col("o.order_date"),
    F.col("o.amount").alias("order_amount"),
    F.col("o.status").alias("order_status"),
]
payment_cols = [
    F.col("p.payment_method"),
    F.col("p.payment_status"),
    F.col("p.amount").alias("payment_amount"),
]
support_cols = [F.col(f"s.{name}") for name in ("ticket_id", "issue_type", "ticket_date", "resolution_status")]
web_cols = [F.col(f"w.{name}") for name in ("page_viewed", "device_type", "session_time")]

# Left-join the child tables onto customers one after another: orders, payments, support, web.
joined = cust
for child in (orders, payments, support, web):
    joined = joined.join(child, on="customer_id", how="left")

customer360 = joined.select(*customer_cols, *order_cols, *payment_cols, *support_cols, *web_cols)

# Preview sample rows
display(customer360.limit(10))

# Write to Delta as a Gold table
customer360.write.format("delta").mode("overwrite").saveAsTable("gold_customer360")

StatementMeta(, a444d610-1011-4929-b1a4-f155978b5239, 20, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 106eabfa-1365-4be8-a2b1-38c9222651bf)