In [0]:
transaction_bronze_df=spark.readStream.format('delta').load('abfss://data@storageaccountfda.dfs.core.windows.net/catalog/bronze/transaction_streams/data')

In [0]:
%sql
create table if not exists transaction_clean_final(transaction_id string,user_id string,card_id string,amount double,merchant string,merchant_country string,transaction_time timestamp,transaction_hour int,device_id string,channel string,ingest_time timestamp,processing_time timestamp);

In [0]:
from pyspark.sql.functions import *
from pyspark.sql import *
def myfunction(transaction_clean_df,batch_id):
    transaction_window_spec=Window.partitionBy('transaction_id').orderBy(col('transaction_time').desc())
    transaction_clean_df=transaction_clean_df.withColumn('processing_time',current_timestamp())
    transaction_clean_df=transaction_clean_df.withColumn('rank',dense_rank().over(transaction_window_spec)).filter((col('rank')==1)&(col('record_status')=='valid'))
    transaction_clean_df.createOrReplaceTempView("transaction_clean_df")
    merge_statement = """merge into transaction_clean_final t using transaction_clean_df s
    on t.transaction_id == s.transaction_id
    when matched then
    update set t.user_id = s.user_id, t.card_id = s.card_id,
    t.amount= s.amount,t.merchant=s.merchant,t.merchant_country=s.merchant_country,t.transaction_time=s.transaction_time,t.device_id=s.device_id,t.channel=s.channel
    when not matched then
    insert *
    """

    transaction_clean_df._jdf.sparkSession().sql(merge_statement)

In [0]:
from pyspark.sql.functions import *
from pyspark.sql import *
transaction_clean_df=transaction_bronze_df.withColumn('is_mandatory_records_valid',when((col('transaction_id').isNotNull())&(col('user_id').isNotNull())&(col('card_id').isNotNull())&(col('amount').isNotNull())&(col('transaction_time').isNotNull()),True).otherwise('False'))\
     .withColumn('is_amount_valid',when(col('amount')>0,True).otherwise(False))\
         .withColumn('is_channel_valid',when(upper(col('channel')).isin('POS','ONLINE'),True).otherwise(False))
transaction_clean_df=transaction_clean_df.withColumn('record_status',when(~col('is_mandatory_records_valid')|(~col('is_amount_valid'))|(~col('is_channel_valid')),'invalid').otherwise('valid'))\
     .withColumn('error_status',when(~col('is_mandatory_records_valid'),'missing mandatory record')\
                 .otherwise(when(~col('is_amount_valid'),'amount must be greater than zero')\
                     .otherwise(when(~col('is_channel_valid'),'Invalid channel')\
                         .otherwise(None))))
transaction_clean_df=transaction_clean_df.withColumn('transaction_time',col('transaction_time').cast('timestamp'))\
.withColumn('transaction_hour',hour(col('transaction_time')))\
                    .withColumn('processing_time',current_timestamp())\
                    .withColumn('merchant_country',upper(col('merchant_country')))\
                    .withColumn('channel',upper(col('channel')))\
                    .withColumn('amount',col('amount').cast('double'))
transaction_clean_df.writeStream.format('delta').outputMode('update').option('mergeSchema',True).option('checkpointLocation','abfss://data@storageaccountfda.dfs.core.windows.net/catalog/silver/transaction_streams/checkpoint').trigger(availableNow=True)\
.foreachBatch(myfunction)\
.start()




<pyspark.sql.streaming.query.StreamingQuery at 0x7f0ce85cd010>

In [0]:
transaction_clean_df=spark.sql('select * from transaction_clean_final')
display(transaction_clean_df)

