In [0]:
df_accounts = spark.table("databrick_tutorial_1.bronze.accounts")

df_accounts.display()

In [0]:

df_customers = spark.table("databrick_tutorial_1.bronze.customers")

df_customers.display()

In [0]:

df_transactions = spark.table("databrick_tutorial_1.bronze.transactions")

df_transactions.display()

In [0]:
df_interactions = spark.table("databrick_tutorial_1.bronze.interactions")

df_interactions.display()

In [0]:
from pyspark.sql.functions import *

df_customers_silver = df_customers.withColumn("zip_code", col("zip_code").cast("string"))\
                                .withColumn("churn_flag", col("churn_flag").cast("boolean"))

df_customers_silver.printSchema()

df_customers_silver.display()
                               



In [0]:
df_accounts_silver = df_accounts.withColumn("delinquent", col("delinquent").cast("boolean"))


df_accounts_silver.printSchema()

df_accounts_silver.display()

In [0]:
total_count = df_accounts_silver.count()
delinquent_count = df_accounts_silver.filter(col("delinquent") == True).count()
print("Total rows:", total_count)
print("Delinquent rows:", delinquent_count)



Total rows: 2000464
Delinquent rows: 107528


In [0]:
df_transactions_silver= (
    df_transactions
    .withColumn("is_fraud", col("is_fraud").cast("boolean"))
    .withColumn("amount", col("amount").cast("decimal(18,2)"))

)

df_transactions_silver.printSchema()
df_transactions_silver.display()


In [0]:
df_customers_silver= df_customers_silver.withColumn("churn_flag", col("churn_flag").cast("boolean"))

df_customers_silver.printSchema()
df_customers_silver.display()

In [0]:

df_interactions_silver = df_interactions
   
        
df_interactions_silver.display()


## DATA INTEGRITY CHECKS 

In [0]:
# check for orphan records in accounts_silver
df_accounts_orphan = (
    df_accounts_silver
        .join(df_customers_silver, on="customer_id", how="leftanti")
)

print("Orphan records in accounts_silver:", df_accounts_orphan.count())
df_accounts_orphan.display()
# check for orphan records


Orphan records in accounts_silver: 0


customer_id,account_id,account_type,open_date,balance,delinquent


In [0]:
# check for orphan records in transactions_silver 
df_transactions_orphan = (
    df_transactions_silver
        .join(df_customers_silver, on="customer_id", how="leftanti")
)

print("Orphan records in transactions_silver:", df_transactions_orphan.count())
df_transactions_orphan.display()

Orphan records in transactions_silver: 0


customer_id,account_id,txn_seq,trans_id,amount,type,timestamp,merchant,is_fraud


In [0]:
# check for orphan records in interactions_silver 
df_transactions_orphan = (
    df_transactions_silver
        .join(df_customers_silver, on="customer_id", how="leftanti")
)

print("Orphan records in transactions_silver:", df_transactions_orphan.count())
df_transactions_orphan.display()

Orphan records in transactions_silver: 0


customer_id,account_id,txn_seq,trans_id,amount,type,timestamp,merchant,is_fraud


In [0]:
# checking null values

def null_profile(df):
    # building a list of expressions, one per column
    exprs = []

    for col_name in df.columns:
        null_count_expr = sum(
            col(col_name).isNull().cast("int")
        ).alias(col_name)

        exprs.append(null_count_expr)

    # Select all those null-count expressions as one row
    return df.select(exprs)

null_profile(df_customers_silver).display()     
null_profile(df_accounts_silver).display()     
null_profile(df_transactions_silver).display() 
null_profile(df_interactions_silver).display()


customer_id,first_name,last_name,age,gender,income,segment,street_address,city,state,zip_code,country,region,tenure_months,churn_flag
0,0,0,0,0,200,0,200,200,0,200,0,0,200,0


account_id,customer_id,account_type,open_date,balance,delinquent
0,0,200,200,200,200


account_id,customer_id,txn_seq,trans_id,amount,type,timestamp,merchant,is_fraud
0,0,0,0,200,200,200,200,0


