In [19]:
import joblib
import pandas as pd 
from math import radians, cos, sin, asin, sqrt
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import (
    StructType, StructField, IntegerType, DoubleType, StringType, LongType, BooleanType
)
from pyspark.sql import SparkSession

# Kafka connector coordinate. Its Scala suffix (_2.12) and version (3.5.5) must match the
# Spark installation backing this PySpark.
KAFKA_CONNECTOR_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5"
RF_MODEL_PATH = "best_rf_model.pkl"
SCALER_PATH = "scaler.pkl"

# NOTE: spark.jars.packages is only applied when PySpark launches the Spark JVM. If any
# SparkSession was already created in this kernel (even one that was later stopped), the
# setting does not take effect, and format("kafka") later fails with
# "Failed to find data source: kafka". Restart the kernel and run this cell first.
spark = (
    SparkSession.builder
    .appName("FraudDetectionStreaming")
    .config("spark.jars.packages", KAFKA_CONNECTOR_PACKAGE)
    .getOrCreate()
)

# joblib.load unpickles the file, which can execute arbitrary code: only load trusted artifacts.
rf_model = joblib.load(RF_MODEL_PATH)  # Trained RandomForestClassifier
scaler = joblib.load(SCALER_PATH)      # The StandardScaler fitted on training data
selected_features = ['merchant', 'category', 'amt', 'city_pop', 'job', 'unix_time', 'age', 'distance']

In [5]:
# Define a prediction function that is applied to each pandas batch of the stream.
# It re-applies the pre-processing steps (mirroring the training pipeline) and then predicts fraud.

def _haversine_km(lat1, lon1, lat2, lon2):
    """Vectorized great-circle distance in kilometres between two sets of points given in degrees."""
    import numpy as np  # local import: numpy is not imported by the notebook's import cell

    lat1, lon1, lat2, lon2 = (np.radians(v) for v in (lat1, lon1, lat2, lon2))
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
    # Rounding can push `a` marginally outside [0, 1] (e.g. near-antipodal points);
    # clip so arcsin/sqrt stay in their domain.
    c = 2 * np.arcsin(np.sqrt(np.clip(a, 0.0, 1.0)))
    earth_radius_km = 6371
    return c * earth_radius_km


def predict_fraud_batch(pdf: pd.DataFrame) -> pd.DataFrame:
    """Engineer features for a batch of raw transactions and predict fraud with the RF model.

    Uses the module-level ``scaler``, ``rf_model`` and ``selected_features``.

    Parameters
    ----------
    pdf : pd.DataFrame
        Raw transaction rows (columns as in the streaming input schema). Requires at least
        ``trans_date_trans_time``, ``dob``, ``lat``, ``long``, ``merch_lat``, ``merch_long``,
        ``amt`` and every column listed in ``selected_features``. Not modified.

    Returns
    -------
    pd.DataFrame
        One boolean column ``predicted_fraud`` with the same index and row order as ``pdf``.
        An empty input yields an empty frame with that column.
    """
    if pdf.empty:
        # pd.cut raises ValueError on an empty array, so return early with the output shape.
        return pd.DataFrame({'predicted_fraud': pd.Series([], dtype=bool)})

    # Age = calendar-year difference between transaction and birth (birthday not considered).
    transaction_year = pd.to_datetime(pdf['trans_date_trans_time']).dt.year
    birth_year = pd.to_datetime(pdf['dob']).dt.year

    # NOTE(review): pd.cut derives its 10 equal-width bin edges from this batch's own min/max,
    # so bucket values are not comparable to training-time buckets. With the current
    # selected_features they are scaled and then discarded, i.e. they presumably exist only
    # because the scaler was fitted with these columns — confirm.
    n_bins = 10
    geo_buckets = {
        f'{name}_bucket': pd.cut(pdf[name], bins=n_bins, labels=False)
        for name in ('lat', 'long', 'merch_lat', 'merch_long')
    }

    # assign() returns a new frame, so the caller's pdf is never mutated. Columns not in
    # selected_features (including any is_fraud label) are simply never selected below.
    features = pdf.assign(
        age=transaction_year - birth_year,
        distance=_haversine_km(pdf['lat'], pdf['long'], pdf['merch_lat'], pdf['merch_long']),
        **geo_buckets,
    )

    # Normalize numerical features with the pre-fitted scaler (same column order as before).
    numerical_columns = ['amt', 'age', 'distance', 'lat_bucket', 'long_bucket', 'merch_lat_bucket', 'merch_long_bucket']
    features[numerical_columns] = scaler.transform(features[numerical_columns])

    # NOTE(review): merchant, category and job reach the model as raw strings. If the model
    # was trained on encoded versions of these columns, the fitted encoders must be applied
    # here as well — TODO confirm against the training pipeline.
    predictions = rf_model.predict(features[selected_features])

    return pd.DataFrame({'predicted_fraud': predictions.astype(bool)}, index=pdf.index)