transaction_id,user_id,card_id,amount,merchant,merchant_country,transaction_time,transaction_hour,device_id,channel,ingest_time,processing_time
TXN1001,U101,C101,1200.0,Amazon,IN,2025-01-10T10:01:05Z,10,DV01,ONLINE,2025-12-25T13:56:03.78Z,2025-12-27T07:34:12.807316Z
TXN1002,U101,C101,1500.0,Amazon,IN,2025-01-10T10:02:10Z,10,DV01,ONLINE,2025-12-25T13:56:03.78Z,2025-12-27T07:34:12.807316Z
TXN1003,U102,C102,98000.0,Apple,US,2025-01-10T10:03:15Z,10,DV02,POS,2025-12-25T13:56:03.78Z,2025-12-27T07:34:12.807316Z
TXN1004,U103,C103,250000.0,DubaiMall,AE,2025-01-10T10:04:20Z,10,DV03,POS,2025-12-25T13:56:03.78Z,2025-12-27T07:34:12.807316Z
TXN1005,U104,C104,500.0,Flipkart,IN,2025-01-10T10:05:25Z,10,DV04,ONLINE,2025-12-25T13:56:03.78Z,2025-12-27T07:34:12.807316Z
TXN1006,U101,C101,1800.0,Amazon,IN,2025-01-10T10:06:30Z,10,DV02,ONLINE,2025-12-25T13:56:03.78Z,2025-12-27T07:34:12.807316Z
TXN1007,U101,C101,2200.0,Amazon,IN,2025-01-10T10:07:35Z,10,DV03,ONLINE,2025-12-25T13:56:03.78Z,2025-12-27T07:34:12.807316Z
TXN1008,U105,C105,75000.0,AliExpress,CN,2025-01-10T10:08:40Z,10,DV05,ONLINE,2025-12-25T13:56:03.78Z,2025-12-27T07:34:12.807316Z
TXN1009,U105,C105,76000.0,AliExpress,CN,2025-01-10T10:09:45Z,10,DV06,ONLINE,2025-12-25T13:56:03.78Z,2025-12-27T07:34:12.807316Z
TXN1010,U105,C105,78000.0,AliExpress,CN,2025-01-10T10:10:50Z,10,DV07,ONLINE,2025-12-25T13:56:03.78Z,2025-12-27T07:34:12.807316Z


In [0]:
customer_bronze_df=spark.readStream.format('delta').load('abfss://data@storageaccountfda.dfs.core.windows.net/catalog/bronze/customers')

In [0]:
%sql
create table if not exists customer_final_df(user_id string,customer_name string,customer_country string,customer_risk_score string);

In [0]:
from pyspark.sql.functions import *
from pyspark.sql import *
def customer_function(customer_clean_df,batch_id):
    customer_window_spec=Window.partitionBy('user_id').orderBy(col('ingest_time').desc())
    customer_clean_df=customer_clean_df.withColumn('rank',dense_rank().over(customer_window_spec))
    customer_clean_df=customer_clean_df.filter((col('rank')==1)&(col('record_status')=='valid'))
    customer_clean_df.createOrReplaceTempView('customer_clean_df')
    merge_statement='''merge into customer_final_df t using customer_clean_df s
    on t.user_id=s.user_id
    when matched then update set t.customer_name=s.customer_name,t.customer_country=s.customer_country,t.customer_risk_score=s.customer_risk_score
    when not matched then insert *
    '''
    customer_clean_df._jdf.sparkSession().sql(merge_statement)

In [0]:
from pyspark.sql.functions import *
from pyspark.sql import *
customer_clean_df=customer_bronze_df.withColumn('is_mandatory_records_valid',when((col('user_id').isNotNull())&(col('customer_country').isNotNull()),True).otherwise(False))\
    .withColumn('is_valid_customer_risk_score',when(col('customer_risk_score').isin('LOW','MEDIUM','HIGH'),True).otherwise(False))
customer_clean_df=customer_clean_df.withColumn('record_status',when(~(col('is_mandatory_records_valid'))|(~col('is_valid_customer_risk_score')),'invalid').otherwise('valid'))\
    .withColumn('error_status',when(~(col('is_mandatory_records_valid')),'mandatory_columns_missing').otherwise(when(~col('is_valid_customer_risk_score'),'invalid_customer_risk_score').otherwise(None)))\
    .withColumn('customer_country',upper(col('customer_country')))\
    .withColumn('customer_risk_score',upper(col('customer_risk_score')))\
    .withColumn('customer_name',trim(col('customer_name')))\
    .withColumn('processing_time',current_timestamp())
customer_clean_df.writeStream.format('delta').outputMode('update').option('mergeSchema',True).trigger(availableNow=True).option('checkpointLocation','abfss://data@storageaccountfda.dfs.core.windows.net/catalog/silver/customers/checkpoint').foreachBatch(customer_function).start()


<pyspark.sql.streaming.query.StreamingQuery at 0x7fc51fe6f410>

