In [0]:
%sql

select count(*) from bronze.txn_tbl

count(1)
10000


#####Reading source data from bronze layer

In [0]:
txn_df = spark.read.table("bronze.txn_tbl")
cust_df = spark.read.table("bronze.cust_tbl")
branch_df = spark.read.table("bronze.branches_tbl")

#Cleansed and enriched data, 

#####Function to drop duplicates from each source dataframe

In [0]:
def remove_duplicates(input_df):

    deduplicated_df = input_df.dropDuplicates()
    return deduplicated_df

#####Function to remove all specified columns which can't be null

In [0]:
from functools import reduce
def remove_null_rows(input_df, columns_to_check):
    from pyspark.sql.functions import col
    
    # Filter rows where any specified column has null value
    condition = ~reduce(lambda x, y: x | y, (col(column).isNull() for column in columns_to_check))
    non_null_df = input_df.filter(condition)
    
    return non_null_df

#####Function to check valid customer

In [0]:
from pyspark.sql import functions as F

def check_customer_id_length(input_df, length):
    result_df = input_df.withColumn(
        "customer_id_length_valid",
        F.when(F.length(F.col("customer_id")) == length, "valid").otherwise("invalid"),
    )
    result_df = result_df.filter(col("customer_id_length_valid")=="valid")
    return result_df
  

##Strating to cleanse data 

###1.Removing duplicates

In [0]:
txn_deduplicated_df = remove_duplicates(txn_df)
cust_deduplicated_df = remove_duplicates(cust_df)
branch_deduplicated_df = remove_duplicates(branch_df)

# Count the number of rows in each DataFrame
txn_count = txn_deduplicated_df.count()
cust_count = cust_deduplicated_df.count()
branch_count = branch_deduplicated_df.count()

# Print the counts
print(f"Transaction DataFrame Count: {txn_count}")
print(f"Customer DataFrame Count: {cust_count}")
print(f"Branch DataFrame Count: {branch_count}")


Transaction DataFrame Count: 5000
Customer DataFrame Count: 1000
Branch DataFrame Count: 10


##2. Dropping data if null value are present in mandatory cols 

In [0]:
# List of columns to check for null values
columns_to_check = ["transaction_id", "customer_id"]

# Remove columns with only null values based on specified columns
txn_non_null_df = remove_null_rows(txn_deduplicated_df, columns_to_check)
txn_non_null_df.count()

Out[37]: 5000

##3. Check on valid customer

In [0]:
from pyspark.sql.functions import col

# Specify the expected length of customer_id
expected_length = 5

# Apply the function to check customer_id length
verified_txn_df = check_customer_id_length(txn_non_null_df, expected_length)
verified_cust_df = check_customer_id_length(cust_deduplicated_df, expected_length)

# Count the number of rows in each DataFrame
txn_count = verified_txn_df.count()
cust_count = verified_cust_df.count()

# Print the counts
print(f"Transaction DataFrame Count: {txn_count}")
print(f"Customer DataFrame Count: {cust_count}")

Transaction DataFrame Count: 5000
Customer DataFrame Count: 1000


In [0]:
joined_df = txn_df.join(cust_df, on=['customer_id', 'customer_id'], how='inner').join(branch_df, on=['branch_id', 'branch_id'], how='inner')

In [0]:
joined_df.show()

+---------+---------+-----------+-----------+--------------+-------+----------------+--------+--------+-------------------+---------+-------------------+--------------------+--------------+--------------------+------------+----------+-------------------+--------------+------------------+--------+
|branch_id|branch_id|customer_id|customer_id|transaction_id|channel|transaction_type|  amount|currency|          timestamp|   status|               name|               email|         phone|             address|credit_score| join_date|        last_update|          name|          location|timezone|
+---------+---------+-----------+-----------+--------------+-------+----------------+--------+--------+-------------------+---------+-------------------+--------------------+--------------+--------------------+------------+----------+-------------------+--------------+------------------+--------+
|    B0008|    B0008|      C1132|      C1132|         T8000| mobile|         deposit|   85.67|     EUR|202