customer_id,support_calls,complaints,mobile_logins,branch_visits,email_open_rate
0,200,200,200,200,200


In [0]:
# # creating a function to drop null values

# def drop_null(df):
#     return df.dropna(how="any")

# df_customers_silver = drop_null(df_customers_silver)
# df_accounts_silver = drop_null(df_accounts_silver)
# df_transactions_silver = drop_null(df_transactions_silver)
# df_interactions_silver = drop_null(df_interactions_silver)



In [0]:
 #check for duplicates in customer_id

from pyspark.sql.functions import *


total_rows_customers = df_customers_silver.count()
distinct_customer_ids = df_customers.select("customer_id").distinct().count()

print("Total rows:               ", total_rows_customers)
print("Distinct customer_id:     ", distinct_customer_ids)
print("Duplicate customer_id(s): ", total_rows_customers - distinct_customer_ids)

df_customers_duplicates = (
    df_customers_silver
        .groupBy("customer_id")
        .count()
        .filter(col("count") > 1)
)
df_customers_duplicates.display()


Total rows:                1000000
Distinct customer_id:      1000000
Duplicate customer_id(s):  0


customer_id,count


In [0]:
# checking for duplicates in account_id
total_rows_accounts = df_accounts_silver.count()
distinct_account_ids = df_accounts_silver.select("account_id").distinct().count()


print("Total rows:              ", total_rows_accounts)
print("Distinct account_id:     ", distinct_account_ids)
print("Duplicate account_id(s): ", total_rows_accounts - distinct_account_ids)

df_accounts_duplicates = (
    df_accounts_silver
        .groupBy("account_id")
        .count()
        .filter(col("count") > 1)
)

df_accounts_duplicates.show(20)

Total rows:               2000464
Distinct account_id:      2000464
Duplicate account_id(s):  0
+----------+-----+
|account_id|count|
+----------+-----+
+----------+-----+



In [0]:
# checking for duplicates in transaction_id
total_rows_transactions = df_transactions_silver.count()
distinct_trans_ids = df_transactions_silver.select("trans_id").distinct().count()

print("Total rows:            ", total_rows_transactions)
print("Distinct trans_id:     ", distinct_trans_ids)
print("Duplicate trans_id(s): ", total_rows_transactions - distinct_trans_ids)

df_transactions_duplicates = (
    df_transactions_silver
        .groupBy("trans_id")
        .count()
        .filter(col("count") > 1)
)

df_transactions_duplicates.show(20)

Total rows:             35002391
Distinct trans_id:      35002391
Duplicate trans_id(s):  0
+--------+-----+
|trans_id|count|
+--------+-----+
+--------+-----+



In [0]:
# checking for duplicates in interaction_id
total_rows_interactions = df_interactions_silver.count()
distinct_interaction_customer_ids = df_interactions_silver.select("customer_id").distinct().count()

print("Total rows:               ", total_rows_interactions)
print("Distinct customer_id:     ", distinct_interaction_customer_ids)
print("Duplicate customer_id(s): ", total_rows_interactions - distinct_interaction_customer_ids)

df_interactions_duplicates = (
    df_interactions
        .groupBy("customer_id")
        .count()
        .filter(col("count") > 1)
)

df_interactions_duplicates.display()

Total rows:                1000000
Distinct customer_id:      1000000
Duplicate customer_id(s):  0


customer_id,count


In [0]:
from pyspark.sql.functions import col

df_customers_silver.groupBy("churn_flag").count().show()


+----------+------+
|churn_flag| count|
+----------+------+
|      true|236320|
|     false|763680|
+----------+------+



In [0]:
df_customers_silver.select("age").orderBy(col("age").asc()).limit(10).display()
df_customers_silver.select("age").orderBy(col("age").desc()).limit(10).display()



age
18
18
18
18
18
18
18
18
18
18


age
85
85
85
85
85
85
85
85
85
85


In [0]:
df_customers_silver.select("income").orderBy(col("income").asc()).show(5)
df_customers_silver.select("income").orderBy(col("income").desc()).show(5)