In [0]:
customer_clean_df=spark.sql('select * from customer_final_df')
display(customer_clean_df)

user_id,customer_name,customer_country,customer_risk_score
U101,Rahul Sharma,IN,LOW
U102,John Smith,US,LOW
U103,Ahmed Ali,AE,HIGH
U104,Priya Verma,IN,LOW
U105,Chen Wei,CN,HIGH


In [0]:
card_bronze_df=spark.readStream.format('delta').load('abfss://data@storageaccountfda.dfs.core.windows.net/catalog/bronze/cards')

In [0]:
%sql
create table if not exists card_final_table1(card_id string,user_id string,card_type string,card_status string,issue_date date);

In [0]:
def card_function(card_silver_df,batch_id):
    card_silver_df=card_silver_df.filter((col('record_status')=='valid')&(col('card_status')=='ACTIVE'))
    card_silver_df.createOrReplaceTempView('card_silver_view')
    merge_statements='''merge into card_final_table1 t using card_silver_view s on t.card_id=s.card_id
    when matched then update set t.user_id=s.user_id,t.card_type=s.card_type,t.card_status=s.card_status,t.issue_date=s.issue_date
    when not matched then insert *
    '''
    card_silver_df._jdf.sparkSession().sql(merge_statements)

In [0]:
from pyspark.sql.functions import *
from pyspark.sql import *

card_silver_df = card_bronze_df \
    .withColumn('card_status', upper(col('card_status'))) \
    .withColumn('card_type', upper(col('card_type'))) \
    .withColumn('issue_date', to_date(col('issue_date'), 'yyyy-MM-dd')) \
    .withColumn(
        'is_mandatory_records_valid',
        when(
            col('card_id').isNotNull() &
            col('user_id').isNotNull() &
            col('card_status').isNotNull(),
            True
        ).otherwise(False)
    ) \
    .withColumn(
        'is_valid_status',
        when(col('card_status').isin('ACTIVE', 'BLOCKED'), True)
        .otherwise(False)
    ) \
    .withColumn(
        'record_status',
        when(
            (~col('is_mandatory_records_valid')) |
            (~col('is_valid_status')),
            'invalid'
        ).otherwise('valid')
    ) \
    .withColumn(
        'error_status',
        when(~col('is_mandatory_records_valid'), 'mandatory_records_are_missing')
        .when(~col('is_valid_status'), 'invalid_card_status')
    )

card_silver_df.writeStream.format('delta').outputMode('update').option('merge_schema',True).trigger(availableNow=True).option('checkpointLocation','abfss://data@storageaccountfda.dfs.core.windows.net/catalog/silver/cards/checkpoint').foreachBatch(card_function)\
.start()



<pyspark.sql.streaming.query.StreamingQuery at 0x7f0a88c38890>

In [0]:
cards_clean_df=spark.sql('select * from card_final_table1')
display(cards_clean_df)

card_id,user_id,card_type,card_status,issue_date
C101,U101,VISA,ACTIVE,2022-01-10
C102,U102,MASTERCARD,ACTIVE,2021-05-18
C103,U103,VISA,ACTIVE,2020-11-22
C104,U104,RUPAY,ACTIVE,2023-03-15
C105,U105,VISA,ACTIVE,2019-08-09


In [0]:
%sql
create table if not exists country_final_table(country_code string,country_name string,country_risk string)

In [0]:
def country_function(country_risk_clean_df,batch_id):
   country_risk_clean_df=country_risk_clean_df.filter(col('record_status')=='valid')
   country_risk_clean_df.createOrReplaceTempView('country_silver_view')
   merge_statement='''merge into country_final_table t using country_silver_view s
    on t.country_code=s.country_code
    when matched then update set t.country_name=s.country_name,t.country_risk=s.country_risk
    when not matched then insert *
    '''
   country_risk_clean_df._jdf.sparkSession().sql(merge_statement)

In [0]:
from pyspark.sql.functions import *
country_risk_bronze_df=spark.readStream.format('delta').load('abfss://data@storageaccountfda.dfs.core.windows.net/catalog/bronze/country_risk')
country_risk_clean_df=country_risk_bronze_df.withColumn('mandatory_records_valid',when((col('country_code').isNotNull())&(col('country_risk').isNotNull()),True).otherwise(False))\
    .withColumn('country_risk_status_valid',when(col('country_risk').isin('LOW','HIGH','MEDIUM'),True).otherwise(False))
