In [1]:
# Create the silver_customers table where the cleaned customer data is stored.
# IF NOT EXISTS keeps the cell re-runnable: an existing table and its data are left as-is.
# USING DELTA is stated explicitly because the MERGE INTO upsert later in this notebook
# requires a Delta table; relying on the session's default table format is fragile.
# last_updated records when each row was last upserted; the watermark cell reads
# MAX(last_updated) from this table to decide which Bronze rows are new.
spark.sql("""
CREATE TABLE IF NOT EXISTS silver_customers (
    customer_id STRING,
    name STRING,
    email STRING,
    country STRING,
    customer_type STRING,
    registration_date DATE,
    age INT,
    gender STRING,
    total_purchases INT,
    customer_segment STRING,
    days_since_registration INT,
    last_updated TIMESTAMP
)
USING DELTA
""")

StatementMeta(, e323296b-2937-464c-a225-97ccb6623448, 3, Finished, Available, Finished)

DataFrame[]

In [2]:
# Read the incremental watermark: the latest last_updated value already written to the
# Silver table. Bronze rows ingested after this instant are treated as new downstream.
# On an empty table MAX() yields NULL, so fall back to a far-past timestamp that lets
# every Bronze row through on the first run.
# NOTE(review): last_updated is the Silver processing time (CURRENT_TIMESTAMP() in the
# transform cell), not Bronze's ingestion_timestamp; a Bronze row committed late with an
# earlier ingestion_timestamp may be skipped by the next run — confirm this is acceptable.
last_processed_df = spark.sql("SELECT MAX(last_updated) as last_processed FROM silver_customers")
watermark = last_processed_df.first()["last_processed"]
last_processed_timestamp = (
    "1900-01-01T00:00:00.000+00:00" if watermark is None else watermark
)

StatementMeta(, e323296b-2937-464c-a225-97ccb6623448, 4, Finished, Available, Finished)

In [3]:
# Register the temporary view `bronze_incremental`: only those Bronze customer rows whose
# ingestion_timestamp is later than the watermark computed in the previous cell.
# NOTE(review): the watermark is spliced into the SQL as a quoted string literal, so the
# comparison presumably relies on Spark casting it to a timestamp in the session time zone;
# confirm that zone matches the one of the Python value.
bronze_incremental_query = f"""
CREATE OR REPLACE TEMPORARY VIEW bronze_incremental AS
SELECT *
FROM bronzelayer.customer c where  c.ingestion_timestamp > '{last_processed_timestamp}'
"""
spark.sql(bronze_incremental_query)

StatementMeta(, e323296b-2937-464c-a225-97ccb6623448, 5, Finished, Available, Finished)

DataFrame[]

In [4]:
# Transform the customer data.
#
# Steps applied to the incremental Bronze rows:
#   1. Drop invalid records: missing customer_id, age outside 18-100 (a NULL age also fails
#      the BETWEEN test), missing email, negative total_purchases (NULL also fails).
#   2. Keep only the most recently ingested valid row per customer_id. Several rows for one
#      customer in the same batch would otherwise either make the MERGE fail (customer already
#      in silver_customers) or insert that customer twice (new customer).
#      Ties on ingestion_timestamp are broken arbitrarily.
#   3. Derive customer_segment from total_purchases and days_since_registration from
#      registration_date, and stamp last_updated with the processing time.
#
# customer_id must be non-NULL because the MERGE joins on it: a NULL key never matches, so
# such a row could only ever be inserted as an unkeyed duplicate, never updated.
# NOTE(review): email is only checked for NULL here; no format validation is applied.

spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW silver_incremental AS
WITH valid_customers AS (
    SELECT *
    FROM bronze_incremental
    WHERE
        customer_id IS NOT NULL
        AND age BETWEEN 18 AND 100
        AND email IS NOT NULL
        AND total_purchases >= 0
),
latest_customers AS (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY customer_id
            ORDER BY ingestion_timestamp DESC
        ) AS ingestion_rank
    FROM valid_customers
)
SELECT
    customer_id,
    name,
    email,
    country,
    customer_type,
    registration_date,
    age,
    gender,
    total_purchases,
    CASE
        WHEN total_purchases > 10000 THEN 'High Value'
        WHEN total_purchases > 5000 THEN 'Medium Value'
        ELSE 'Low Value'
    END AS customer_segment,
    DATEDIFF(CURRENT_DATE(), registration_date) AS days_since_registration,
    CURRENT_TIMESTAMP() AS last_updated
FROM latest_customers
WHERE ingestion_rank = 1
""")

StatementMeta(, e323296b-2937-464c-a225-97ccb6623448, 6, Finished, Available, Finished)

DataFrame[]

In [5]:
# Upsert the transformed batch into the Silver table, keyed on customer_id:
# customers already present are overwritten with the incoming values, unseen customers
# are inserted. UPDATE SET * / INSERT * expect silver_incremental to expose the same
# columns as silver_customers. The resulting DataFrame reports the affected row counts.
upsert_statement = """
MERGE INTO silver_customers target
USING silver_incremental source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN
    UPDATE SET *
WHEN NOT MATCHED THEN
    INSERT *
"""
spark.sql(upsert_statement)

StatementMeta(, e323296b-2937-464c-a225-97ccb6623448, 7, Finished, Available, Finished)

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [6]:
# Sanity check: print the total number of rows now held in the Silver customer table.
silver_row_count_df = spark.sql("select count(*) from silver_customers")
silver_row_count_df.show()

StatementMeta(, e323296b-2937-464c-a225-97ccb6623448, 8, Finished, Available, Finished)

+--------+
|count(1)|
+--------+
|     930|
+--------+

