### **Import Libraries**

In [2]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

StatementMeta(, 0267b623-51df-4d8c-8a7d-5404a127b2ab, 4, Finished, Available, Finished)

### **<mark>Creating Dataframes</mark>**

In [3]:
# Bronze ingestion: load each source extract from the lakehouse Files area
# into its own *_raw DataFrame. No transformation happens here.
customers_raw = spark.read.format("parquet").load("Files/Bronze/customers.parquet")
orders_raw = spark.read.format("parquet").load("Files/Bronze/orders.parquet")
payments_raw = spark.read.format("parquet").load("Files/Bronze/payments.parquet")
support_tickets_raw = spark.read.format("parquet").load("Files/Bronze/support_tickets.parquet")
web_activities_raw = spark.read.format("parquet").load("Files/Bronze/web_activities.parquet")


StatementMeta(, 0267b623-51df-4d8c-8a7d-5404a127b2ab, 5, Finished, Available, Finished)

## **<u>Display the dataframes</u>**

In [3]:
# Preview up to five rows of the raw customers frame.
display(customers_raw.limit(5))

StatementMeta(, ecf291ed-842b-4b92-bc8a-14c59603d46f, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 0d9c6357-bcf4-44d9-9a26-12f7a82e4bd5)

In [4]:
# Preview up to five rows of the raw orders frame.
display(orders_raw.limit(5))

StatementMeta(, ecf291ed-842b-4b92-bc8a-14c59603d46f, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 3de8e4b3-4c09-402b-a69d-a8cb9ab2ae39)

In [5]:
# Preview up to five rows of the raw payments frame.
display(payments_raw.limit(5))

StatementMeta(, ecf291ed-842b-4b92-bc8a-14c59603d46f, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, fd74ee19-81c9-454f-8128-c8a15533df1b)

In [6]:
# Preview up to five rows of the raw support tickets frame.
display(support_tickets_raw.limit(5))

StatementMeta(, ecf291ed-842b-4b92-bc8a-14c59603d46f, 8, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 9d768d57-d21a-4f6a-8764-2aa5c29c754a)

In [7]:
# Preview up to five rows of the raw web activities frame.
display(web_activities_raw.limit(5))

StatementMeta(, ecf291ed-842b-4b92-bc8a-14c59603d46f, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 6e396294-aa3d-4f4f-9694-912191476297)

## **Cleaning & Saving the cleansed data in silver tables**

In [8]:
# Look at the raw customers columns again before writing the cleaning rules below.
display(customers_raw.limit(5))

StatementMeta(, ecf291ed-842b-4b92-bc8a-14c59603d46f, 10, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 09871bb8-0e71-4a3d-842e-d538b53cd424)

In [6]:
# Silver customers: normalise text fields, parse dob, and keep one valid row
# per customer_id, then persist the result as the silver_customers Delta table.
customer_clean = (
    customers_raw
    # "EMAIL" is referenced in upper case; this presumably relies on Spark's
    # case-insensitive column resolution — verify against the bronze schema.
    .withColumn('email',lower(trim(col("EMAIL"))))
    .withColumn('name',initcap(trim(col("name"))))
    # Anything that is not f/female/m/male (including null) is labelled "Others".
    .withColumn('gender',when(lower(col("gender")).isin("f","female"),"Female")
                        .when(lower(col("gender")).isin("m","male"),"Male")
                        .otherwise("Others"))
    # to_date is called without a format, so after "/" -> "-" the value must be
    # year-first (e.g. yyyy-MM-dd); other layouts presumably become null — verify.
    .withColumn('dob',to_date(regexp_replace(col("dob"),"/","-")))
    .withColumn('location',initcap(trim("location")))
    # Drop unusable rows BEFORE de-duplicating: dropDuplicates keeps an arbitrary
    # row per customer_id, and if it kept one with a null email the customer
    # would be lost entirely even though a valid duplicate existed.
    .dropna(subset=["customer_id","email"])
    .dropDuplicates(subset=["customer_id"])
)

customer_clean.write.format("delta").mode("overwrite").saveAsTable("silver_customers")

StatementMeta(, 0267b623-51df-4d8c-8a7d-5404a127b2ab, 8, Finished, Available, Finished)

In [9]:
# Look at the raw orders columns again before writing the cleaning rules below.
display(orders_raw.limit(5))

StatementMeta(, ecf291ed-842b-4b92-bc8a-14c59603d46f, 11, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 4b49aa97-ee0d-4403-bbbb-2e9d2f93bc75)