+------+
|income|
+------+
|  NULL|
|  NULL|
|  NULL|
|  NULL|
|  NULL|
+------+
only showing top 5 rows
+------+
|income|
+------+
|250000|
|249998|
|249998|
|249996|
|249995|
+------+
only showing top 5 rows


In [0]:
df_customers_silver.groupBy("segment").count().show()
df_customers_silver.groupBy("region").count().show()


+------------+------+
|     segment| count|
+------------+------+
|     student|332992|
|     retiree|333902|
|professional|333106|
+------------+------+

+---------+------+
|   region| count|
+---------+------+
|     West|259400|
|  Midwest|240008|
|Northeast|179895|
|    South|320697|
+---------+------+



## Business Logic Columns

### Customers

In [0]:
from pyspark.sql.functions import when, col
#early leavers, tenure, age, and  income groups
df_customers_silver = df_customers_silver.withColumn(
    "early_leaver",
    ((col("tenure_months") >= 0) & (col("tenure_months") <= 3)) & (col("churn_flag") == True)
).withColumn(
    "tenure_category",
    when((col("tenure_months") >= 0) & (col("tenure_months") <= 3), "0-3")
    .when((col("tenure_months") > 3) & (col("tenure_months") <= 6), "4-6")
    .when((col("tenure_months") > 6) & (col("tenure_months") <= 12), "7-12")
    .otherwise("13+")
).withColumn(
    "age_group",
    when(col("age") < 25, "18-24")
    .when((col("age") >= 25) & (col("age") <= 34), "25-34")
    .when((col("age") >= 35) & (col("age") <= 44), "35-44")
    .when((col("age") >= 45) & (col("age") <= 54), "45-54")
    .otherwise("55+")
).withColumn(
    "income_category",
    when(col("income") < 50_000, "Low")
    .when((col("income") >= 50_000) & (col("income") <= 150_000), "Medium")
    .otherwise("High")
)



### Accounts

In [0]:
df_accounts_silver = df_accounts_silver.withColumn(
    "high_balance",
    ((col("account_type").isin("savings", "chequing")) & (col("balance") > 50_000)).cast("int")
).withColumn(
    "recently_opened",
    (col("open_date") >= '2025-08-21').cast("int")  # last 3 months
)


### Transactions

In [0]:
df_transactions_silver = (
    df_transactions_silver
        .withColumn("timestamp", col("timestamp").cast("timestamp"))
        .withColumn("large_txn", (col("amount") > 10_000).cast("int"))
        .withColumn("date", to_date(col("timestamp")))
        .withColumn("year", year(col("timestamp")))
        .withColumn("month", month(col("timestamp")))
        .withColumn("day", dayofmonth(col("timestamp")))
        .withColumn("week", weekofyear(col("timestamp")))
        .withColumn("day_of_week", dayofweek(col("timestamp")))
        .withColumn("day_name", date_format(col("timestamp"), "EEEE"))
        .withColumn("month_name", date_format(col("timestamp"), "MMM"))
)

### Interactions

In [0]:
df_interactions_silver = df_interactions_silver.withColumn(
    "many_support_calls", (col("support_calls") > 5).cast("int")
).withColumn(
    "many_complaints", (col("complaints") > 2).cast("int")
).withColumn(
    "high_mobile_logins", (col("mobile_logins") > 20).cast("int")
).withColumn(
    "high_email_open", (col("email_open_rate") > 50).cast("int")
).withColumn(
    "multi_channel_user",
    ((col("high_mobile_logins") == 1) & (col("high_email_open") == 1) & (col("branch_visits") > 2)).cast("int")
)

In [0]:
# df_customers_silver.limit(10).display()
# df_accounts_silver.limit(10).display()
df_transactions_silver.display()
# df_interactions_silver.limit(10).display()

In [0]:
def save_to_delta(df, table_name):
    df.write.format("delta").mode("overwrite").option("overwriteSchema", True).saveAsTable(f"databrick_tutorial_1.silver.{table_name}")

save_to_delta(df_accounts_silver, "accounts_silver")
save_to_delta(df_customers_silver, "customers_silver")
save_to_delta(df_transactions_silver, "transactions_silver")
save_to_delta(df_interactions_silver, "interactions_silver")