# **<mark>_ECOMMERCE SILVER LAYER DATA PIPELINE_</mark>**


In [None]:
# Step 3: Import necessary modules for Spark DataFrame operations
from pyspark.sql.functions import *
from pyspark.sql.types import *

StatementMeta(, 8bc06080-b593-42c7-912b-e627c0293a47, 5, Finished, Available, Finished, False)

## <mark><u>**Customer Data**</u></mark>

#### **Load Customer Data From Bronze Layer**

In [None]:
# Step 4: Read every column of the raw customers table from the Bronze layer
_customers_query = "SELECT * FROM Bronze.dbo.tb_customers"
df_customers = spark.sql(_customers_query)

StatementMeta(, 8bc06080-b593-42c7-912b-e627c0293a47, 39, Finished, Available, Finished, False)

#### **Clean and transform customer data**

In [None]:
# Step 5: Clean and transform customer data.
# Each withColumn below normalises one column of df_customers; afterwards rows
# without a customer_id are removed and the remainder is de-duplicated on it.
df_customers_clean = (
    df_customers
    # Email: strip surrounding whitespace and lowercase.
    .withColumn("email", lower(trim(col("EMAIL"))))
    # Name: blank or whitespace-only values become 'N/A'; everything else is
    # trimmed and title-cased. Trimming before the length check keeps this
    # consistent with the blank checks used for payments and support tickets.
    .withColumn(
        "name",
        when(length(trim(col("name"))) == 0, "N/A")
        .otherwise(initcap(trim(col("name")))),
    )
    # Gender: map f/female -> Female, m/male -> Male (case-insensitive, trimmed);
    # every other value, including null, falls through to 'Other'.
    .withColumn(
        "gender",
        when(lower(trim(col("gender"))).isin("f", "female"), "Female")
        .when(lower(trim(col("gender"))).isin("m", "male"), "Male")
        .otherwise("Other"),
    )
    # Location: trim and title-case.
    .withColumn("location", initcap(trim(col("location"))))
    # Date of birth: blank values and values containing "not" (any casing) get
    # the placeholder date 1990-01-01; otherwise the trimmed text is parsed
    # with each supported pattern in turn and the first successful parse wins.
    .withColumn(
        "dob",
        when(
            (length(trim(col("dob"))) == 0)
            # The value is already lowercased, so a plain LIKE is sufficient.
            | (lower(trim(col("dob"))).like("%not%")),
            to_date(lit("1990-01-01"), "yyyy-MM-dd"),  # keeps the branch a DateType
        ).otherwise(
            coalesce(
                to_date(trim(col("dob")), "yyyy-MM-dd"),
                to_date(trim(col("dob")), "yyyy/MM/dd"),
                to_date(trim(col("dob")), "dd-MM-yyyy"),
            )
        ),
    )
    # Drop rows with no customer_id first so they never take part in the
    # de-duplication, then keep one row per customer_id.
    .dropna(subset=["customer_id"])
    .dropDuplicates(["customer_id"])
)

StatementMeta(, 8bc06080-b593-42c7-912b-e627c0293a47, 40, Finished, Available, Finished, False)

#### **Store Clean Data into Silver Layer**

In [37]:
# Persist the cleaned customers as a Delta table, replacing any existing contents
df_customers_clean.write.saveAsTable(
    "Silver.dbo.tb_customers",
    format="delta",
    mode="overwrite",
)

StatementMeta(, 8bc06080-b593-42c7-912b-e627c0293a47, 46, Finished, Available, Finished, False)

## <mark><u>**Orders Data**</u></mark>

#### **Load Orders Data**

In [59]:
# Read every column of the raw orders table from the Bronze layer
_orders_query = "select * from Bronze.dbo.tb_orders"
df_orders = spark.sql(_orders_query)

StatementMeta(, 8bc06080-b593-42c7-912b-e627c0293a47, 68, Finished, Available, Finished, False)

#### **Clean and Transform Orders Data**

In [60]:
# Clean and transform the orders data.
# Date patterns are tried in this order; the first one that parses wins.
_order_date_formats = ["dd-MM-yyyy", "yyyyMMdd", "dd/MM/yyyy", "yyyy/MM/dd", "yyyy-MM-dd"]

