In [0]:
%sql

select count(*) from bronze.txn_tbl

count(1)
10000


#####Reading source data from bronze layer

In [0]:
txn_df = spark.read.table("bronze.txn_tbl")
cust_df = spark.read.table("bronze.cust_tbl")
branch_df = spark.read.table("bronze.branches_tbl")

#Cleansed and enriched data, 

#####Function to drop duplicates from each source dataframe

In [0]:
def remove_duplicates(input_df):

    deduplicated_df = input_df.dropDuplicates()
    return deduplicated_df

#####Function to remove all specified columns which can't be null

In [0]:
from functools import reduce
def remove_null_rows(input_df, columns_to_check):
    from pyspark.sql.functions import col
    
    # Filter rows where any specified column has null value
    condition = ~reduce(lambda x, y: x | y, (col(column).isNull() for column in columns_to_check))
    non_null_df = input_df.filter(condition)
    
    return non_null_df

#####Function to check valid customer

In [0]:
from pyspark.sql import functions as F

def check_customer_id_length(input_df, length):
    result_df = input_df.withColumn(
        "customer_id_length_valid",
        F.when(F.length(F.col("customer_id")) == length, "valid").otherwise("invalid"),
    )
    result_df = result_df.filter(col("customer_id_length_valid")=="valid")
    return result_df
  

##Strating to cleanse data 

###1.Removing duplicates

In [0]:
txn_deduplicated_df = remove_duplicates(txn_df)
cust_deduplicated_df = remove_duplicates(cust_df)
branch_deduplicated_df = remove_duplicates(branch_df)

# Count the number of rows in each DataFrame
txn_count = txn_deduplicated_df.count()
cust_count = cust_deduplicated_df.count()
branch_count = branch_deduplicated_df.count()

# Print the counts
print(f"Transaction DataFrame Count: {txn_count}")
print(f"Customer DataFrame Count: {cust_count}")
print(f"Branch DataFrame Count: {branch_count}")


Transaction DataFrame Count: 5000
Customer DataFrame Count: 1000
Branch DataFrame Count: 10


##2. Dropping data if null value are present in mandatory cols 

In [0]:
# List of columns to check for null values
columns_to_check = ["transaction_id", "customer_id"]

# Remove columns with only null values based on specified columns
txn_non_null_df = remove_null_rows(txn_deduplicated_df, columns_to_check)
txn_non_null_df.count()

Out[7]: 5000

##3. Check on valid customer

In [0]:
from pyspark.sql.functions import col

# Specify the expected length of customer_id
expected_length = 5

# Apply the function to check customer_id length
verified_txn_df = check_customer_id_length(txn_non_null_df, expected_length)
verified_cust_df = check_customer_id_length(cust_deduplicated_df, expected_length)

# Count the number of rows in each DataFrame
txn_count = verified_txn_df.count()
cust_count = verified_cust_df.count()

# Print the counts
print(f"Transaction DataFrame Count: {txn_count}")
print(f"Customer DataFrame Count: {cust_count}")

Transaction DataFrame Count: 5000
Customer DataFrame Count: 1000


###join three table using inner join

In [0]:
joined_df = txn_df.join(cust_df, on=['customer_id', 'customer_id'], how='inner').join(branch_df, on=['branch_id', 'branch_id'], how='inner')

In [0]:
joined_df.show()

+---------+---------+-----------+-----------+--------------+-------+----------------+--------+--------+-------------------+---------+-----------------+--------------------+--------------+--------------------+------------+----------+-------------------+---------------+------------------+--------+
|branch_id|branch_id|customer_id|customer_id|transaction_id|channel|transaction_type|  amount|currency|          timestamp|   status|             name|               email|         phone|             address|credit_score| join_date|        last_update|           name|          location|timezone|
+---------+---------+-----------+-----------+--------------+-------+----------------+--------+--------+-------------------+---------+-----------------+--------------------+--------------+--------------------+------------+----------+-------------------+---------------+------------------+--------+
|    B0006|    B0006|      C1000|      C1000|         T5235|    ATM|         payment|   26.81|     GBP|2023-0

