**Data from the bronze layer is merged into the silver layer. When the same kind of data already exists, the incoming rows are merged with it.**

In [0]:
# Switch to the silver schema so the unqualified table name below resolves inside it.
spark.sql("USE globalretail_silver")

# DDL for the silver customers table. IF NOT EXISTS keeps the statement from
# failing when the table is already there, so the cell can be re-run.
create_silver_customers_ddl = """
    CREATE TABLE IF NOT EXISTS silver_customers (
    customer_id STRING,
    name STRING,
    email STRING,
    country STRING,
    customer_type STRING,
    registration_date DATE,
    age INT,
    gender STRING,
    total_purchases INT,
    customer_segment STRING,
    days_since_registration INT,
    last_updated TIMESTAMP)
"""
spark.sql(create_silver_customers_ddl)

*Identify the new data that has not been processed yet, as opposed to the data already loaded*

In [0]:
# Watermark lookup: the newest last_updated value already stored in the silver table.
last_processed_df = spark.sql("SELECT MAX(last_updated) as last_processed FROM silver_customers")

# An aggregate without GROUP BY yields exactly one row; its value is NULL (None)
# when the silver table is still empty.
watermark_row = last_processed_df.first()
latest_silver_update = watermark_row['last_processed']

# Fall back to a very old timestamp so that the first run picks up every bronze row.
# NOTE(review): a non-null watermark is a Python datetime while the fallback is a
# string; the value is presumably only ever interpolated into SQL text - confirm
# that no later cell relies on a specific type.
# NOTE(review): assumes the driver's local time zone matches the Spark session
# time zone; otherwise the rendered watermark could be shifted - TODO confirm.
last_processed_timestamp = (
    "1900-01-01T00:00:00.000+00:00"
    if latest_silver_update is None
    else latest_silver_update
)

In [0]:
# Temp view holding only the bronze rows whose ingestion_timestamp is newer than
# the watermark stored in last_processed_timestamp.
# The watermark is spliced directly into the SQL text, so it must be a trusted,
# timestamp-like value (never user input).
bronze_incremental_sql = f"""
CREATE OR REPLACE TEMPORARY VIEW bronze_incremental AS
SELECT *
FROM globalretail_bronze.bronze_customer c where  c.ingestion_timestamp > '{last_processed_timestamp}'
"""
spark.sql(bronze_incremental_sql)

In [0]:
# Preview the incremental bronze rows; show() prints only the first rows as text.
# NOTE(review): the view presumably carries customer name/email columns, so clear
# this output before sharing the notebook.
bronze_incremental_preview = spark.sql("select * from bronze_incremental")
bronze_incremental_preview.show()

In [0]:
# Business rules applied when building the silver layer:
# - Email: keep only rows whose email address is not NULL
# - Age: keep only rows with a valid age between 18 and 100
# - customer_segment: total_purchases > 10000 THEN 'High Value', > 5000 THEN 'Medium Value', ELSE 'Low Value'
# - days_since_registration: number of days since the user registered in the system
# - Remove junk records where total_purchases is a negative number

In [0]:
# Cleansing and enrichment step, published as the temp view silver_incremental:
#   * keeps rows with age in [18, 100], a non-null email and total_purchases >= 0
#     (rows where any of these columns is NULL fail the predicate and are dropped);
#   * derives customer_segment from total_purchases
#     (> 10000 'High Value', > 5000 'Medium Value', otherwise 'Low Value');
#   * derives days_since_registration relative to the current date;
#   * stamps each row with the processing time in last_updated.
silver_incremental_sql = """
          CREATE OR REPLACE TEMPORARY VIEW silver_incremental AS
          SELECT 
                customer_id,
                name,
                email,
                country,
                customer_type,
                registration_date,
                age,
                gender,
                total_purchases,
                CASE
                    WHEN total_purchases > 10000 THEN 'High Value'
                    WHEN total_purchases > 5000 THEN 'Medium Value'
                    ELSE 'Low Value'
                END AS customer_segment,
                datediff(current_date(), registration_date) AS days_since_registration,
                current_timestamp() AS last_updated
          FROM bronze_incremental
          WHERE
                age BETWEEN 18 AND 100 AND
                email IS NOT NULL AND
                total_purchases >= 0
                
          """
spark.sql(silver_incremental_sql)

In [0]:
# Inspect the cleaned incremental rows in the notebook's table renderer.
# NOTE(review): DataFrame.display() is presumably supplied by the Databricks
# runtime; on plain PySpark this call would need to be show() instead - confirm.
silver_incremental_preview = spark.sql("SELECT * FROM silver_incremental")
silver_incremental_preview.display()

In [0]:
# Sanity check: list the distinct segment labels present in this incremental batch.
customer_segment_labels = spark.sql("SELECT DISTINCT customer_segment FROM silver_incremental")
customer_segment_labels.show()