country_risk_clean_df=country_risk_clean_df.withColumn('record_status',when(~col('mandatory_records_valid')|~col('country_risk_status_valid'),'invalid').otherwise('valid'))\
.withColumn('error_status',when((~col('mandatory_records_valid')),'Mandatory records are missing').otherwise(when(~col('country_risk_status_valid'),'invalid country risk status').otherwise(None)))\
    .withColumn('country_code',upper('country_code'))\
    .withColumn('country_risk',upper('country_risk'))\
    .withColumn('country_name',initcap('country_name'))
country_risk_clean_df.writeStream.format('delta').outputMode('update').trigger(availableNow=True).option('mergeSchema',True).option('checkpointLocation','abfss://data@storageaccountfda.dfs.core.windows.net/catalog/silver/country_risks/checkpoint')\
.foreachBatch(country_function).start()

<pyspark.sql.streaming.query.StreamingQuery at 0x7fc9c8e68d70>

In [0]:
country_clean_df=spark.sql('select * from country_final_table')
display(country_clean_df)


country_code,country_name,country_risk
IN,India,LOW
US,United States,LOW
AE,United Arab Emirates,HIGH
CN,China,HIGH
RU,Russia,HIGH


In [0]:
%sql
drop table silver_enchriched_final_table;
create table silver_enchriched_final_table(transaction_id string,user_id string,card_id string,amount double,merchant string,merchant_country string,transaction_time timestamp,transaction_hour integer,device_id string,channel string,ingest_time timestamp,processing_time timestamp,customer_name string,customer_country string,customer_risk_score string,country_name string,country_risk string,card_type string,card_status string,issue_date date,is_international Boolean)

In [0]:
from pyspark.sql.functions import *
silver_enriched_df=transaction_clean_df.join(customer_clean_df,transaction_clean_df.user_id==customer_clean_df.user_id,'inner').drop(customer_clean_df.user_id).join(country_clean_df,customer_clean_df.customer_country==country_clean_df.country_code,'inner').drop(country_clean_df.country_code).join(cards_clean_df,transaction_clean_df.card_id==cards_clean_df.card_id,'inner').drop(cards_clean_df.card_id,cards_clean_df.user_id)
silver_enriched_df=silver_enriched_df.withColumn('is_international',when(col('merchant_country')==col('customer_country'),False).otherwise(True))
silver_enriched_df.createOrReplaceTempView('silver_enriched_view')

In [0]:
%sql
merge into silver_enchriched_final_table t using silver_enriched_view s
    on t.transaction_id=s.transaction_id
    when matched then update set t.user_id=s.user_id,t.card_id=s.card_id,t.amount=s.amount,t.merchant=s.merchant,t.transaction_time=s.transaction_time,t.transaction_hour=s.transaction_hour,t.device_id=s.device_id,t.channel=s.channel,t.customer_name=s.customer_name,
    t.customer_country=s.customer_country,t.customer_risk_score=s.customer_risk_score,t.merchant_country=s.merchant_country,t.country_name=s.country_name,t.country_risk=s.country_risk,t.card_type=s.card_type,t.card_status=s.card_status,t.issue_date=s.issue_date,t.is_international=s.is_international
    when not matched then insert *;

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
10,0,0,10


In [0]:
silver_enchriched_final_df=spark.sql('select * from silver_enchriched_final_table')
display(silver_enchriched_final_df)

