In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, desc, lit, max,when,trim,filter
from pyspark.sql.types import *
from pyspark.sql.utils import AnalysisException
from delta.tables import DeltaTable

## Define expected schema for Bronze fallback

In [0]:
schema = StructType([
    StructField("Customer_ID", IntegerType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Gender", StringType(), True),
    StructField("Annual_Income", DoubleType(), True),
    StructField("Total_Spend", DoubleType(), True),
    StructField("Years_as_Customer", IntegerType(), True),
    StructField("Num_of_Purchases", IntegerType(), True),
    StructField("Average_Transaction_Amount", DoubleType(), True),
    StructField("Num_of_Returns", IntegerType(), True),
    StructField("Num_of_Support_Contacts", IntegerType(), True),
    StructField("Last_Purchase_Days_Ago", IntegerType(), True),
    StructField("Promotion_Response", StringType(), True),
    StructField("Satisfaction_Score", IntegerType(), True),
    StructField("Email_Opt_In", StringType(), True),
    StructField("Target_Churn", StringType(), True),
    StructField("ingest_timestamp", TimestampType(), True)
])


##  Read from Bronze Table 

In [0]:
from pyspark.sql.utils import AnalysisException
try:
    bronze_df = spark.read.format("delta").table("rawdata.bronze_layer.rawfile")
    bronze_exists = True #table exists          
except AnalysisException:
    # If table does not exist, create empty DataFrame
    bronze_df = spark.createDataFrame([], schema=schema)
    bronze_exists = False # table does NOT exist



In [0]:
bronze_df.display()

Customer_ID,Age,Gender,Annual_Income,Satisfaction_Score,Email_Opt_In,Target_Churn,Total_Spend,Years_as_Customer,Num_of_Purchases,Average_Transaction_Amount,Num_of_Returns,Num_of_Support_Contacts,Last_Purchase_Days_Ago,Promotion_Response,ingest_date
1,62,Other,45.15,3,True,True,5892.58,5,22,453.8,2,0,129,Responded,2025-12-26
2,65,Male,79.51,3,False,False,9025.47,13,77,22.9,2,2,227,Responded,2025-12-26
3,18,Male,29.19,2,False,True,618.83,13,71,50.53,5,2,283,Responded,2025-12-26
4,21,Other,79.63,5,True,True,9110.3,3,33,411.83,5,3,226,Ignored,2025-12-26
5,21,Other,77.66,5,False,False,5390.88,15,43,101.19,3,0,242,Unsubscribed,2025-12-26
6,57,Male,190.43,4,False,False,255.19,19,85,417.78,5,1,130,Unsubscribed,2025-12-26
7,27,Male,172.13,1,True,False,3512.55,3,77,316.18,0,3,61,Unsubscribed,2025-12-26
8,37,Other,88.9,3,False,False,7270.9,4,87,63.61,3,4,224,Ignored,2025-12-26
9,39,Other,24.46,4,True,True,4935.49,1,21,173.8,9,2,126,Responded,2025-12-26
10,68,Other,169.59,2,True,False,9803.57,17,34,481.18,6,1,171,Unsubscribed,2025-12-26


## Get only new data from Bronze

In [0]:
def get_newly_ingested_data(bronze_df, silver_table):
    if spark._jsparkSession.catalog().tableExists(silver_table):
        silver_df = spark.table(silver_table)
        last_ts = silver_df.select(max("ingest_timestamp")).collect()[0][0]
        if last_ts:
            return bronze_df.filter(col("ingest_timestamp") > lit(last_ts))
    return bronze_df


## Load Bronze Delta table instead of CSV

In [0]:
dff = spark.read.table("rawdata.bronze_layer.rawfile")


In [0]:
from pyspark.sql.functions import col

df = dff.withColumn(
    "Customer_ID",
    col("Customer_ID").cast("int")
)


    


## Remove duplicate ID combinations

In [0]:
if "Customer_ID" in df.columns:
    bronze_df = df.dropDuplicates(["Customer_ID"]).display()

Customer_ID,Age,Gender,Annual_Income,Satisfaction_Score,Email_Opt_In,Target_Churn,Total_Spend,Years_as_Customer,Num_of_Purchases,Average_Transaction_Amount,Num_of_Returns,Num_of_Support_Contacts,Last_Purchase_Days_Ago,Promotion_Response,ingest_date
1,62,Other,45.15,3,True,True,5892.58,5,22,453.8,2,0,129,Responded,2025-12-26
2,65,Male,79.51,3,False,False,9025.47,13,77,22.9,2,2,227,Responded,2025-12-26
3,18,Male,29.19,2,False,True,618.83,13,71,50.53,5,2,283,Responded,2025-12-26
4,21,Other,79.63,5,True,True,9110.3,3,33,411.83,5,3,226,Ignored,2025-12-26
5,21,Other,77.66,5,False,False,5390.88,15,43,101.19,3,0,242,Unsubscribed,2025-12-26
6,57,Male,190.43,4,False,False,255.19,19,85,417.78,5,1,130,Unsubscribed,2025-12-26
7,27,Male,172.13,1,True,False,3512.55,3,77,316.18,0,3,61,Unsubscribed,2025-12-26
8,37,Other,88.9,3,False,False,7270.9,4,87,63.61,3,4,224,Ignored,2025-12-26
9,39,Other,24.46,4,True,True,4935.49,1,21,173.8,9,2,126,Responded,2025-12-26
10,68,Other,169.59,2,True,False,9803.57,17,34,481.18,6,1,171,Unsubscribed,2025-12-26


## Filter out invalid rows (negative or zero spends)

In [0]:
dff = df.filter(
    (col("Total_Spend").isNotNull()) & (col("Total_Spend") >= 0) &
    (col("Num_of_Returns").isNotNull()) & (col("Num_of_Returns") >= 0) &
    (col("Average_Transaction_Amount").isNotNull()) & (col("Average_Transaction_Amount") >= 0)
)



## Replace nulls in string columns (e.g:'Category') with 'Unknown'

In [0]:
for col_name, dtype in df.dtypes:
    if dtype == "string":
        df = df.withColumn(
            col_name,
            when(col(col_name).isNull(), "Unknown").otherwise(col(col_name))
        )


## Convert boolean-like strings to BooleanType

In [0]:
from pyspark.sql.functions import lower
from pyspark.sql.types import BooleanType

bool_cols = ["Email_Opt_In", "Target_Churn"]

for col_name in bool_cols:
    df = df.withColumn(
        col_name,
        when(lower(col(col_name)) == "true", True)
        .otherwise(False)
        .cast(BooleanType())
    )


## Drop rows where customer_id is null

In [0]:
df = df.filter(col("Customer_ID").isNotNull())

In [0]:
# Display top 5 rows
df.show(5, truncate=False)


+-----------+---+------+-------------+------------------+------------+------------+-----------+-----------------+----------------+--------------------------+--------------+-----------------------+----------------------+------------------+-----------+
|Customer_ID|Age|Gender|Annual_Income|Satisfaction_Score|Email_Opt_In|Target_Churn|Total_Spend|Years_as_Customer|Num_of_Purchases|Average_Transaction_Amount|Num_of_Returns|Num_of_Support_Contacts|Last_Purchase_Days_Ago|Promotion_Response|ingest_date|
+-----------+---+------+-------------+------------------+------------+------------+-----------+-----------------+----------------+--------------------------+--------------+-----------------------+----------------------+------------------+-----------+
|1          |62 |Other |45.15        |3                 |true        |true        |5892.58    |5                |22              |453.8                     |2             |0                      |129                   |Responded         |2025-12-2

## Trim string columns (excluding dates)

In [0]:
for column, dtype in df.dtypes:
    if dtype == "string" and not column.lower().endswith("date"):
        df = df.withColumn(column, trim(col(column)))


## Standardize column names 

In [0]:
for col_name in df.columns:
    clean_col = col_name.strip().replace(" ", "_").replace(".", "").replace("-", "_")
    df = df.withColumnRenamed(col_name, clean_col)


## Convert all column names to lowercase

In [0]:
df = df.select([col(c).alias(c.lower()) for c in df.columns])

In [0]:
from pyspark.sql.functions import current_timestamp

df = df.withColumn(
    "ingest_timestamp",
    current_timestamp()
)


In [0]:
from pyspark.sql.functions import current_date
df=df.withColumn("ingest_date", current_date())


## Converting Last Purchased Days into Purchase Date

In [0]:
from pyspark.sql.functions import col, date_sub, current_date

silver_df = df.withColumn(
    "Last_Purchase_Days_Ago",
    col("Last_Purchase_Days_Ago").cast("int")
)

silver_df = silver_df.withColumn(
    "Last_Purchase_Date",
    date_sub(current_date(), col("Last_Purchase_Days_Ago"))
)


## Display cleaned data

In [0]:
display(silver_df)

customer_id,age,gender,annual_income,satisfaction_score,email_opt_in,target_churn,total_spend,years_as_customer,num_of_purchases,average_transaction_amount,num_of_returns,num_of_support_contacts,Last_Purchase_Days_Ago,promotion_response,ingest_date,ingest_timestamp,Last_Purchase_Date
1,62,Other,45.15,3,True,True,5892.58,5,22,453.8,2,0,129,Responded,2025-12-26,2025-12-26T13:17:23.398199Z,2025-08-19
2,65,Male,79.51,3,False,False,9025.47,13,77,22.9,2,2,227,Responded,2025-12-26,2025-12-26T13:17:23.398199Z,2025-05-13
3,18,Male,29.19,2,False,True,618.83,13,71,50.53,5,2,283,Responded,2025-12-26,2025-12-26T13:17:23.398199Z,2025-03-18
4,21,Other,79.63,5,True,True,9110.3,3,33,411.83,5,3,226,Ignored,2025-12-26,2025-12-26T13:17:23.398199Z,2025-05-14
5,21,Other,77.66,5,False,False,5390.88,15,43,101.19,3,0,242,Unsubscribed,2025-12-26,2025-12-26T13:17:23.398199Z,2025-04-28
6,57,Male,190.43,4,False,False,255.19,19,85,417.78,5,1,130,Unsubscribed,2025-12-26,2025-12-26T13:17:23.398199Z,2025-08-18
7,27,Male,172.13,1,True,False,3512.55,3,77,316.18,0,3,61,Unsubscribed,2025-12-26,2025-12-26T13:17:23.398199Z,2025-10-26
8,37,Other,88.9,3,False,False,7270.9,4,87,63.61,3,4,224,Ignored,2025-12-26,2025-12-26T13:17:23.398199Z,2025-05-16
9,39,Other,24.46,4,True,True,4935.49,1,21,173.8,9,2,126,Responded,2025-12-26,2025-12-26T13:17:23.398199Z,2025-08-22
10,68,Other,169.59,2,True,False,9803.57,17,34,481.18,6,1,171,Unsubscribed,2025-12-26,2025-12-26T13:17:23.398199Z,2025-07-08


## Create a schema

In [0]:
spark.sql("CREATE SCHEMA IF NOT EXISTS cleaned_data.silver_layer")


DataFrame[]

## Save cleaned data to a Silver table

In [0]:
silver_df.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("ingest_timestamp") \
    .saveAsTable("cleaned_data.silver_layer.customer_activities")

print(" Silver table created successfully")


 Silver table created successfully