In [0]:
joined_df = txn_df.alias('t') \
    .join(cust_df.alias('c'), col('t.customer_id') == col('c.customer_id'), 'inner') \
    .join(branch_df.alias('b'), col('t.branch_id') == col('b.branch_id'), 'inner') \
    .select(
        col('t.transaction_id'),
        col('t.channel'),
        col('t.transaction_type'),
        col('t.amount'),
        col('t.currency'),
        col('t.timestamp').alias('transaction_timestamp'),
        col('t.status').alias('transaction_status'),
        col('t.customer_id'),
        col('c.name').alias('customer_name'), # renamed column
        col('c.email'),
        col('c.phone'),
        col('c.address'),
        col('c.credit_score'),
        col('c.join_date'),
        col('c.last_update'),
        col('t.branch_id'),
        col('b.name').alias('branch_name'), # renamed column
        col('b.location').alias('branch_location'), # renamed column
        col('b.timezone').alias('branch_timezone') # renamed column
    )

In [0]:
 joined_df.write.format("delta").mode("overwrite").saveAsTable("silver.merge_table")

In [0]:
joined_df.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_timestamp: timestamp (nullable = true)
 |-- transaction_status: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- credit_score: long (nullable = true)
 |-- join_date: string (nullable = true)
 |-- last_update: string (nullable = true)
 |-- branch_id: string (nullable = true)
 |-- branch_name: string (nullable = true)
 |-- branch_location: string (nullable = true)
 |-- branch_timezone: string (nullable = true)



In [0]:
flag_source_df = joined_df.select("transaction_id","amount","channel","timestamp","currency","address")

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import regexp_extract


# Regular expression pattern to extract 5-digit ZIP code at the end
zip_code_pattern = r'(\d{5})$'

# Add a new column 'zip_code' by applying the regular expression pattern
df_with_zip = flag_source_df.withColumn("zip_code", regexp_extract("address", zip_code_pattern, 1))

zip_codes_list = df_with_zip.select("zip_code").rdd.flatMap(lambda x: x).collect()


# Original list of zip codes
zip_codes_list = [f'{i:05d}' for i in range(1000)]  # Example list of zip codes

# Create a new list with the first 950 items 
new_zip_codes_list = zip_codes_list[:950]


# Function to check if the zip code exists in the predefined list
def check_zip_code(address):
    zip_code = address[-5:]  # Extract last 5 digits
    return zip_code in new_zip_codes_list

# Register the UDF with Spark
check_zip_code_udf = F.udf(check_zip_code, BooleanType())

In [0]:
from pyspark.sql import functions as F


flag_df = flag_source_df.select(
    F.col('transaction_id'),
    F.when(F.col('amount') > 90000, "unusual_amount")
     .when(F.col('currency').isin(['Cryptocurrency']), "watchlist_match")
     .when(check_zip_code_udf(F.col('address')), "new_geolocation")
     .when(F.col('amount') < 2, "pattern_anomaly")
     .otherwise("normal")  
     .alias('flag_type'),
    F.current_timestamp().alias('timestamp')
)

flag_type_df = flag_df.withColumn(
    'confidence_score',
    F.when(F.col('flag_type') == 'unusual_amount', 0.75)
     .when(F.col('flag_type') == 'watchlist_match', 0.95)
     .when(F.col('flag_type') == 'new_geolocation', 0.80)
     .when(F.col('flag_type') == 'pattern_anomaly', 0.70)          
     .otherwise(0.5)  # Default confidence score for other cases
)

flagged_df = flag_type_df.filter(F.col("flag_type") != "normal")


flag_id_df = flagged_df.withColumn(
    'flag_id',
    F.concat(F.lit("F000"), F.monotonically_increasing_id())
)


# Reorder the columns
ordered_columns = ['flag_id', 'transaction_id', 'flag_type', 'timestamp', 'confidence_score']
df_ordered = flag_id_df.select(ordered_columns)

# Write the DataFrame to a Delta table
df_ordered.write.format("delta").mode("overwrite").saveAsTable("silver.fraud_flag")


In [0]:
%sql
select distinct flag_type,count(*) from silver.fraud_flag group by flag_type limit 10;