In [0]:
joined_df = txn_df.alias('t') \
    .join(cust_df.alias('c'), col('t.customer_id') == col('c.customer_id'), 'inner') \
    .join(branch_df.alias('b'), col('t.branch_id') == col('b.branch_id'), 'inner') \
    .select(
        col('t.transaction_id'),
        col('t.channel'),
        col('t.transaction_type'),
        col('t.amount'),
        col('t.currency'),
        col('t.timestamp').alias('transaction_timestamp'),
        col('t.status').alias('transaction_status'),
        col('t.customer_id'),
        col('c.name').alias('customer_name'), # renamed column
        col('c.email'),
        col('c.phone'),
        col('c.address'),
        col('c.credit_score'),
        col('c.join_date'),
        col('c.last_update'),
        col('t.branch_id'),
        col('b.name').alias('branch_name'), # renamed column
        col('b.location').alias('branch_location'), # renamed column
        col('b.timezone').alias('branch_timezone') # renamed column
    )

###write merge table into silver

In [0]:
joined_df.write.format("delta").mode("overwrite").saveAsTable("silver.merge_table")

In [0]:
joined_df.printSchema()

root
 |-- branch_id: string (nullable = true)
 |-- branch_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- transaction_id: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- status: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- credit_score: long (nullable = true)
 |-- join_date: string (nullable = true)
 |-- last_update: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- timezone: string (nullable = true)



In [0]:
flag_source_df = joined_df.select("transaction_id","amount","channel","timestamp","currency","address")

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import regexp_extract


# Regular expression pattern to extract 5-digit ZIP code at the end
zip_code_pattern = r'(\d{5})$'

# Add a new column 'zip_code' by applying the regular expression pattern
df_with_zip = flag_source_df.withColumn("zip_code", regexp_extract("address", zip_code_pattern, 1))

zip_codes_list = df_with_zip.select("zip_code").rdd.flatMap(lambda x: x).collect()


# Original list of zip codes
zip_codes_list = [f'{i:05d}' for i in range(1000)]  # Example list of zip codes

# Create a new list with the first 950 items 
new_zip_codes_list = zip_codes_list[:950]


# Function to check if the zip code exists in the predefined list
def check_zip_code(address):
    zip_code = address[-5:]  # Extract last 5 digits
    return zip_code in new_zip_codes_list

# Register the UDF with Spark
check_zip_code_udf = F.udf(check_zip_code, BooleanType())

##Fraud Flag Table

In [0]:
from pyspark.sql import functions as F


flag_df = flag_source_df.select(
    F.col('transaction_id'),
    F.when(F.col('amount') > 90000, "unusual_amount")
     .when(F.col('currency').isin(['Cryptocurrency']), "watchlist_match")
     .when(check_zip_code_udf(F.col('address')), "new_geolocation")
     .when(F.col('amount') < 2, "pattern_anomaly")
     .otherwise("normal")  
     .alias('flag_type'),
    F.current_timestamp().alias('timestamp')
)

flag_type_df = flag_df.withColumn(
    'confidence_score',
    F.when(F.col('flag_type') == 'unusual_amount', 0.75)
     .when(F.col('flag_type') == 'watchlist_match', 0.95)
     .when(F.col('flag_type') == 'new_geolocation', 0.80)
     .when(F.col('flag_type') == 'pattern_anomaly', 0.70)          
     .otherwise(0.5)  # Default confidence score for other cases
)

flagged_df = flag_type_df.filter(F.col("flag_type") != "normal")


flag_id_df = flagged_df.withColumn(
    'flag_id',
    F.concat(F.lit("F000"), F.monotonically_increasing_id())
)


# Reorder the columns
ordered_columns = ['flag_id', 'transaction_id', 'flag_type', 'timestamp', 'confidence_score']
df_ordered = flag_id_df.select(ordered_columns)

# Write the DataFrame to a Delta table
df_ordered.write.format("delta").mode("overwrite").saveAsTable("silver.fraud_flag")


In [0]:
%sql
select distinct flag_type,count(*) from silver.fraud_flag group by flag_type limit 10;

flag_type,count(1)
new_geolocation,78
pattern_anomaly,262
unusual_amount,62