In [20]:
# Get the Spark session. getOrCreate() returns an already-running session unchanged; the
# Kafka connector is configured here too, so that running this cell on a fresh kernel still
# yields a session that can read format("kafka"). (spark.jars.packages only takes effect
# if this call is what launches the Spark JVM.)
spark = (
    SparkSession.builder
    .appName("FraudDetectionStreaming")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5")
    .getOrCreate()
)

# Schema for the incoming JSON messages.
# Field names/types must match the JSON the producer builds from the CSV data.
input_schema = StructType([
    StructField("Unnamed: 0", IntegerType(), True),
    StructField("trans_date_trans_time", StringType(), True),
    StructField("cc_num", StringType(), True),
    StructField("merchant", StringType(), True),
    StructField("category", StringType(), True),
    StructField("amt", DoubleType(), True),
    StructField("first", StringType(), True),
    StructField("last", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("street", StringType(), True),
    StructField("lat", DoubleType(), True),
    StructField("long", DoubleType(), True),
    StructField("city_pop", IntegerType(), True),
    StructField("job", StringType(), True),
    StructField("dob", StringType(), True),
    StructField("trans_num", StringType(), True),
    StructField("unix_time", LongType(), True),
    StructField("merch_lat", DoubleType(), True),
    StructField("merch_long", DoubleType(), True),
    StructField("is_fraud", IntegerType(), True)  # May not be present in live data
])

# Kafka source settings.
KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
KAFKA_TOPIC = "transactions"

# Read streaming data from Kafka; "earliest" starts from the earliest available offset
# when the query first starts.
raw_df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", KAFKA_TOPIC)
    .option("startingOffsets", "earliest")
    .load()
)

# Kafka delivers `value` as binary: cast it to a string, then parse it with input_schema.
json_df = raw_df.selectExpr("CAST(value AS STRING) as json_string")
data_df = json_df.select(from_json(col("json_string"), input_schema).alias("data")).select("data.*")


AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of Structured Streaming + Kafka Integration Guide.

In [None]:
# -----------------------------
# Apply the prediction function to the stream with mapInPandas.
# mapInPandas calls it on each pandas chunk of every partition, in parallel. In contrast,
# groupBy().applyInPandas() with no grouping keys puts every row of a micro-batch into a
# single group, processed by one task with the whole group loaded into memory.
# NOTE: predict_fraud_batch derives its geographic bucket edges from whatever rows it
# receives, so those bucket values depend on how rows are chunked.
# -----------------------------
# We define the output schema for the predictions.
from pyspark.sql.types import StructType, StructField
output_schema = StructType([StructField("predicted_fraud", BooleanType(), True)])


def _predict_fraud_chunks(chunks):
    """mapInPandas adapter: yield predict_fraud_batch(chunk) for each pandas DataFrame chunk."""
    for chunk in chunks:
        yield predict_fraud_batch(chunk)


result_df = data_df.mapInPandas(_predict_fraud_chunks, schema=output_schema)

# -----------------------------
# Write the predictions to the console.
# -----------------------------
query = (
    result_df.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .start()
)

# Blocks this cell until the query is stopped or fails.
query.awaitTermination()

In [17]:
# Smoke-test the Spark session.
# The Kafka connector is configured here as well: spark.jars.packages is only applied when
# PySpark launches the JVM. If this is the first session created in the kernel, it must
# already include the connector for the streaming cells to find format("kafka").
# The session is deliberately not stopped: it is the same `spark` object the streaming
# cells use.

from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("FraudDetectionStreaming")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5")
    .getOrCreate()
)
smoke_df = spark.range(10)
assert [row.id for row in smoke_df.collect()] == list(range(10)), "Spark smoke test failed"
smoke_df.show()


+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+