In [13]:
# Silver orders: parse the mixed order_date layouts, null out negative amounts,
# normalise status casing, keep one valid row per order_id, and persist the
# result as the silver_orders Delta table.
orders_clean = (
    orders_raw
    # Each branch detects one textual layout and applies the matching to_date
    # pattern. The regexes are raw strings so that "\d" reaches Spark unchanged
    # without relying on Python's deprecated invalid-escape behaviour.
    .withColumn("order_date",
        when(col("order_date").rlike(r"^\d{4}/\d{2}/\d{2}$"),to_date(col("order_date"),"yyyy/MM/dd"))
        .when(col("order_date").rlike(r"^\d{2}-\d{2}-\d{4}$"),to_date(col("order_date"),"dd-MM-yyyy"))
        .when(col("order_date").rlike(r"^\d{8}$"),to_date(col("order_date"),"yyyyMMdd"))
        .otherwise(to_date(col("order_date"),"yyyy-MM-dd")))
    .withColumn("amount",col("amount").cast(DoubleType()))
    # Negative amounts are treated as invalid and replaced with null.
    .withColumn("amount",when(col("amount")<0,None).otherwise(col("amount")))
    .withColumn("status",initcap(col("status")))
    # Drop rows with missing keys BEFORE de-duplicating: dropDuplicates keeps an
    # arbitrary row per order_id, so a duplicate with a null customer_id could
    # otherwise win and then be dropped, losing the order.
    .dropna(subset=["order_id","customer_id"])
    .dropDuplicates(subset=["order_id"])
)

display(orders_clean.limit(6))
orders_clean.write.format("delta").mode("overwrite").saveAsTable("silver_orders")

StatementMeta(, 0267b623-51df-4d8c-8a7d-5404a127b2ab, 15, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, ea152bff-6e75-4fa9-8f70-826b031302bc)

In [14]:
# Look at the raw payments columns again before writing the cleaning rules below.
display(payments_raw.limit(5))

StatementMeta(, 0267b623-51df-4d8c-8a7d-5404a127b2ab, 16, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, f8ffc631-21cb-406b-9f30-1adb7e46dc67)

In [21]:
# Silver payments: parse payment_date, normalise method/status casing, null out
# negative amounts, keep one valid row per payment_id, and persist the result
# as the silver_payments Delta table.
Payments_clean = (
    payments_raw
    # to_date is called without a format, so after "/" -> "-" the value must be
    # year-first (e.g. yyyy-MM-dd); other layouts presumably become null — verify.
    .withColumn("payment_date",to_date(regexp_replace(col("payment_date"),"/","-")))
    .withColumn("payment_method", initcap("payment_method"))
    # initcap yields "Creditcard" for the unspaced spelling; map it to the
    # two-word label.
    .replace({"Creditcard" : "Credit Card"},subset=["payment_method"])
    .withColumn("payment_status", initcap("payment_status"))
    .withColumn("amount",col("amount").cast(DoubleType()))
    # Negative amounts are treated as invalid and replaced with null.
    .withColumn("amount",when(col("amount")<0,None).otherwise(col("amount")))
    # Drop unusable rows BEFORE de-duplicating: dropDuplicates keeps an arbitrary
    # row per payment_id, so an invalid duplicate (e.g. amount nulled above)
    # could otherwise win and then be dropped, losing a valid payment.
    .dropna(subset=["customer_id","payment_date","amount"])
    # NOTE(review): payment_id is not in the dropna subset, so rows with a null
    # payment_id survive and are presumably collapsed into a single row here —
    # confirm whether they should be dropped instead.
    .dropDuplicates(subset=["payment_id"])
)

display(Payments_clean.limit(5))
Payments_clean.write.format("delta").mode("overwrite").saveAsTable("silver_payments")

StatementMeta(, 0267b623-51df-4d8c-8a7d-5404a127b2ab, 23, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 57a5c0cb-2803-49c4-aca5-6419f02b2ddd)

In [22]:
# Look at the raw support tickets columns again before writing the cleaning rules below.
display(support_tickets_raw.limit(5))

StatementMeta(, 0267b623-51df-4d8c-8a7d-5404a127b2ab, 24, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, eb7972b1-ebf3-4ee4-ad64-d5f6e6f81ec9)

