### **Optional Step if hadoop is not installed**

In [22]:
# Optional if hadoop is not install in machine
import os
import sys

# 1. Set the path to the folder containing 'bin'
os.environ['HADOOP_HOME'] = "C:\\hadoop"

# 2. Add the bin directory to the system path so Java can find the DLL
sys.path.append("C:\\hadoop\\bin")
os.environ['PATH'] += os.pathsep + "C:\\hadoop\\bin"

## **Libraries**

In [23]:
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import os
import sys
import redis
from loguru import logger
import json

### **Spark Session**

In [24]:
# Delta Lake Jar files
# construct Absolute Paths to your JARs
# Get the current directory of the script
current_dir = os.getcwd()
jar_folder = os.path.join(current_dir, "jars")

# Paths to the specific files
delta_core = os.path.join(jar_folder, "delta-core_2.12-2.4.0.jar")
delta_storage = os.path.join(jar_folder, "delta-storage-2.4.0.jar")

# Verify they exist
if not os.path.exists(delta_core):
    print(f"❌ Error: Could not find {delta_core}")
    sys.exit(1)

PACKAGES = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1"

In [25]:

spark=(
    SparkSession
    .builder
    .appName('KafkaBatchDebug')
    .config("spark.jars.packages", PACKAGES)
    .config("spark.jars.repositories", "https://repos.spark-packages.org")
    .config('spark.streaming.stopGracefullyOnShutdown',True)
 #   .config('spark.sql.shuffle.partitions',4)
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1") 
    .config("spark.jars", f"{delta_core},{delta_storage}")
    # Delta Configuration
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") 
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.driver.host", "127.0.0.1")
    .master('local[*]')
    .getOrCreate()
)
spark

### **Read Kafka Topic Data**

In [26]:
kafka_df=spark.read.format('kafka')\
    .option('kafka.bootstrap.servers','localhost:9092')\
    .option('subscribe','raw_transactions')\
    .option('startingOffsets','earliest')\
    .load()

In [27]:
kafka_df.printSchema()
kafka_df.show()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