df_orders_clean = (
    df_orders
    # order_date: parse the raw value with each supported pattern
    .withColumn(
        "order_date",
        coalesce(*[to_date(col("order_date"), fmt) for fmt in _order_date_formats]),
    )
    # amount: cast to double, then keep only strictly positive values;
    # null, zero and negative amounts all become 0
    .withColumn("amount", col("amount").cast(DoubleType()))
    .withColumn("amount", when(col("amount") > 0, col("amount")).otherwise(lit(0)))
    # status: title-case
    .withColumn("status", initcap(col("status")))
    # Discard rows lacking a customer or a parseable date, then keep one row per order_id
    .dropna(subset=["customer_id", "order_date"])
    .dropDuplicates(["order_id"])
)

StatementMeta(, 8bc06080-b593-42c7-912b-e627c0293a47, 69, Finished, Available, Finished, False)

#### **Store Orders Data Into Silver Layer**

In [61]:
# Persist the cleaned orders as a Delta table, replacing any existing contents
df_orders_clean.write.saveAsTable(
    "Silver.dbo.tb_orders",
    format="delta",
    mode="overwrite",
)

StatementMeta(, 8bc06080-b593-42c7-912b-e627c0293a47, 70, Submitted, Running, Running, True)

## **<mark><u>Payments</u></mark>**

#### **Load Payments Data**

In [78]:
# Read every column of the raw payments table from the Bronze layer
_payments_query = "select * from Bronze.dbo.tb_payments"
df_payments = spark.sql(_payments_query)

StatementMeta(, 8bc06080-b593-42c7-912b-e627c0293a47, 87, Finished, Available, Finished, False)

#### **Clean and Transform The Payment Data**

In [80]:
# Clean and transform the payments data.
# Date patterns are tried in this order; the first one that parses wins.
_payment_date_formats = ["yyyy-MM-dd", "dd-MM-yyyy", "yyyy/MM/dd", "dd/MM/yyyy", "yyyyMMdd"]

df_payments_clean = (
    df_payments
    # payment_date: the literal 'n/a' maps to the placeholder date 1900-01-01;
    # anything else is parsed with the supported patterns
    .withColumn(
        "payment_date",
        when(col("payment_date") == "n/a", to_date(lit("1900-01-01"), "yyyy-MM-dd"))
        .otherwise(
            coalesce(*[to_date(col("payment_date"), fmt) for fmt in _payment_date_formats])
        ),
    )
    # payment_method: both spellings of credit card collapse to 'CREDIT CARD';
    # every other value is trimmed and upper-cased
    .withColumn(
        "payment_method",
        when(lower(trim(col("payment_method"))).isin("creditcard", "credit card"), "CREDIT CARD")
        .otherwise(upper(trim(col("payment_method")))),
    )
    # payment_status: blank values become 'Unknown', the rest are trimmed and title-cased
    .withColumn(
        "payment_status",
        when(length(trim(col("payment_status"))) == 0, "Unknown")
        .otherwise(initcap(trim(col("payment_status")))),
    )
    # amount: cast to double and keep only non-negative values;
    # null and negative amounts end up null (no otherwise branch)
    .withColumn("amount", col("amount").cast(DoubleType()))
    .withColumn("amount", when(col("amount") >= 0, col("amount")))
)

StatementMeta(, 8bc06080-b593-42c7-912b-e627c0293a47, 89, Finished, Available, Finished, False)

#### **Store Cleaned Payments Data Into Silver Layer**

In [81]:
# Persist the cleaned payments as a Delta table, replacing any existing contents
df_payments_clean.write.saveAsTable(
    "Silver.dbo.tb_payments",
    format="delta",
    mode="overwrite",
)

StatementMeta(, 8bc06080-b593-42c7-912b-e627c0293a47, 90, Finished, Available, Finished, False)

## **<mark><u>Support Tickets</u></mark>**

#### **Load Support Tickets Data**

In [98]:
# Read every column of the raw support tickets table from the Bronze layer
_support_tickets_query = "select * from Bronze.dbo.tb_support_tickets"
df_support_ticket = spark.sql(_support_tickets_query)

StatementMeta(, 8bc06080-b593-42c7-912b-e627c0293a47, 107, Finished, Available, Finished, True)

