##Silver Layer

*sourcing config file* 

In [0]:
%run /capstone/config_script/setup_schema_and_config

In [0]:
%sql

select count(*) from bronze.txn_tbl

count(1)
158


Customer input path: dbfs:/FileStore/capstone/cust_tbl/
Branch input path: dbfs:/FileStore/capstone/branches_tbl/
Transaction input path: dbfs:/FileStore/capstone/txn_tbl/
Checkpoint location: dbfs:/FileStore/capstone/txn_tbl/checkpoint_location/
Columns for null check: ['transaction_id', 'customer_id']
Expected customer ID length: 5
Ordered fraud flag columns: ['flag_id', 'transaction_id', 'flag_type', 'timestamp', 'confidence_score']
Ordered customer segments columns: ['customer_id', 'segment_name', 'segment_description', 'last_updated_date']
bronze_tables['txn'] is: bronze.txn_tbl
bronze_tables['cust'] is: bronze.cust_tbl
bronze_tables['branch'] is: bronze.branches_tbl


##Function to be used in silver notebook

#####Function drop duplicates from each source dataframe

In [0]:
def remove_duplicates(input_df):

    deduplicated_df = input_df.dropDuplicates()
    return deduplicated_df

#####Function to remove all specified columns which can't be null

In [0]:
from functools import reduce
def remove_null_rows(input_df, columns_to_check):
    from pyspark.sql.functions import col
    
    # Filter rows where any specified column has null value
    condition = ~reduce(lambda x, y: x | y, (col(column).isNull() for column in columns_to_check))
    non_null_df = input_df.filter(condition)
    
    return non_null_df

#####Function to check valid customer

In [0]:
from pyspark.sql import functions as F

def check_customer_id_length(input_df, length):
    result_df = input_df.withColumn(
        "customer_id_length_valid",
        F.when(F.length(F.col("customer_id")) == length, "valid").otherwise("invalid"),
    )
    result_df = result_df.filter(col("customer_id_length_valid")=="valid")
    return result_df

#####Function for finding new user

In [0]:
def filter_recent_customers(join_date_col):
    # Calculate the date one month ago
    one_month_ago = F.date_sub(F.current_date(), 30)
    
    # Return a boolean condition for recent customers
    return F.col(join_date_col) >= one_month_ago


#####Function for finding invalid user

In [0]:
def filter_old_customers(join_date_col):
    # Calculate the date 90 days ago
    ninety_days_ago = F.date_sub(F.current_date(), 90)
    
    # Return a boolean condition for customers older than 90 days
    return F.col(join_date_col) <= ninety_days_ago

#####Function for new geolocation

In [0]:
# Function to check if the zip code exists in the predefined list
def check_zip_code(address):
    zip_code = address[-5:]  # Extract last 5 digits
    return zip_code in new_zip_codes_list

##Reading source data from bronze layer

In [0]:
# txn_df = spark.read.table(bronze_tables['txn'])
txn_df = spark.readStream.format("delta").table(bronze_tables['txn'])
cust_df = spark.read.table(bronze_tables['cust'])
branch_df = spark.read.table(bronze_tables['branch'])

#####Strating to cleanse data 

1.Removing duplicates

In [0]:
txn_deduplicated_df = remove_duplicates(txn_df)
cust_deduplicated_df = remove_duplicates(cust_df)
branch_deduplicated_df = remove_duplicates(branch_df)

# # Count the number of rows in each DataFrame
# txn_count = txn_deduplicated_df.count()
# cust_count = cust_deduplicated_df.count()
# branch_count = branch_deduplicated_df.count()

# # Print the counts
# print(f"Transaction DataFrame Count: {txn_count}")
# print(f"Customer DataFrame Count: {cust_count}")
# print(f"Branch DataFrame Count: {branch_count}")


2. Dropping data if null value are present in mandatory cols 

In [0]:
# Remove columns with null values based on specified columns
txn_non_null_df = remove_null_rows(txn_deduplicated_df, columns_for_null_check)
# txn_non_null_df.count()

3. Check on valid customer

In [0]:
from pyspark.sql.functions import col

# Apply the function to check customer_id length
verified_txn_df = check_customer_id_length(txn_non_null_df, expected_cust_id_length)
verified_cust_df = check_customer_id_length(cust_deduplicated_df, expected_cust_id_length)

# # Count the number of rows in each DataFrame
# txn_count = verified_txn_df.count()
# cust_count = verified_cust_df.count()

# # Print the counts
# print(f"Transaction DataFrame Count: {txn_count}")
# print(f"Customer DataFrame Count: {cust_count}")

####Joining transaction,customer and branch data to produce customer segment and get fraud data

In [0]:
joined_df = verified_txn_df.join(verified_cust_df, on=['customer_id', 'customer_id'], how='inner').join(branch_df, on=['branch_id', 'branch_id'], how='inner')

####Zip code check for new geolocation

In [0]:
flag_source_df = joined_df.select("transaction_id","amount","channel","timestamp","currency","address")

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import regexp_extract


# Regular expression pattern to extract 5-digit ZIP code at the end
zip_code_pattern = r'(\d{5})$'

# Add a new column 'zip_code' by applying the regular expression pattern
df_with_zip = flag_source_df.withColumn("zip_code", regexp_extract("address", zip_code_pattern, 1))