+----+--------------------+----------------+---------+------+--------------------+-------------+
| key|               value|           topic|partition|offset|           timestamp|timestampType|
+----+--------------------+----------------+---------+------+--------------------+-------------+
|null|[7B 22 74 72 61 6...|raw_transactions|        0|     0|2025-12-29 16:45:...|            0|
|null|[7B 22 74 72 61 6...|raw_transactions|        0|     1|2025-12-29 16:45:...|            0|
|null|[7B 22 74 72 61 6...|raw_transactions|        0|     2|2025-12-29 16:45:...|            0|
|null|[7B 22 74 72 61 6...|raw_transactions|        0|     3|2025-12-29 16:45:...|            0|
|null|[7B 22 74 72 61 6...|raw_transac

### **Convert value column to proper data**

In [28]:
kafka_df=kafka_df.withColumn('value',col('value').cast('string'))

In [29]:
kafka_df.select('value').show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                     |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"transaction_id": "9c982496-288b-4739-b960-29a366e52a43", "user_id": 815, "user_name": "Garrett Kim", "amount": 362.92, "currency": "USD", "merchant": "Patterson-Thomas", "location": "Angelatown", "timestamp": 1767006950.7128117}          

### **Schema for Json data**

In [30]:
json_schema=StructType([
    StructField('transaction_id',StringType()),
    StructField('user_id',LongType()),
    StructField('user_name',StringType()),
    StructField('amount',DoubleType()),
    StructField('currency',StringType()),
    StructField('merchant',StringType()),
    StructField('location',StringType()),
    StructField('timestamp',DoubleType()),
])

parsed_df=kafka_df.withColumn('value_json',from_json(col('value'),json_schema)).select('value_json.*')
parsed_df=parsed_df.withColumn('timestamp',to_timestamp((col('timestamp')).cast('long')))
parsed_df.printSchema()
parsed_df.show(truncate=False)

root
 |-- transaction_id: string (nullable = true)
 |-- user_id: long (nullable = true)
 |-- user_name: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- merchant: string (nullable = true)
 |-- location: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

+------------------------------------+-------+----------------+------+--------+---------------------------+------------------+-------------------+
|transaction_id                      |user_id|user_name       |amount|currency|merchant                   |location          |timestamp          |
+------------------------------------+-------+----------------+------+--------+---------------------------+------------------+-------------------+
|9c982496-288b-4739-b960-29a366e52a43|815    |Garrett Kim     |362.92|USD     |Patterson-Thomas           |Angelatown        |2025-12-29 16:45:50|
|d92fa70e-ba9d-406b-a512-c9006c4a97b4|45     |Sarah Diaz      |63.75 |USD     |M

### **Fraud Transactions**

#### Amount>10000 fraud

In [31]:
parsed_df.filter(col('amount')>10000).show()

+--------------------+-------+--------------+--------+--------+--------------------+-------------------+-------------------+
|      transaction_id|user_id|     user_name|  amount|currency|            merchant|           location|          timestamp|
+--------------------+-------+--------------+--------+--------+--------------------+-------------------+-------------------+
|d781e910-a54e-4a8...|    303| Alex Cardenas|10202.35|     USD|         Davis Group|       South Audrey|2025-12-29 16:48:41|
|c5caec88-56fd-435...|    256|  Gabriel West|19834.11|     USD|Lester, Kirby and...|Lake Jacquelineside|2025-12-29 16:48:48|
|275b1ad3-ef75-4b9...|    974|   Amber Stein|17270.29|     USD|        Greene Group|  South Diamondfurt|2025-12-29 20:18:33|
|6ee929a6-8169-400...|    314| Kevin Wallace|11705.97|     USD|    Williams-Jimenez|       Estradamouth|2025-12-29 20:18:40|
|11459027-8064-41c...|    430|Elizabeth Wade|17598.32|     USD|   Crawford-Martinez|     Port Stephanie|2025-12-29 20:18:43|


#### Black listed Merchants

In [32]:
# In production, this would load from a CSV or Database
bad_merchants_data = [
    ("Evil Corp", "High Risk"),
    ("Fake Store Ltd", "Banned"),
    ("Scam Hub", "Under Investigation")
]
schema=['merchant','risk_level']

bad_merchants_df=spark.createDataFrame(data=bad_merchants_data,schema=schema)
bad_merchants_df.show()

+--------------+-------------------+
|      merchant|         risk_level|
+--------------+-------------------+
|     Evil Corp|          High Risk|
|Fake Store Ltd|             Banned|
|      Scam Hub|Under Investigation|
+--------------+-------------------+



In [34]:
enriched_df=parsed_df.join(broadcast(bad_merchants_df),['merchant'],'left')
enriched_df.show(truncate=False)

+---------------------------+------------------------------------+-------+----------------+------+--------+------------------+-------------------+----------+
|merchant                   |transaction_id                      |user_id|user_name       |amount|currency|location          |timestamp          |risk_level|
+---------------------------+------------------------------------+-------+----------------+------+--------+------------------+-------------------+----------+
|Patterson-Thomas           |9c982496-288b-4739-b960-29a366e52a43|815    |Garrett Kim     |362.92|USD     |Angelatown        |2025-12-29 16:45:50|null      |
|Martin-Richards            |d92fa70e-ba9d-406b-a512-c9006c4a97b4|45     |Sarah Diaz      |63.75 |USD     |Schultzside       |2025-12-29 16:45:51|null      |
|Rodriguez-Herrera          |2a2412cb-843d-4ff1-b841-a655566f7baa|49     |Lisa Wood       |471.85|USD     |North Amyfort     |2025-12-29 16:45:52|null      |
|Johnson-Robles             |cef76480-ba6a-461b-b01f

In [35]:
final_df=enriched_df.withColumn('is_fraud',when(col('amount')>10000,'Yes').when(col('risk_level').isNotNull(),'Yes').otherwise('No'))\
.withColumn('Reason',when(col('amount')>10000,'High Amount').when(col('risk_level').isNotNull(),'Banned Merchant').otherwise('Normal'))

In [36]:
fraud_df=final_df.filter(col('is_fraud')=='Yes')
fraud_df.show(truncate=False)

+------------------------+------------------------------------+-------+------------------+--------+--------+-------------------+-------------------+-------------------+--------+---------------+
|merchant                |transaction_id                      |user_id|user_name         |amount  |currency|location           |timestamp          |risk_level         |is_fraud|Reason         |
+------------------------+------------------------------------+-------+------------------+--------+--------+-------------------+-------------------+-------------------+--------+---------------+
|Davis Group             |d781e910-a54e-4a81-a52f-4581543ab00c|303    |Alex Cardenas     |10202.35|USD     |South Audrey       |2025-12-29 16:48:41|null               |Yes     |High Amount    |
|Lester, Kirby and Ali   |c5caec88-56fd-435b-849a-5590b0f1098e|256    |Gabriel West      |19834.11|USD     |Lake Jacquelineside|2025-12-29 16:48:48|null               |Yes     |High Amount    |
|Evil Corp               |a345

### **Sink Save everything in deltalake**

In [37]:
sink_location=os.path.join(os.getcwd(),'lakehouse')

final_df.write.format('delta')\
.mode('append')\
.save(sink_location)

### **Sink save fraud transaction in Redis**

In [38]:
if not fraud_df.isEmpty():
        #count = fraud_df.count()
        #print(f"⚡ Batch {batch_id}: Found {count} fraud alerts! Pushing to Redis...")
        
        try:
            fraud_rows = fraud_df.collect()
            r = redis.Redis(host='localhost', port=6379, db=0)
            
            for row in fraud_rows:
                key = f"fraud:{row.user_id}"
                
                # --- FIX IS HERE ---
                # We generate the current time right now using Python
                # This prevents the "ingest_time not found" error

                fraud_date={
                                'user_id':row.user_id,
                                'user_name':row.user_name,
                                'amount':row.amount,
                                'merchant':row.merchant,
                                'reason':row.Reason
                            }
                
                r.setex(key, 3600, json.dumps(fraud_date))
                
            r.close()
            print(f"   ✅ Successfully pushed {count} alerts.")
            
        except Exception as e:
            print(f"   ❌ Redis Write Failed: {e}")

   ✅ Successfully pushed <function count at 0x00000215052F44A0> alerts.


In [1]:
spark.stop()

NameError: name 'spark' is not defined