transaction_id,user_id,card_id,amount,merchant,merchant_country,transaction_time,transaction_hour,device_id,channel,ingest_time,processing_time,customer_name,customer_country,customer_risk_score,country_name,country_risk,card_type,card_status,issue_date,is_international
TXN1001,U101,C101,1200.0,Amazon,IN,2025-01-10T10:01:05.000Z,10,DV01,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Rahul Sharma,IN,LOW,India,LOW,VISA,ACTIVE,2022-01-10,False
TXN1002,U101,C101,1500.0,Amazon,IN,2025-01-10T10:02:10.000Z,10,DV01,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Rahul Sharma,IN,LOW,India,LOW,VISA,ACTIVE,2022-01-10,False
TXN1003,U102,C102,98000.0,Apple,US,2025-01-10T10:03:15.000Z,10,DV02,POS,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,John Smith,US,LOW,United States,LOW,MASTERCARD,ACTIVE,2021-05-18,False
TXN1004,U103,C103,250000.0,DubaiMall,AE,2025-01-10T10:04:20.000Z,10,DV03,POS,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Ahmed Ali,AE,HIGH,United Arab Emirates,HIGH,VISA,ACTIVE,2020-11-22,False
TXN1005,U104,C104,500.0,Flipkart,IN,2025-01-10T10:05:25.000Z,10,DV04,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Priya Verma,IN,LOW,India,LOW,RUPAY,ACTIVE,2023-03-15,False
TXN1006,U101,C101,1800.0,Amazon,IN,2025-01-10T10:06:30.000Z,10,DV02,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Rahul Sharma,IN,LOW,India,LOW,VISA,ACTIVE,2022-01-10,False
TXN1007,U101,C101,2200.0,Amazon,IN,2025-01-10T10:07:35.000Z,10,DV03,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Rahul Sharma,IN,LOW,India,LOW,VISA,ACTIVE,2022-01-10,False
TXN1008,U105,C105,75000.0,AliExpress,CN,2025-01-10T10:08:40.000Z,10,DV05,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Chen Wei,CN,HIGH,China,HIGH,VISA,ACTIVE,2019-08-09,False
TXN1009,U105,C105,76000.0,AliExpress,CN,2025-01-10T10:09:45.000Z,10,DV06,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Chen Wei,CN,HIGH,China,HIGH,VISA,ACTIVE,2019-08-09,False
TXN1010,U105,C105,78000.0,AliExpress,CN,2025-01-10T10:10:50.000Z,10,DV07,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Chen Wei,CN,HIGH,China,HIGH,VISA,ACTIVE,2019-08-09,False


In [0]:
from pyspark.sql.functions import *
transactions_stream = silver_enchriched_final_df \
    .withWatermark("transaction_time", "15 minutes")
velocity_df = transactions_stream \
    .groupBy(
        col("card_id"),
        window(col("transaction_time"), "10 minutes", "1 minute")
    ) \
    .agg(
        count("*").alias("txn_count")
    )

velocity_fraud_df = velocity_df \
    .filter(col("txn_count") > 3) \
    .select(
        col("card_id"),
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("txn_count"),
        lit("Velocity check: >5 txns in 10 minutes").alias("fraud_reason")
    )
devices_df=transactions_stream.groupBy('card_id',window('transaction_time','30 minutes','10 minutes')).agg(countDistinct('device_id').alias('device_count'))
devices_fraud_df=devices_df.filter(col('device_count')>2).select(col('card_id'),col('window.start').alias('window_start'),col('window.end').alias('window_end'),col('device_count'),lit('device_count>2 per card in 30 minutes').alias('fraud_reason'))
final_device_df=transactions_stream.join(devices_fraud_df.select('card_id').distinct(),on='card_id',how='leftsemi')
devices_list=final_device_df.select('transaction_id').collect()
devices=[row['transaction_id'] for row in devices_list]
final_velocity_df = transactions_stream.join(
    velocity_fraud_df.select('card_id').distinct(),
    on='card_id',
    how='left_semi'
)
transaction_list=final_velocity_df.select('transaction_id').collect()
transactions1=[row['transaction_id'] for row in transaction_list]
transactions_stream=transactions_stream.withColumn('is_velocity_Count_valid',when(col('transaction_id').isin(transactions1),False).otherwise(True))
transaction_stream=transactions_stream.withColumn('is_device_card_valid',when(col('transaction_id').isin(devices),False).otherwise(True))
display(transaction_stream)

