In [2]:
import os

# PySpark reads PYSPARK_SUBMIT_ARGS when it launches its JVM, so this must run
# before the first SparkSession is created in the kernel.
# NOTE(review): the `_2.12:3.5.0` suffixes must match the installed PySpark's
# Scala/Spark build. Only the spark-sql-kafka connector appears to be needed for
# the readStream/writeStream Kafka source and sink used below. The DStream
# connector (spark-streaming-kafka) may be removable; confirm before dropping it.
_KAFKA_PACKAGES = [
    "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
]
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages " + ",".join(_KAFKA_PACKAGES) + " pyspark-shell"

In [3]:
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [4]:
# Build the SparkSession for this streaming job. getOrCreate() returns the
# session already active in this kernel, if there is one.
spark = (
    SparkSession.builder
    .appName("readFromKafka0")
    .getOrCreate()
)

In [None]:
# 2. Schema used to parse the JSON payload of each Kafka message.
#    Fields are listed in order; every field is nullable (the default).
schema = (
    StructType()
    .add("id", IntegerType())
    .add("amt", DoubleType())
    .add("dob", StringType())
    .add("job", StringType())
    .add("lat", DoubleType())
    .add("zip", IntegerType())
    .add("city", StringType())
    .add("last", StringType())
    .add("long", DoubleType())
    .add("first", StringType())
    .add("state", StringType())
    .add("cc_num", StringType())
    .add("gender", StringType())
    .add("street", StringType())
    .add("category", StringType())
    .add("city_pop", IntegerType())
    .add("merchant", StringType())
    .add("merch_lat", DoubleType())
    .add("trans_num", StringType())
    .add("unix_time", LongType())
    .add("merch_long", DoubleType())
    .add("trans_date_trans_time", StringType())
)


In [6]:
# 3. Open a streaming source on the `transactions` Kafka topic.
#    startingOffsets="latest": a newly started query begins at the newest
#    offsets instead of replaying what is already in the topic.
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "transactions")
    .option("startingOffsets", "latest")
    .load()
)

In [7]:
# 4. Decode the Kafka payload. `value` arrives as bytes, so cast it to a string,
#    parse it with `schema`, then flatten the struct into top-level columns.
#    NOTE(review): records that fail to parse, or have a null value, are not
#    filtered out here; they reach the model with null fields.
json_df = kafka_df.select(col("value").cast("string").alias("json_str"))

parsed_df = (
    json_df
    .select(from_json("json_str", schema).alias("data"))
    .select(col("data.*"))
)

In [None]:
# 5. Load the trained Spark ML PipelineModel from disk.
#    PipelineModel lives in pyspark.ml. The pyspark.sql.* star imports do not
#    provide it, so it must be imported explicitly in the import cell.
#    MODEL_PATH is relative, so it resolves against the kernel's current working directory.
MODEL_PATH = "./saved_model"
model = PipelineModel.load(MODEL_PATH)


In [None]:
# 6. Apply the fitted pipeline to the parsed stream; the result is a new
#    streaming DataFrame with the pipeline's output columns appended.
predictions_df = model.transform(dataset=parsed_df)

In [None]:
# 7. Keep identifying fields from the parsed Kafka payload plus the model output.
#    Every input name below is a field of `schema`. `prediction` is presumably the
#    output column of the pipeline's final stage (confirm against the saved model).
#    NOTE(review): cc_num is a raw card number (PII); consider masking it before
#    publishing to another topic.
result_df = predictions_df.select(
    col("trans_num"),
    col("amt"),
    col("trans_date_trans_time"),
    col("merchant"),
    col("cc_num"),
    col("prediction").alias("fraud"),
)

In [None]:
# 8. Serialize each row, with all of its columns, into one JSON string. The Kafka
#    sink takes the message payload from a column named `value`.
output_df = result_df.selectExpr("to_json(struct(*)) AS value")

In [None]:
# 9. Publish the JSON predictions to the `ml_predictions` Kafka topic, then block
#    this cell until the streaming query terminates.
#    NOTE(review): the sink uses localhost:9093 while the source reads from
#    localhost:9092. Confirm the two ports are intentionally different
#    brokers/listeners.
#    NOTE(review): the checkpoint lives under /tmp, which may not persist across reboots.
sink_writer = (
    output_df.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9093")
    .option("topic", "ml_predictions")
    .option("checkpointLocation", "/tmp/spark_checkpoints/ml_predictions")
    .outputMode("append")
)
query = sink_writer.start()

query.awaitTermination()