flag_type,count(1)
new_geolocation,58
pattern_anomaly,252
unusual_amount,42


In [0]:
%sql
select * from silver.fraud_flag limit 10;

flag_id,transaction_id,flag_type,timestamp,confidence_score
F0000,T8107,pattern_anomaly,2024-07-31T14:01:27.802+0000,0.7
F0001,T8119,pattern_anomaly,2024-07-31T14:01:27.802+0000,0.7
F0002,T8130,pattern_anomaly,2024-07-31T14:01:27.802+0000,0.7
F0003,T8219,pattern_anomaly,2024-07-31T14:01:27.802+0000,0.7
F0004,T8225,pattern_anomaly,2024-07-31T14:01:27.802+0000,0.7
F0005,T8293,pattern_anomaly,2024-07-31T14:01:27.802+0000,0.7
F0006,T8313,pattern_anomaly,2024-07-31T14:01:27.802+0000,0.7
F0007,T8318,unusual_amount,2024-07-31T14:01:27.802+0000,0.75
F0008,T8353,new_geolocation,2024-07-31T14:01:27.802+0000,0.8
F0009,T8402,unusual_amount,2024-07-31T14:01:27.802+0000,0.75


####Customer segment table

In [0]:
def filter_recent_customers(join_date_col):
    # Calculate the date one month ago
    one_month_ago = F.date_sub(F.current_date(), 30)
    
    # Return a boolean condition for recent customers
    return F.col(join_date_col) >= one_month_ago

  

In [0]:
def filter_old_customers(join_date_col):
    # Calculate the date 90 days ago
    ninety_days_ago = F.date_sub(F.current_date(), 90)
    
    # Return a boolean condition for customers older than 90 days
    return F.col(join_date_col) <= ninety_days_ago

In [0]:
from pyspark.sql.functions import *
 
# joined_df = joined_df.withColumn("year_diff", months_between(col("timestamp"), col("join_date")) / 12)


joined_yr_df = joined_df.withColumn("years_diff", round(datediff(col("timestamp").cast("timestamp"), to_date(col("join_date"), "yyyy-MM-dd")) / lit(365.25)))




In [0]:
joined_yr_df.display(10)