transaction_id,user_id,card_id,amount,merchant,merchant_country,transaction_time,transaction_hour,device_id,channel,ingest_time,processing_time,customer_name,customer_country,customer_risk_score,country_name,country_risk,card_type,card_status,issue_date,is_international,is_velocity_Count_valid,is_device_card_valid
TXN1001,U101,C101,1200.0,Amazon,IN,2025-01-10T10:01:05.000Z,10,DV01,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Rahul Sharma,IN,LOW,India,LOW,VISA,ACTIVE,2022-01-10,False,False,False
TXN1002,U101,C101,1500.0,Amazon,IN,2025-01-10T10:02:10.000Z,10,DV01,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Rahul Sharma,IN,LOW,India,LOW,VISA,ACTIVE,2022-01-10,False,False,False
TXN1003,U102,C102,98000.0,Apple,US,2025-01-10T10:03:15.000Z,10,DV02,POS,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,John Smith,US,LOW,United States,LOW,MASTERCARD,ACTIVE,2021-05-18,False,True,True
TXN1004,U103,C103,250000.0,DubaiMall,AE,2025-01-10T10:04:20.000Z,10,DV03,POS,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Ahmed Ali,AE,HIGH,United Arab Emirates,HIGH,VISA,ACTIVE,2020-11-22,False,True,True
TXN1005,U104,C104,500.0,Flipkart,IN,2025-01-10T10:05:25.000Z,10,DV04,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Priya Verma,IN,LOW,India,LOW,RUPAY,ACTIVE,2023-03-15,False,True,True
TXN1006,U101,C101,1800.0,Amazon,IN,2025-01-10T10:06:30.000Z,10,DV02,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Rahul Sharma,IN,LOW,India,LOW,VISA,ACTIVE,2022-01-10,False,False,False
TXN1007,U101,C101,2200.0,Amazon,IN,2025-01-10T10:07:35.000Z,10,DV03,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Rahul Sharma,IN,LOW,India,LOW,VISA,ACTIVE,2022-01-10,False,False,False
TXN1008,U105,C105,75000.0,AliExpress,CN,2025-01-10T10:08:40.000Z,10,DV05,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Chen Wei,CN,HIGH,China,HIGH,VISA,ACTIVE,2019-08-09,False,True,False
TXN1009,U105,C105,76000.0,AliExpress,CN,2025-01-10T10:09:45.000Z,10,DV06,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Chen Wei,CN,HIGH,China,HIGH,VISA,ACTIVE,2019-08-09,False,True,False
TXN1010,U105,C105,78000.0,AliExpress,CN,2025-01-10T10:10:50.000Z,10,DV07,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Chen Wei,CN,HIGH,China,HIGH,VISA,ACTIVE,2019-08-09,False,True,False


In [0]:
fraud_detection_df=transaction_stream.withColumn('is_amount_valid',when(col('amount')>100000,False).otherwise(True))\
              .withColumn('is_international_valid',when((col('is_international')==True)&(col('country_risk')=='HIGH'),False).otherwise(True))
fraudation_detection_df=fraud_detection_df.withColumn('fraud_flag',when(~col('is_amount_valid')|~col('is_international_valid')|~col('is_velocity_Count_valid')|~col('is_device_card_valid'),True).otherwise(False))\
    .withColumn(
        "amount_reason",
        when(~col("is_amount_valid"), "High Amount (>100000)")
    ) \
    .withColumn(
        "international_reason",
        when(~col("is_international_valid"), "International + High Risk Country")
    ) \
    .withColumn(
        "velocity_reason",
        when(~col("is_velocity_Count_valid"), "Velocity Spike (>5 txns in 10 mins)")
    ) \
    .withColumn(
        "device_reason",
        when(~col("is_device_card_valid"), "Multiple Devices per Card")
    )
fraud_df = fraudation_detection_df.withColumn(
        "fraud_reasons_array",
        array_compact(
        array(
            col("amount_reason"),
            col("international_reason"),
            col("velocity_reason"),
            col("device_reason")
        )))\
    .withColumn(
        "fraud_reason",
        concat_ws(" | ", col("fraud_reasons_array"))
    )
    

display(fraud_df)
fraud_df.createOrReplaceTempView('fraud_view')
        