In [105]:
# Clean and transform the support tickets data.
df_support_ticket_clean = (
    df_support_ticket
    # issue_type: collapse known variants onto one label each. The value is
    # lowercased and trimmed before matching, so every candidate literal must
    # itself be lowercase ("payment issue", not "payment Issue").
    .withColumn(
        "issue_type",
        when(lower(trim(col("issue_type"))).isin("payment error", "payment issue"), "Payment Issue")
        .when(lower(trim(col("issue_type"))).isin("refund request", "refund"), "Refund Request")
        .when(lower(trim(col("issue_type"))) == "na", "Unknown")
        # Trim here as well so padded values do not keep stray whitespace.
        .otherwise(initcap(trim(col("issue_type")))),
    )
    # resolution_status: blank values become 'Unknown', the rest are trimmed
    # and title-cased.
    .withColumn(
        "resolution_status",
        when(length(trim(col("resolution_status"))) == 0, "Unknown")
        .otherwise(initcap(trim(col("resolution_status")))),
    )
    # ticket_date: the literal 'n/a' maps to the placeholder date 1900-01-01;
    # anything else is parsed with each supported pattern in turn and the
    # first successful parse wins.
    .withColumn(
        "ticket_date",
        when(col("ticket_date") == 'n/a', to_date(lit('1900-01-01'), "yyyy-MM-dd"))
        .otherwise(
            coalesce(
                to_date(col("ticket_date"), "yyyy-MM-dd"),
                to_date(col("ticket_date"), "dd-MM-yyyy"),
                to_date(col("ticket_date"), "yyyyMMdd"),
                to_date(col("ticket_date"), "yyyy/MM/dd"),
                to_date(col("ticket_date"), "dd/MM/yyyy"),
            )
        ),
    )
    # Keep one row per ticket_id, then drop rows lacking a customer or a date.
    .dropDuplicates(['ticket_id'])
    .dropna(subset=['customer_id', 'ticket_date'])
)

StatementMeta(, 8bc06080-b593-42c7-912b-e627c0293a47, 114, Finished, Available, Finished, False)

#### **Store Cleaned Support Tickets Data Into Silver Layer**

In [106]:
# Persist the cleaned support tickets as a Delta table, replacing any existing contents
df_support_ticket_clean.write.saveAsTable(
    "Silver.dbo.tb_support_ticket",
    format="delta",
    mode="overwrite",
)

StatementMeta(, 8bc06080-b593-42c7-912b-e627c0293a47, 115, Finished, Available, Finished, False)

## <mark><u>**WEB Activities**</u></mark>

#### **Load Web Activities Data from Bronze Layer**

In [107]:
# Read every column of the raw web activities table from the Bronze layer
_web_activities_query = "select * from Bronze.dbo.tb_web_activities"
df_web_activities = spark.sql(_web_activities_query)

StatementMeta(, 8bc06080-b593-42c7-912b-e627c0293a47, 116, Finished, Available, Finished, False)

#### **Clean and Transform Web Activities Data**

In [117]:
# Clean and transform the web activities data.
# Date patterns are tried in this order; the first one that parses wins.
_session_time_formats = ["yyyy-MM-dd", "dd-MM-yyyy", "yyyyMMdd", "yyyy/MM/dd", "dd/MM/yyyy"]

df_web_activities_clean = (
    df_web_activities
    # device_type: trim and upper-case
    .withColumn("device_type", upper(trim(col("device_type"))))
    # page_viewed: trim and lower-case
    .withColumn("page_viewed", lower(trim(col("page_viewed"))))
    # session_time: parsed to a date with the supported patterns
    # NOTE(review): to_date yields a date only; if the source carries a time
    # of day it is not retained here - confirm this is intended.
    .withColumn(
        "session_time",
        coalesce(*[to_date(col("session_time"), fmt) for fmt in _session_time_formats]),
    )
    # Keep one row per session_id, then drop rows missing any required field
    .dropDuplicates(["session_id"])
    .dropna(subset=["customer_id", "session_time", "page_viewed"])
)

StatementMeta(, 8bc06080-b593-42c7-912b-e627c0293a47, 126, Finished, Available, Finished, False)

#### **Store Cleaned Web Activities Data Into Silver Layer**

In [118]:
# Persist the cleaned web activities as a Delta table, replacing any existing contents
df_web_activities_clean.write.saveAsTable(
    "Silver.dbo.tb_web_activities",
    format="delta",
    mode="overwrite",
)

StatementMeta(, 8bc06080-b593-42c7-912b-e627c0293a47, 127, Finished, Available, Finished, False)