#### **Clean the Data - Silver**

In [9]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

StatementMeta(, 8c912aaf-a291-4d20-b281-7771797c33ba, 11, Finished, Available, Finished)

##### **Clean customers**

In [10]:
spark.sql("CREATE SCHEMA IF NOT EXISTS silver")

StatementMeta(, 8c912aaf-a291-4d20-b281-7771797c33ba, 12, Finished, Available, Finished)

DataFrame[]

In [11]:
customers = spark.table("bronze.customers")

customers_clean = (
    customers
    .withColumn("email", lower(trim(col("EMAIL"))))
    .withColumn("name", initcap(trim(col("name"))))
    .withColumn("gender", when(lower(col("gender")).isin("f", "female"), "Female")
                          .when(lower(col("gender")).isin("m", "male"), "Male")
                          .otherwise("Other"))
    .withColumn("dob", to_date(regexp_replace(col("dob"), "/", "-")))
    .withColumn("location", initcap(col("location")))
    .dropDuplicates(["customer_id"])
    .dropna(subset=["customer_id", "email"])
)
customers_clean.write.format("delta").mode("overwrite").saveAsTable("silver.customers")

StatementMeta(, 8c912aaf-a291-4d20-b281-7771797c33ba, 13, Finished, Available, Finished)

##### **Clean orders**

In [12]:
orders = spark.table("bronze.orders")
orders_clean = (
    orders
    .withColumn("order_date", 
                when(col("order_date").rlike("^\d{4}/\d{2}/\d{2}$"), to_date(col("order_date"), "yyyy/MM/dd"))
                .when(col("order_date").rlike("^\d{2}-\d{2}-\d{4}$"), to_date(col("order_date"), "dd-MM-yyyy"))
                .when(col("order_date").rlike("^\d{8}$"), to_date(col("order_date"), "yyyyMMdd"))
                .otherwise(to_date(col("order_date"), "yyyy-MM-dd")))
    .withColumn("amount", col("amount").cast(DoubleType()))
    .withColumn("amount", when(col("amount") < 0, None).otherwise(col("amount")))
    .withColumn("status", initcap(col("status")))
    .dropna(subset=["customer_id", "order_date"])
    .dropDuplicates(["order_id"])
)
orders_clean.write.format("delta").mode("overwrite").saveAsTable("silver.orders")

StatementMeta(, 8c912aaf-a291-4d20-b281-7771797c33ba, 14, Finished, Available, Finished)

##### **Clean payments**

In [15]:
payments = spark.table("bronze.payments")

payments_clean = (
    payments
    .withColumn(
        "payment_date",
        to_date(regexp_replace(col("payment_date"), "/", "-"), "yyyy-MM-dd")
    )
    .withColumn(
        "payment_method",
        initcap(col("payment_method"))
    )
    .replace(
        {"Creditcard": "Credit Card"},
        subset=["payment_method"]
    )
    .withColumn(
        "payment_status",
        initcap(col("payment_status"))
    )
    .withColumn(
        "amount",
        col("amount").cast(DoubleType())
    )
    .withColumn(
        "amount",
        when(col("amount") < 0, None).otherwise(col("amount"))
    )
    .dropna(subset=["customer_id", "payment_date", "amount"])
)

payments_clean.write.format("delta").mode("overwrite").saveAsTable("silver.payments")

StatementMeta(, 8c912aaf-a291-4d20-b281-7771797c33ba, 17, Finished, Available, Finished)

##### **Clean web**

In [16]:
web = spark.table("bronze.web")
web_clean = (
    web
    .withColumn("session_time", to_date(regexp_replace(col("session_time"), "/", "-")))
    .withColumn("page_viewed", lower(col("page_viewed")))
    .withColumn("device_type", initcap(col("device_type")))
    .dropDuplicates(["session_id"])
    .dropna(subset=["customer_id", "session_time", "page_viewed"])
)
web_clean.write.format("delta").mode("overwrite").saveAsTable("silver.web")

StatementMeta(, 8c912aaf-a291-4d20-b281-7771797c33ba, 18, Finished, Available, Finished)

##### **Clean support**

In [17]:
support = spark.table("bronze.support")
support_clean = (
    support
    .withColumn("ticket_date", to_date(regexp_replace(col("ticket_date"), "/", "-")))
    .withColumn("issue_type", initcap(trim(col("issue_type"))))
    .withColumn("resolution_status", initcap(trim(col("resolution_status"))))
    .replace({"NA": None, "": None}, subset=["issue_type", "resolution_status"])
    .dropDuplicates(["ticket_id"])
    .dropna(subset=["customer_id", "ticket_date"])
)
support_clean.write.format("delta").mode("overwrite").saveAsTable("silver.support")

StatementMeta(, 8c912aaf-a291-4d20-b281-7771797c33ba, 19, Finished, Available, Finished)