transaction_id,user_id,card_id,amount,merchant,merchant_country,transaction_time,transaction_hour,device_id,channel,ingest_time,processing_time,customer_name,customer_country,customer_risk_score,country_name,country_risk,card_type,card_status,issue_date,is_international,is_velocity_Count_valid,is_device_card_valid,is_amount_valid,is_international_valid,fraud_flag,amount_reason,international_reason,velocity_reason,device_reason,fraud_reasons_array,fraud_reason
TXN1001,U101,C101,1200.0,Amazon,IN,2025-01-10T10:01:05.000Z,10,DV01,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Rahul Sharma,IN,LOW,India,LOW,VISA,ACTIVE,2022-01-10,False,False,False,True,True,True,,,Velocity Spike (>5 txns in 10 mins),Multiple Devices per Card,"List(Velocity Spike (>5 txns in 10 mins), Multiple Devices per Card)",Velocity Spike (>5 txns in 10 mins) | Multiple Devices per Card
TXN1002,U101,C101,1500.0,Amazon,IN,2025-01-10T10:02:10.000Z,10,DV01,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Rahul Sharma,IN,LOW,India,LOW,VISA,ACTIVE,2022-01-10,False,False,False,True,True,True,,,Velocity Spike (>5 txns in 10 mins),Multiple Devices per Card,"List(Velocity Spike (>5 txns in 10 mins), Multiple Devices per Card)",Velocity Spike (>5 txns in 10 mins) | Multiple Devices per Card
TXN1003,U102,C102,98000.0,Apple,US,2025-01-10T10:03:15.000Z,10,DV02,POS,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,John Smith,US,LOW,United States,LOW,MASTERCARD,ACTIVE,2021-05-18,False,True,True,True,True,False,,,,,List(),
TXN1004,U103,C103,250000.0,DubaiMall,AE,2025-01-10T10:04:20.000Z,10,DV03,POS,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Ahmed Ali,AE,HIGH,United Arab Emirates,HIGH,VISA,ACTIVE,2020-11-22,False,True,True,False,True,True,High Amount (>100000),,,,List(High Amount (>100000)),High Amount (>100000)
TXN1005,U104,C104,500.0,Flipkart,IN,2025-01-10T10:05:25.000Z,10,DV04,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Priya Verma,IN,LOW,India,LOW,RUPAY,ACTIVE,2023-03-15,False,True,True,True,True,False,,,,,List(),
TXN1006,U101,C101,1800.0,Amazon,IN,2025-01-10T10:06:30.000Z,10,DV02,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Rahul Sharma,IN,LOW,India,LOW,VISA,ACTIVE,2022-01-10,False,False,False,True,True,True,,,Velocity Spike (>5 txns in 10 mins),Multiple Devices per Card,"List(Velocity Spike (>5 txns in 10 mins), Multiple Devices per Card)",Velocity Spike (>5 txns in 10 mins) | Multiple Devices per Card
TXN1007,U101,C101,2200.0,Amazon,IN,2025-01-10T10:07:35.000Z,10,DV03,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Rahul Sharma,IN,LOW,India,LOW,VISA,ACTIVE,2022-01-10,False,False,False,True,True,True,,,Velocity Spike (>5 txns in 10 mins),Multiple Devices per Card,"List(Velocity Spike (>5 txns in 10 mins), Multiple Devices per Card)",Velocity Spike (>5 txns in 10 mins) | Multiple Devices per Card
TXN1008,U105,C105,75000.0,AliExpress,CN,2025-01-10T10:08:40.000Z,10,DV05,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Chen Wei,CN,HIGH,China,HIGH,VISA,ACTIVE,2019-08-09,False,True,False,True,True,True,,,,Multiple Devices per Card,List(Multiple Devices per Card),Multiple Devices per Card
TXN1009,U105,C105,76000.0,AliExpress,CN,2025-01-10T10:09:45.000Z,10,DV06,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Chen Wei,CN,HIGH,China,HIGH,VISA,ACTIVE,2019-08-09,False,True,False,True,True,True,,,,Multiple Devices per Card,List(Multiple Devices per Card),Multiple Devices per Card
TXN1010,U105,C105,78000.0,AliExpress,CN,2025-01-10T10:10:50.000Z,10,DV07,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Chen Wei,CN,HIGH,China,HIGH,VISA,ACTIVE,2019-08-09,False,True,False,True,True,True,,,,Multiple Devices per Card,List(Multiple Devices per Card),Multiple Devices per Card


In [0]:
%sql
create table if not exists fraud_datection_table as 
select * from fraud_view;

num_affected_rows,num_inserted_rows


In [0]:
%sql
merge into fraud_datection_table t using fraud_view s
     on t.transaction_id=s.transaction_id
     when matched then update set *
     when not matched then insert *;

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
10,10,0,0


In [0]:
%sql
select * from fraud_datection_table;