branch_id,branch_id.1,customer_id,customer_id.1,transaction_id,channel,transaction_type,amount,currency,timestamp,status,name,email,phone,address,credit_score,join_date,last_update,name.1,location,timezone,years_diff
B0008,B0008,C1132,C1132,T8000,mobile,deposit,85.67,EUR,2025-01-31T12:54:00.000+0000,pending,Rebecca Manning,rebecca.manning@aol.com,(653) 635-5660,"Unit 2963 Box 1023, DPO AE 75681",761,2019-10-05,2024-07-28 00:00:00,Central Branch,West Sandrachester,AEST,5.0
B0005,B0005,C1979,C1979,T8001,mobile,payment,22.78,USD,2025-01-31T15:04:00.000+0000,pending,Nicole Owens,nicole.owens@outlook.com,(653) 347-8022,"12603 Bird Keys Suite 438, Lake Brittany, NM 18496",830,2024-06-16,2024-07-26 00:00:00,East Branch,Hillside,PST,1.0
B0007,B0007,C1997,C1997,T8002,mobile,transfer,6.65,GBP,2025-01-31T18:03:00.000+0000,completed,Timothy Hill,timothy.hill@gmail.com,(285) 968-6159,"71758 Rachel Mill, Port Kevin, ID 18244",754,2024-07-16,2024-07-21 00:00:00,West Branch,East Melindahaven,AEST,1.0
B0008,B0008,C1295,C1295,T8003,mobile,withdrawal,3.35,EUR,2025-02-01T03:11:00.000+0000,completed,Kelsey Parker,kelsey.parker@outlook.com,(001) 783-9124,"9984 Bates Center, Amberhaven, TX 29506",555,2020-07-17,2024-07-21 00:00:00,Central Branch,West Sandrachester,AEST,5.0
B0003,B0003,C1791,C1791,T8004,branch,withdrawal,83.96,GBP,2025-02-01T08:22:00.000+0000,completed,Michael Levine,michael.levine@aol.com,(125) 345-3823,"85128 Gina Forest, Michaelton, AR 63167",825,2023-05-07,2024-07-29 00:00:00,Central Branch,East Crystalberg,GMT,2.0
B0002,B0002,C1957,C1957,T8005,branch,payment,108.42,EUR,2025-02-01T10:44:00.000+0000,completed,Jay Sanchez,jay.sanchez@aol.com,(724) 700-5663,"2475 Thompson Viaduct Apt. 129, Burtonfurt, ME 83398",711,2024-04-09,2024-07-29 00:00:00,North Branch,Johnsonbury,PST,1.0
B0008,B0008,C1234,C1234,T8006,mobile,payment,17202.33,GBP,2025-02-01T21:34:00.000+0000,completed,Dr. Kristina Foley,dr..foley@yahoo.com,(316) 636-1829,"9620 Carlson Villages, Lisaview, VA 04046",650,2020-03-18,2024-07-24 00:00:00,Central Branch,West Sandrachester,AEST,5.0
B0008,B0008,C1400,C1400,T8007,mobile,payment,64.11,EUR,2025-02-02T06:43:00.000+0000,completed,Amanda Mercado,amanda.mercado@hotmail.com,(591) 997-7914,"160 John Alley Apt. 761, East Aliciafort, WI 27481",770,2021-01-29,2024-07-28 00:00:00,Central Branch,West Sandrachester,AEST,4.0
B0000,B0000,C1594,C1594,T8008,web,withdrawal,6.98,GBP,2025-02-02T18:42:00.000+0000,pending,Linda Mclaughlin,linda.mclaughlin@hotmail.com,(247) 305-5434,"711 Daniel Pines Suite 471, Williamtown, FM 58477",780,2022-03-03,2024-07-31 00:00:00,West Branch,Hillside,AEST,3.0
B0009,B0009,C1714,C1714,T8009,ATM,deposit,11.29,EUR,2025-02-02T22:00:00.000+0000,completed,Kayla York,kayla.york@gmail.com,(846) 916-6355,"1121 Victor Rapid Apt. 978, East Allison, HI 82055",587,2022-11-24,2024-07-20 00:00:00,West Branch,Jessicaborough,PST,2.0


In [0]:

from pyspark.sql.functions import col,when

cust_seg_df = joined_yr_df.select(
    F.col('customer_id'),
    F.col('join_date'),
    F.when(filter_recent_customers('join_date'), "New_User")
     .when(filter_old_customers('last_update'), "Inactive")
     .when(F.col('amount') > 90000, "High_Value")
     .when(F.col('years_diff') > 5, "Loyal")
     .when(F.col('credit_score') < 550, "Credit_Risk")
     .otherwise("normal")  
     .alias('segment_name'),
    F.current_timestamp().alias('last_updated_date')
)

cust_seg_id_df = cust_seg_df.filter(F.col("segment_name") != "normal")


cust_seg_final_df = (
    cust_seg_id_df
    .withColumn(
        'segment_description',
        when(col("segment_name") == "High_Value", "Customers with high transaction volume")
        .when(col("segment_name") == "New_User", "Customers who joined in last 30 days")
        .when(col("segment_name") == "Inactive", "No transactions in last 90 days")
        .when(col("segment_name") == "Credit_Risk", "Customers with low credit scores")
        .when(col("segment_name") == "Loyal", "Consistent activity for over 5 years")
        .otherwise("Unknown")
    )
)

# Reorder the columns
ordered_columns = [ 'customer_id', 'segment_name', 'segment_description', 'last_updated_date']
df_ordered = cust_seg_final_df.select(ordered_columns)

# Write the DataFrame to a Delta table
df_ordered.write.format("delta").mode("overwrite").saveAsTable("silver.customer_segments")


In [0]:
%sql
select distinct segment_name,count(*) from silver.customer_segments group by segment_name ;

segment_name,count(1)
High_Value,42
Loyal,1130
Credit_Risk,1070
New_User,132