In [0]:
%sql
select * from silver.fraud_flag limit 10;

flag_id,transaction_id,flag_type,timestamp,confidence_score
F0000,T8322,pattern_anomaly,2024-07-31T13:17:04.711+0000,0.7
F0001,T8322,pattern_anomaly,2024-07-31T13:17:04.711+0000,0.7
F0002,T5232,new_geolocation,2024-07-31T13:17:04.711+0000,0.8
F0003,T5232,new_geolocation,2024-07-31T13:17:04.711+0000,0.8
F0004,T7981,new_geolocation,2024-07-31T13:17:04.711+0000,0.8
F0005,T7981,new_geolocation,2024-07-31T13:17:04.711+0000,0.8
F0006,T6862,new_geolocation,2024-07-31T13:17:04.711+0000,0.8
F0007,T6205,new_geolocation,2024-07-31T13:17:04.711+0000,0.8
F0008,T6862,new_geolocation,2024-07-31T13:17:04.711+0000,0.8
F0009,T6205,new_geolocation,2024-07-31T13:17:04.711+0000,0.8


####Customer segment table

In [0]:
def filter_recent_customers(join_date_col):
    # Calculate the date one month ago
    one_month_ago = F.date_sub(F.current_date(), 30)
    
    # Return a boolean condition for recent customers
    return F.col(join_date_col) >= one_month_ago

  

In [0]:
def filter_old_customers(join_date_col):
    # Calculate the date 90 days ago
    ninety_days_ago = F.date_sub(F.current_date(), 90)
    
    # Return a boolean condition for customers older than 90 days
    return F.col(join_date_col) <= ninety_days_ago

In [0]:
from pyspark.sql.functions import *
 
# joined_df = joined_df.withColumn("year_diff", months_between(col("timestamp"), col("join_date")) / 12)


joined_yr_df = joined_df.withColumn("years_diff", round(datediff(col("timestamp").cast("timestamp"), to_date(col("join_date"), "yyyy-MM-dd")) / lit(365.25)))




In [0]:
joined_yr_df.display(10)