transaction_id,user_id,card_id,amount,merchant,merchant_country,transaction_time,transaction_hour,device_id,channel,ingest_time,processing_time,customer_name,customer_country,customer_risk_score,country_name,country_risk,card_type,card_status,issue_date,is_international,is_velocity_Count_valid,is_device_card_valid,is_amount_valid,is_international_valid,fraud_flag,amount_reason,international_reason,velocity_reason,device_reason,fraud_reasons_array,fraud_reason
TXN1001,U101,C101,1200.0,Amazon,IN,2025-01-10T10:01:05.000Z,10,DV01,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Rahul Sharma,IN,LOW,India,LOW,VISA,ACTIVE,2022-01-10,False,False,False,True,True,True,,,Velocity Spike (>5 txns in 10 mins),Multiple Devices per Card,"List(Velocity Spike (>5 txns in 10 mins), Multiple Devices per Card)",Velocity Spike (>5 txns in 10 mins) | Multiple Devices per Card
TXN1002,U101,C101,1500.0,Amazon,IN,2025-01-10T10:02:10.000Z,10,DV01,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Rahul Sharma,IN,LOW,India,LOW,VISA,ACTIVE,2022-01-10,False,False,False,True,True,True,,,Velocity Spike (>5 txns in 10 mins),Multiple Devices per Card,"List(Velocity Spike (>5 txns in 10 mins), Multiple Devices per Card)",Velocity Spike (>5 txns in 10 mins) | Multiple Devices per Card
TXN1003,U102,C102,98000.0,Apple,US,2025-01-10T10:03:15.000Z,10,DV02,POS,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,John Smith,US,LOW,United States,LOW,MASTERCARD,ACTIVE,2021-05-18,False,True,True,True,True,False,,,,,List(),
TXN1004,U103,C103,250000.0,DubaiMall,AE,2025-01-10T10:04:20.000Z,10,DV03,POS,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Ahmed Ali,AE,HIGH,United Arab Emirates,HIGH,VISA,ACTIVE,2020-11-22,False,True,True,False,True,True,High Amount (>100000),,,,List(High Amount (>100000)),High Amount (>100000)
TXN1005,U104,C104,500.0,Flipkart,IN,2025-01-10T10:05:25.000Z,10,DV04,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Priya Verma,IN,LOW,India,LOW,RUPAY,ACTIVE,2023-03-15,False,True,True,True,True,False,,,,,List(),
TXN1006,U101,C101,1800.0,Amazon,IN,2025-01-10T10:06:30.000Z,10,DV02,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Rahul Sharma,IN,LOW,India,LOW,VISA,ACTIVE,2022-01-10,False,False,False,True,True,True,,,Velocity Spike (>5 txns in 10 mins),Multiple Devices per Card,"List(Velocity Spike (>5 txns in 10 mins), Multiple Devices per Card)",Velocity Spike (>5 txns in 10 mins) | Multiple Devices per Card
TXN1007,U101,C101,2200.0,Amazon,IN,2025-01-10T10:07:35.000Z,10,DV03,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Rahul Sharma,IN,LOW,India,LOW,VISA,ACTIVE,2022-01-10,False,False,False,True,True,True,,,Velocity Spike (>5 txns in 10 mins),Multiple Devices per Card,"List(Velocity Spike (>5 txns in 10 mins), Multiple Devices per Card)",Velocity Spike (>5 txns in 10 mins) | Multiple Devices per Card
TXN1008,U105,C105,75000.0,AliExpress,CN,2025-01-10T10:08:40.000Z,10,DV05,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Chen Wei,CN,HIGH,China,HIGH,VISA,ACTIVE,2019-08-09,False,True,False,True,True,True,,,,Multiple Devices per Card,List(Multiple Devices per Card),Multiple Devices per Card
TXN1009,U105,C105,76000.0,AliExpress,CN,2025-01-10T10:09:45.000Z,10,DV06,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Chen Wei,CN,HIGH,China,HIGH,VISA,ACTIVE,2019-08-09,False,True,False,True,True,True,,,,Multiple Devices per Card,List(Multiple Devices per Card),Multiple Devices per Card
TXN1010,U105,C105,78000.0,AliExpress,CN,2025-01-10T10:10:50.000Z,10,DV07,ONLINE,2025-12-25T13:56:03.780Z,2025-12-27T07:34:12.807Z,Chen Wei,CN,HIGH,China,HIGH,VISA,ACTIVE,2019-08-09,False,True,False,True,True,True,,,,Multiple Devices per Card,List(Multiple Devices per Card),Multiple Devices per Card
