In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Obtain the Spark session used by the rest of the notebook, registered under
# the application name "FraudDetection".
spark = (
    SparkSession.builder
    .appName("FraudDetection")
    .getOrCreate()
)

In [0]:
# Layout used to parse the JSON transaction payload. Each entry is a
# (column name, Spark type) pair; every field is declared non-nullable.
schema = StructType([
    StructField(field_name, field_type, False)
    for field_name, field_type in (
        ("card_id", LongType()),
        ("member_id", LongType()),
        ("amount", FloatType()),
        ("post_code", LongType()),
        ("pos_id", LongType()),
        ("transaction_date", StringType()),
        ("status", StringType()),
    )
])

In [0]:
# Streaming source: subscribe to the Kafka topic, starting from the latest offsets.
kafka_reader = spark.readStream.format("kafka")
kafka_reader = kafka_reader.option("kafka.bootstrap.servers", "13.211.91.229:9092")
kafka_reader = kafka_reader.option("subscribe", "my-topic")
kafka_reader = kafka_reader.option("startingOffsets", "latest")
df_kafka = kafka_reader.load()

# Decode the message value as a string, parse it as JSON against `schema`,
# and promote the parsed struct's fields to top-level columns.
df_transactions = (
    df_kafka
    .select(col("value").cast("string").alias("value"))
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)

df_transactions.printSchema()
# NOTE(review): this displays the raw Kafka frame, not the parsed
# df_transactions — confirm that is the intended preview.
display(df_kafka)

In [0]:
# Load every column of the card lookup table from the catalog.
df_fraud_lookup = spark.table("spark_catalog.credit_card_db.lookup_table")


In [0]:
# Convert the textual transaction_date into a timestamp using a fixed pattern.
df_transactions = df_transactions.withColumn(
    "transaction_date",
    to_timestamp(col("transaction_date"), "yyyy-MM-dd HH:mm:ss"),
)

# Prefix the lookup table's key columns so they cannot clash with the
# transaction columns of the same name after the join.
df_fraud_lookup = (
    df_fraud_lookup
    .withColumnRenamed("card_id", "lookup_card_id")
    .withColumnRenamed("member_id", "lookup_member_id")
)

# Left-join each transaction to its card's lookup row, project an explicit
# column list, and derive fraud_flag. The first matching rule wins; a rule
# only fires when all of its operands are non-null.
df_result = (
    df_transactions.alias("txn")
    .join(
        df_fraud_lookup.alias("lookup"),
        col("txn.card_id") == col("lookup.lookup_card_id"),
        "left",
    )
    .select(
        *[
            col(f"txn.{name}").alias(name)
            for name in (
                "card_id",
                "member_id",
                "amount",
                "post_code",
                "pos_id",
                "transaction_date",
                "status",
            )
        ],
        *[
            col(f"lookup.{name}").alias(name)
            for name in ("UCL", "score", "last_txn_time", "lookup_member_id")
        ],
        when(
            col("txn.amount").isNotNull()
            & col("lookup.UCL").isNotNull()
            & (col("txn.amount") > col("lookup.UCL")),
            "Exceeds UCL",
        )
        .when(
            col("lookup.score").isNotNull() & (col("lookup.score") < 0.5),
            "Low Score",
        )
        .when(
            col("txn.transaction_date").isNotNull()
            & col("lookup.last_txn_time").isNotNull()
            & (col("txn.transaction_date") < col("lookup.last_txn_time")),
            "New Member or Recent Transaction",
        )
        .otherwise("Legit")
        .alias("fraud_flag"),
    )
)

# Disabled filter, kept for reference: restrict the output to "Legit" rows.
# df_result = df_result.filter(col("fraud_flag") == "Legit")

In [0]:
# NOTE(review): this cell reassigns df_result, replacing the value built by the
# preceding cell. Unlike that cell it keeps every column of the join and adds
# fraud_flag on top — confirm which variant is meant to feed the sink.
df_fraud_lookup = (
    df_fraud_lookup
    .withColumnRenamed("card_id", "lookup_card_id")
    .withColumnRenamed("member_id", "lookup_member_id")
)

# Left-join transactions to the lookup table on card id, then label each row
# with the first matching rule (falling back to "Legit").
df_result = (
    df_transactions.alias("txn")
    .join(
        df_fraud_lookup.alias("lookup"),
        df_transactions.card_id == df_fraud_lookup.lookup_card_id,
        "left",
    )
    .withColumn(
        "fraud_flag",
        when(col("txn.amount") > col("lookup.UCL"), "Exceeds UCL")
        .when(col("lookup.score") < 0.5, "Low Score")
        .when(
            col("txn.transaction_date") < col("lookup.last_txn_time"),
            "New Member or Recent Transaction",
        )
        .otherwise("Legit"),
    )
)

# Disabled filter, kept for reference: restrict the output to "Legit" rows.
# df_result = df_result.filter(col("fraud_flag") == "Legit")

In [0]:


# Start the streaming write: append the flagged transactions to a Delta table,
# tracking progress in the given checkpoint directory.
(
    df_result.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "dbfs:/mnt/checkpoints/fraud_results3")
    .start("dbfs:/mnt/fraud_results3")
)

In [0]:
# List the contents of the streaming checkpoint directory.
display(
    dbutils.fs.ls("dbfs:/mnt/checkpoints/fraud_results3")
)


In [0]:
# List the commit files of a checkpoint.
# NOTE(review): this inspects fraud_results2, whereas the streaming write in
# this notebook checkpoints to fraud_results3 — confirm the path is intended.
display(
    dbutils.fs.ls("dbfs:/mnt/checkpoints/fraud_results2/commits")
)


In [0]:
# Read one checkpoint commit file with the CSV reader and show it.
# NOTE(review): the path points at fraud_results2, whereas the streaming write
# in this notebook checkpoints to fraud_results3 — confirm the path is intended.
df = spark.read.load(
    "dbfs:/mnt/checkpoints/fraud_results2/commits/4",
    format="csv",
)
display(df)

In [0]:
# Batch-read the Delta table at the streaming sink path and show its rows.
df = spark.read.load("dbfs:/mnt/fraud_results3", format="delta")
display(df)

In [0]:
# Print the column names and types of the joined, flagged result frame.
df_result.printSchema()