In [24]:
# Silver support tickets: normalise issue/resolution casing, parse ticket_date,
# turn placeholder values into nulls, keep one valid row per ticket_id, and
# persist the result as the silver_support_tickets Delta table.
Support_clean = (
    support_tickets_raw
    .withColumn("issue_type",initcap(col("issue_type")))
    # to_date is called without a format, so after "/" -> "-" the value must be
    # year-first (e.g. yyyy-MM-dd); other layouts presumably become null — verify.
    .withColumn("ticket_date",to_date(regexp_replace(col("ticket_date"),"/","-")))
    .withColumn("resolution_status",initcap(col("resolution_status")))
    # initcap lower-cases everything after the first letter, so by this point
    # every spelling of the "NA" placeholder has become "Na". Matching the
    # literal "NA" here could never succeed, so match "Na" instead.
    .replace({"Na":None,"":None},subset=["issue_type","resolution_status"])
    # Drop unusable rows BEFORE de-duplicating: dropDuplicates keeps an arbitrary
    # row per ticket_id, so a duplicate with an unparseable/null ticket_date
    # could otherwise win and then be dropped, losing the ticket.
    .dropna(subset=["ticket_id","ticket_date"])
    .dropDuplicates(subset=["ticket_id"])
)

display(Support_clean.limit(5))
Support_clean.write.format("delta").mode("overwrite").saveAsTable("silver_support_tickets")

StatementMeta(, 0267b623-51df-4d8c-8a7d-5404a127b2ab, 26, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a3222c47-c6f8-45a5-acf7-00bbd46073f5)

In [25]:
# Look at the raw web activities columns again before writing the cleaning rules below.
display(web_activities_raw.limit(5))

StatementMeta(, 0267b623-51df-4d8c-8a7d-5404a127b2ab, 27, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1c0f4ae3-8166-47e2-89a4-05c2544f3be1)

In [27]:
# Silver web activities: normalise page/device text, parse session_time, drop
# unusable and duplicate rows, and persist the result as the
# silver_web_activities Delta table.
Web_clean = (
    web_activities_raw
    .withColumn("page_viewed",lower(trim(col("page_viewed"))))
    # NOTE(review): to_date reduces session_time to a calendar date; if the
    # source carries a time of day it is discarded here — confirm this is wanted.
    .withColumn("session_time",to_date(regexp_replace(col("session_time"),"/","-")))
    .withColumn("device_type",initcap(col("device_type")))
    .dropna(subset=["customer_id","session_time","page_viewed"])
    # Remove only rows that are identical in every column. De-duplicating on
    # session_time alone (now a date) kept a single row per calendar day across
    # ALL customers and discarded nearly all activity.
    .dropDuplicates()
)

display(Web_clean.limit(5))

Web_clean.write.format("delta").mode("overwrite").saveAsTable("silver_web_activities")

StatementMeta(, 0267b623-51df-4d8c-8a7d-5404a127b2ab, 29, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e6127ae4-e4a1-4d18-b23e-aef1e511a1e0)

## **Creating Gold Layer - Table**


In [29]:
# Gold layer: a wide "customer 360" table. Every cleaned frame is left-joined
# onto customers by customer_id and a fixed set of columns is projected.
#
# NOTE(review): none of the joined frames is reduced to one row per customer_id
# in this notebook, so a customer with several orders, payments, tickets and
# web rows yields one output row per combination. Summing order_amt or
# payment_amt over this table would over-count — confirm the intended grain.
cust = customer_clean.alias("c")
orders = orders_clean.alias("o")
pay = Payments_clean.alias("p")
support = Support_clean.alias("s")
web = Web_clean.alias("w")

# Output columns, grouped by the frame they come from (order is preserved).
customer_cols = [
    col(name)
    for name in ("c.customer_id", "c.name", "c.email", "c.gender", "c.dob", "c.location")
]
order_cols = [
    col("o.order_id"),
    col("o.order_date"),
    col("o.amount").alias("order_amt"),
    col("o.status").alias("order_status"),
]
payment_cols = [
    col("p.payment_method"),
    col("p.payment_status"),
    col("p.amount").alias("payment_amt"),
]
ticket_cols = [
    col(name)
    for name in ("s.ticket_id", "s.issue_type", "s.ticket_date", "s.resolution_status")
]
web_cols = [
    col(name)
    for name in ("w.page_viewed", "w.device_type", "w.session_time")
]

Customer360 = (
    cust
    .join(orders, on="customer_id", how="left")
    .join(pay, on="customer_id", how="left")
    .join(support, on="customer_id", how="left")
    .join(web, on="customer_id", how="left")
    .select(*customer_cols, *order_cols, *payment_cols, *ticket_cols, *web_cols)
)

display(Customer360.limit(5))

Customer360.write.format("delta").mode("overwrite").saveAsTable("gold_cust_360")

StatementMeta(, 0267b623-51df-4d8c-8a7d-5404a127b2ab, 31, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 42218bab-b4cb-4f39-b2b8-79a811ad063f)