branch_id,branch_id.1,customer_id,customer_id.1,transaction_id,channel,transaction_type,amount,currency,timestamp,status,name,email,phone,address,credit_score,join_date,last_update,name.1,location,timezone,years_diff
B0006,B0006,C1000,C1000,T5235,ATM,payment,26.81,GBP,2023-02-26T23:11:00.000+0000,completed,Phillip Hernandez,phillip.hernandez@hotmail.com,(001) 861-4705,"95351 Dustin Manor Apt. 262, South Robertmouth, VA 05601",583,2019-01-01,2024-07-24 00:00:00,Downtown Branch,Oliverstad,EST,4.0
B0006,B0006,C1000,C1000,T5235,ATM,payment,26.81,GBP,2023-02-26T23:11:00.000+0000,completed,Phillip Hernandez,phillip.hernandez@hotmail.com,(001) 861-4705,"95351 Dustin Manor Apt. 262, South Robertmouth, VA 05601",583,2019-01-01,2024-07-24 00:00:00,Downtown Branch,Oliverstad,EST,4.0
B0002,B0002,C1000,C1000,T7486,ATM,payment,2.92,EUR,2024-09-08T16:11:00.000+0000,completed,Phillip Hernandez,phillip.hernandez@hotmail.com,(001) 861-4705,"95351 Dustin Manor Apt. 262, South Robertmouth, VA 05601",583,2019-01-01,2024-07-24 00:00:00,Downtown Branch,East Jonathanburgh,EST,6.0
B0004,B0004,C1000,C1000,T7266,branch,withdrawal,61246.35,EUR,2024-07-15T08:28:00.000+0000,completed,Phillip Hernandez,phillip.hernandez@hotmail.com,(001) 861-4705,"95351 Dustin Manor Apt. 262, South Robertmouth, VA 05601",583,2019-01-01,2024-07-24 00:00:00,Central Branch,Port Martinton,EST,6.0
B0009,B0009,C1000,C1000,T7129,mobile,payment,14.67,EUR,2024-06-12T11:32:00.000+0000,completed,Phillip Hernandez,phillip.hernandez@hotmail.com,(001) 861-4705,"95351 Dustin Manor Apt. 262, South Robertmouth, VA 05601",583,2019-01-01,2024-07-24 00:00:00,Downtown Branch,North Lisafort,AEST,5.0
B0002,B0002,C1000,C1000,T7486,ATM,payment,2.92,EUR,2024-09-08T16:11:00.000+0000,completed,Phillip Hernandez,phillip.hernandez@hotmail.com,(001) 861-4705,"95351 Dustin Manor Apt. 262, South Robertmouth, VA 05601",583,2019-01-01,2024-07-24 00:00:00,Downtown Branch,East Jonathanburgh,EST,6.0
B0004,B0004,C1000,C1000,T7266,branch,withdrawal,61246.35,EUR,2024-07-15T08:28:00.000+0000,completed,Phillip Hernandez,phillip.hernandez@hotmail.com,(001) 861-4705,"95351 Dustin Manor Apt. 262, South Robertmouth, VA 05601",583,2019-01-01,2024-07-24 00:00:00,Central Branch,Port Martinton,EST,6.0
B0009,B0009,C1000,C1000,T7129,mobile,payment,14.67,EUR,2024-06-12T11:32:00.000+0000,completed,Phillip Hernandez,phillip.hernandez@hotmail.com,(001) 861-4705,"95351 Dustin Manor Apt. 262, South Robertmouth, VA 05601",583,2019-01-01,2024-07-24 00:00:00,Downtown Branch,North Lisafort,AEST,5.0
B0006,B0006,C1000,C1000,T6591,branch,transfer,14.6,GBP,2024-01-28T05:13:00.000+0000,completed,Phillip Hernandez,phillip.hernandez@hotmail.com,(001) 861-4705,"95351 Dustin Manor Apt. 262, South Robertmouth, VA 05601",583,2019-01-01,2024-07-24 00:00:00,Downtown Branch,Oliverstad,EST,5.0
B0006,B0006,C1000,C1000,T6591,branch,transfer,14.6,GBP,2024-01-28T05:13:00.000+0000,completed,Phillip Hernandez,phillip.hernandez@hotmail.com,(001) 861-4705,"95351 Dustin Manor Apt. 262, South Robertmouth, VA 05601",583,2019-01-01,2024-07-24 00:00:00,Downtown Branch,Oliverstad,EST,5.0


In [0]:

from pyspark.sql.functions import col,when

cust_seg_df = joined_yr_df.select(
    F.col('customer_id'),
    F.col('join_date'),
    F.when(filter_recent_customers('join_date'), "New_User")
     .when(filter_old_customers('last_update'), "Inactive")
     .when(F.col('amount') > 90000, "High_Value")
     .when(F.col('years_diff') > 5, "Loyal")
     .when(F.col('credit_score') < 550, "Credit_Risk")
     .otherwise("normal")  
     .alias('segment_name'),
    F.current_timestamp().alias('last_updated_date')
)

cust_seg_id_df = cust_seg_df.filter(F.col("segment_name") != "normal")


cust_seg_final_df = (
    cust_seg_id_df
    .withColumn(
        'segment_description',
        when(col("segment_name") == "High_Value", "Customers with high transaction volume")
        .when(col("segment_name") == "New_User", "Customers who joined in last 30 days")
        .when(col("segment_name") == "Inactive", "No transactions in last 90 days")
        .when(col("segment_name") == "Credit_Risk", "Customers with low credit scores")
        .when(col("segment_name") == "Loyal", "Consistent activity for over 5 years")
        .otherwise("Unknown")
    )
)

# Reorder the columns
ordered_columns = [ 'customer_id', 'segment_name', 'segment_description', 'last_updated_date']
df_ordered = cust_seg_final_df.select(ordered_columns)

# Write the DataFrame to a Delta table
df_ordered.write.format("delta").mode("overwrite").saveAsTable("silver.customer_segments")


In [0]:
%sql
select distinct segment_name,count(*) from silver.customer_segments group by segment_name ;

segment_name,count(1)
High_Value,62
Loyal,856
Credit_Risk,1246
New_User,62
