In [0]:
# Validation of the Bronze layer: render the raw customers table exactly as ingested.
display(spark.read.table("migration_project_db_ws.bronze.customers"))

customer_id,first_name,last_name,email,country,created_date,ingestion_timestamp,source_system
101,Chirag,Venkateshaiah,chiragvenkateshaiah@gmail.com,India,2024-01-01,2026-01-15T03:50:49.544Z,crm
102,Joel,Marsh,joelmarsh@outlook.com,USA,2024-01-02,2026-01-15T03:52:54.897Z,crm


In [0]:
# Expectation for the Bronze layer:
# data arrives raw and messy (string-typed columns, nulls, duplicates)

In [0]:
# Inspect the Bronze schema to see which columns still need casting
# (for example, created_date is stored as a string here).
spark.table("migration_project_db_ws.bronze.customers").printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- country: string (nullable = true)
 |-- created_date: string (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = true)
 |-- source_system: string (nullable = true)



In [0]:
from pyspark.sql.functions import col, to_date, row_number, lower, trim
from pyspark.sql.window import Window


# Load the Bronze customers by their fully qualified catalog.schema.table name so this
# reads the same table validated in the first cells, regardless of the session's
# current catalog (an unqualified "bronze.customers" resolves against that default).
bronze_df = spark.read.table("migration_project_db_ws.bronze.customers")


In [0]:
# Standardize column names: map upper-case source column names onto snake_case.
# NOTE(review): the Bronze schema printed above is already lower-case, so these
# renames may have no effect on the current data; they guard against upper-case feeds.
column_renames = {
    "CUSTOMER_ID": "customer_id",
    "FIRST_NAME": "first_name",
    "LAST_NAME": "last_name",
    "EMAIL": "email",
    "COUNTRY": "country",
    "CREATED_DATE": "created_date",
}

customers_df = bronze_df
for source_name, target_name in column_renames.items():
    customers_df = customers_df.withColumnRenamed(source_name, target_name)

In [0]:
# Clean and Transform Data
#
# - customer_id is cast to int; rows without it are dropped because it is the dedup key.
# - created_date is parsed from string to date (the Bronze schema stores it as string).
# - email is trimmed and lower-cased so the same address always compares equal;
#   rows whose email is null or empty after trimming are dropped.
# - Duplicates keep one row per customer_id: the latest created_date wins, and
#   ingestion_timestamp breaks ties so the pick is deterministic (row_number over a
#   non-unique ordering would otherwise choose an arbitrary row between runs).

window_spec = (
    Window
    .partitionBy("customer_id")
    .orderBy(col("created_date").desc(), col("ingestion_timestamp").desc())
)

silver_customers_df = (
    customers_df
    .withColumn("customer_id", col("customer_id").cast("int"))
    .withColumn("created_date", to_date(col("created_date")))
    .withColumn("email", lower(trim(col("email"))))
    .filter(col("customer_id").isNotNull())
    .filter(col("email").isNotNull() & (col("email") != ""))
    .withColumn("row_num", row_number().over(window_spec))
    .filter(col("row_num") == 1)
    .drop("row_num")
)

In [0]:
# Check the Silver invariants before the table is overwritten in the next cell:
# customer_id must be unique, and neither customer_id nor email may be null.
# Failing here stops bad data from replacing the existing Silver table.
assert (
    silver_customers_df.count()
    == silver_customers_df.select("customer_id").distinct().count()
), "duplicate customer_id values in silver_customers_df"
assert (
    silver_customers_df
    .filter(col("customer_id").isNull() | col("email").isNull())
    .count()
    == 0
), "null customer_id or email in silver_customers_df"

silver_customers_df.display()

customer_id,first_name,last_name,email,country,created_date,ingestion_timestamp,source_system
101,Chirag,Venkateshaiah,chiragvenkateshaiah@gmail.com,India,2024-01-01,2026-01-15T03:50:49.544Z,crm
102,Joel,Marsh,joelmarsh@outlook.com,USA,2024-01-02,2026-01-15T03:52:54.897Z,crm


In [0]:
# Persist the cleaned customers as the Silver table, replacing its previous contents.
(
    silver_customers_df.write
    .mode("overwrite")
    .saveAsTable("migration_project_db_ws.silver.customers")
)

In [0]:
# Read the Silver table back by the same fully qualified name it was written to,
# and confirm the cleaned types (e.g. created_date is now a date, not a string).
spark.read.table("migration_project_db_ws.silver.customers").printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- country: string (nullable = true)
 |-- created_date: date (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = true)
 |-- source_system: string (nullable = true)