# zip_codes_list = df_with_zip.select("zip_code").rdd.flatMap(lambda x: x).collect()


# # Original list of zip codes
# zip_codes_list = [f'{i:05d}' for i in range(1000)]  # Example list of zip codes 
# new_zip_codes_list = zip_codes_list[:955]

new_zip_codes_list = ['00000']
# Register the UDF with Spark
check_zip_code_udf = F.udf(check_zip_code, BooleanType())

##Fraud Flag table creation

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import uuid

flag_df = joined_df.select(
    F.col('transaction_id'),
    F.when(F.col('amount') > 90000, "unusual_amount")
     .when(F.col('currency').isin(['CRYPTO']), "watchlist_match")
     .when(check_zip_code_udf(F.col('address')), "new_geolocation")
     .when(F.col('amount') < 0, "pattern_anomaly")
     .otherwise("normal")  
     .alias('flag_type'),
    F.current_timestamp().alias('timestamp')
)

flag_type_df = flag_df.withColumn(
    'confidence_score',
    F.when(F.col('flag_type') == 'unusual_amount', 0.75)
     .when(F.col('flag_type') == 'watchlist_match', 0.95)
     .when(F.col('flag_type') == 'new_geolocation', 0.80)
     .when(F.col('flag_type') == 'pattern_anomaly', 0.70)          
     .otherwise(0.5)  # Default confidence score for other cases
)

flagged_df = flag_type_df.filter(F.col("flag_type") != "normal")

def generate_uuid():
    return str(uuid.uuid4())

uuid_udf = udf(generate_uuid, StringType())

# Add UUID column to DataFrame
flag_id_df = flagged_df.withColumn("flag_id", uuid_udf())

# flag_id_df = flagged_df.withColumn('flag_id',F.concat(F.lit("F000"), F.monotonically_increasing_id()))


# Reorder the columns
df_ordered_flag = flag_id_df.select(ordered_fraud_flag_columns)



# # Write the DataFrame to a Delta table
# df_ordered.write.format("delta").mode("overwrite").saveAsTable(silver_tables['fraud_flag'])

df_ordered_flag.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "dbfs:/FileStore/capstone/txn_tbl/checkpoint/silver/fraud_flag/") \
    .table(silver_tables['fraud_flag'])


Out[283]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f2cfee60d00>

###Customer segment table Creation

In [0]:

from pyspark.sql.functions import col,when,floor,to_date,lit,round,datediff

 
joined_yr_df = joined_df.withColumn("years_diff", round(datediff(col("timestamp").cast("timestamp"), to_date(col("join_date"), "yyyy-MM-dd")) / lit(365.25)))

cust_seg_df = joined_yr_df.select(
    F.col('customer_id'),
    F.col('join_date'),
    F.when(filter_recent_customers('join_date'), "New_User")
     .when(filter_old_customers('last_update'), "Inactive")
     .when(F.col('amount') > 90000, "High_Value")
     .when(F.col('years_diff') > 5, "Loyal")
     .when(F.col('credit_score') < 510, "Credit_Risk")
     .otherwise("normal")  
     .alias('segment_name'),
    F.current_timestamp().alias('last_updated_date')
)

cust_seg_id_df = cust_seg_df.filter(F.col("segment_name") != "normal")


cust_seg_final_df = (
    cust_seg_id_df
    .withColumn(
        'segment_description',
        when(col("segment_name") == "High_Value", "Customers with high transaction volume")
        .when(col("segment_name") == "New_User", "Customers who joined in last 30 days")
        .when(col("segment_name") == "Inactive", "No transactions in last 90 days")
        .when(col("segment_name") == "Credit_Risk", "Customers with low credit scores")
        .when(col("segment_name") == "Loyal", "Consistent activity for over 5 years")
        .otherwise("Unknown")
    )
)

# Reorder the columns
df_ordered = cust_seg_final_df.select(ordered_customer_segments_columns)

# Write the DataFrame to a Delta table
# df_ordered.write.format("delta").mode("overwrite").saveAsTable(silver_tables['customer_segments'])

df_ordered.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "dbfs:/FileStore/capstone/txn_tbl/checkpoint/customer_segments/") \
    .table(silver_tables['customer_segments'])


Out[284]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f2cfee5b5e0>

###Writing data in Silver Tables

In [0]:
cust_txn_df = joined_df.select(ordered_cut_txn_columns)

# cust_txn_df.write.format("delta").mode("overwrite").saveAsTable(silver_tables['cust_txn'])

cust_txn_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "dbfs:/FileStore/capstone/txn_tbl/checkpoint/silver/cust_txn/") \
    .table(silver_tables['cust_txn'])

Out[285]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f2cfee60130>

In [0]:
# verified_txn_df.write.format("delta").mode("overwrite").saveAsTable(silver_tables['txn'])


# Write the streaming DataFrame to the Delta table
verified_txn_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "dbfs:/FileStore/capstone/txn_tbl/checkpoint/verified_txn_df/") \
    .table(silver_tables['txn'])

Out[286]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f2cfee53e20>

In [0]:
verified_cust_df.write.format("delta").mode("overwrite").saveAsTable(silver_tables['cust'])

In [0]:
branch_df.write.format("delta").mode("overwrite").saveAsTable(silver_tables['branch'])