In [0]:
%sql
use catalog customersdata;
use schema silver;

###Read data from bronze tables

In [0]:
from pyspark.sql import functions as F

#read data from customers_bronze table
df_customers_bronze = spark.read.table("customersdata.bronze.customers_bronze")
display(df_customers_bronze)

#read data from orders_bronze table
df_orders_bronze = spark.read.table("customersdata.bronze.orders_bronze")
display(df_orders_bronze)

###Data cleaning

In [0]:
#clean customers data

df_customers_silver = (
    df_customers_bronze
      .withColumn("customer_id", F.col("customer_id").cast("int"))
      .withColumn("age", F.col("age").cast("int"))
      .withColumn("income", F.col("income").cast("double"))
      .withColumn("signup_date", F.to_date("signup_date", "yyyy-MM-dd"))
      .withColumn("name", F.trim("name"))
      .withColumn("country", F.trim("country"))
      .withColumn("segment", F.trim("segment"))
      .dropDuplicates(["customer_id"])
)

display(df_customers_silver)
df_customers_silver.printSchema()

#save customers clean data in silver table
df_customers_silver.write.mode("overwrite").saveAsTable("customersdata.silver.customers_silver")



In [0]:

from pyspark.sql import functions as F

df_orders_silver_raw = (
    df_orders_bronze

      # Safe cast numeric fields â€” only cast if the value contains digits
      .withColumn("order_id",
                  F.when(F.col("order_id").rlike("^[0-9]+$"), F.col("order_id").cast("int"))
                   .otherwise(None))

      .withColumn("customer_id",
                  F.when(F.col("customer_id").rlike("^[0-9]+$"), F.col("customer_id").cast("int"))
                   .otherwise(None))

      .withColumn("quantity",
                  F.when(F.col("quantity").rlike("^[0-9]+$"), F.col("quantity").cast("int"))
                   .otherwise(None))

      .withColumn("unit_price",
                  F.when(F.col("unit_price").rlike("^[0-9.]+$"), F.col("unit_price").cast("double"))
                   .otherwise(None))

      .withColumn("discount",
                  F.when(F.col("discount").rlike("^[0-9.]+$"), F.col("discount").cast("double"))
                   .otherwise(None))

      # Convert order_date safely
      .withColumn("order_date", F.to_date("order_date"))

      # Derived columns
      .withColumn("discount_factor", 1 - (F.col("discount") / 100.0))
      .withColumn("gross_amount", F.col("quantity") * F.col("unit_price"))
      .withColumn("net_amount", F.col("gross_amount") * F.col("discount_factor"))

      # Trim text columns
      .withColumn("product", F.trim("product"))
      .withColumn("category", F.trim("category"))
      .withColumn("payment_method", F.trim("payment_method"))
      .withColumn("region", F.trim("region"))

      .drop("discount_factor")
)

display(df_orders_silver_raw)


#save orders raw data in silver table
df_orders_silver_raw.write.mode("overwrite").saveAsTable("customersdata.silver.orders_silver_raw")


###Build error table for silver raw

In [0]:
df_orders_errors = df_orders_silver_raw.filter(
    F.col("order_id").isNull()     |
    F.col("customer_id").isNull()  |
    F.col("quantity").isNull()     |
    F.col("unit_price").isNull()
)

display(df_orders_errors)

#save this in silver error table
df_orders_errors.write.format("delta").mode("overwrite").saveAsTable("customersdata.silver.orders_errors")



###Create clean silver table

In [0]:
df_orders_silver = df_orders_silver_raw.filter(
    F.col("order_id").isNotNull()    &
    F.col("customer_id").isNotNull() &
    F.col("quantity").isNotNull()    &
    F.col("unit_price").isNotNull()
)

display(df_orders_silver)

#save this in silver table
df_orders_silver.write.format("delta").mode("overwrite").saveAsTable("customersdata.silver.orders_silver")




In [0]:
%sql
select * from customersdata.silver.customers_silver 

In [0]:
%sql
select * from customersdata.silver.orders_silver