# Part 2: Streaming application using Spark Structured Streaming

### 1.	Write code to create a SparkSession, which 1) uses four cores with a proper application name; 2) use the Melbourne timezone; 3) ensure a checkpoint location has been set.


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType, BooleanType, TimestampType
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 pyspark-shell'

from pyspark.sql import SparkSession

current_directory = os.getcwd()

spark = SparkSession.builder\
    .appName("FraudDetectionStreamingApp")\
    .master("local[4]")\
    .config("spark.sql.session.timeZone", "Australia/Melbourne")\
    .config("spark.sql.streaming.checkpointLocation", os.path.join(current_directory, "temp_checkpoint"))\
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

### 2.	Similar to assignment 2A, write code to define the data schema for the data files, following the data types suggested in the metadata file. Load the static datasets (e.g. customer, product, category) into data frames. (You can use your code from 2A.)



In [2]:
from pyspark.sql.types import (StructType, StructField, StringType, IntegerType, 
                               DateType, FloatType, TimestampType, BooleanType)

# Define schema for customer.csv
customer_schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("username", StringType(), True),
    StructField("email", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("birthdate", DateType(), True),  
    StructField("first_join_date", DateType(), True) 
])

# Define schema for category.csv
category_schema = StructType([
    StructField("category_id", StringType(), True),
    StructField("cat_level1", StringType(), True),
    StructField("cat_level2", StringType(), True),
    StructField("cat_level3", StringType(), True)
])

# Define schema for browsing_behaviour.csv
browsing_behaviour_schema = StructType([
    StructField("session_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("event_time", TimestampType(), True),  
    StructField("traffic_source", StringType(), True),
    StructField("device_type", StringType(), True)
])

# Define schema for product.csv
product_schema = StructType([
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("baseColour", StringType(), True),
    StructField("season", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("usage", StringType(), True),
    StructField("productDisplayName", StringType(), True),
    StructField("category_id", StringType(), True)
])

# Define schema for transaction.csv
transaction_schema = StructType([
    StructField("created_at", TimestampType(), True),  
    StructField("customer_id", StringType(), True),
    StructField("transaction_id", StringType(), True),
    StructField("session_id", StringType(), True),
    StructField("product_metadata", StringType(), True),
    StructField("payment_method", StringType(), True),
    StructField("payment_status", StringType(), True),
    StructField("promo_amount", FloatType(), True),
    StructField("promo_code", StringType(), True),
    StructField("shipment_fee", FloatType(), True),
    StructField("shipment_location_lat", FloatType(), True),
    StructField("shipment_location_long", FloatType(), True),
    StructField("total_amount", FloatType(), True),
    StructField("clear_payment", StringType(), True)
])

# Define schema for customer_session.csv
customer_session_schema = StructType([
    StructField("session_id", StringType(), True),
    StructField("customer_id", StringType(), True)
])

# Define schema for fraud_transaction.csv
fraud_transaction_schema = StructType([
    StructField("transaction_id", StringType(), True),
    StructField("is_fraud", BooleanType(), True)  
])

Reading Files 

In [3]:
# File paths
customer_csv_path = "customer.csv"
category_csv_path = "category.csv"
browsing_behaviour_csv_path = "browsing_behaviour.csv"
product_csv_path = "product.csv"
transaction_csv_path = "transactions.csv"
customer_session_csv_path = "customer_session.csv"
fraud_transaction_csv_path = "fraud_transaction.csv"

# Load CSV files into DataFrames with predefined schemas
df_customer = (spark.read.format("csv")
               .schema(customer_schema)
               .option("header", "true")
               .load(customer_csv_path))

df_category = (spark.read.format("csv")
               .schema(category_schema)
               .option("header", "true")
               .load(category_csv_path))

df_browsing_behaviour = (spark.read.format("csv")
                         .schema(browsing_behaviour_schema)
                         .option("header", "true")
                         .load(browsing_behaviour_csv_path))

df_product = (spark.read.format("csv")
              .schema(product_schema)
              .option("header", "true")
              .load(product_csv_path))

df_transaction = (spark.read.format("csv")
                  .schema(transaction_schema)
                  .option("header", "true")
                  .load(transaction_csv_path))

df_customer_session = (spark.read.format("csv")
                       .schema(customer_session_schema)
                       .option("header", "true")
                       .load(customer_session_csv_path))

df_fraud_transaction = (spark.read.format("csv")
                        .schema(fraud_transaction_schema)
                        .option("header", "true")
                        .load(fraud_transaction_csv_path))

# Print schemas for each DataFrame
print("Customer DataFrame Schema:")
df_customer.printSchema()

print("\nCategory DataFrame Schema:")
df_category.printSchema()

print("\nBrowsing Behaviour DataFrame Schema:")
df_browsing_behaviour.printSchema()

print("\nProduct DataFrame Schema:")
df_product.printSchema()

print("\nTransaction DataFrame Schema:")
df_transaction.printSchema()

print("\nCustomer Session DataFrame Schema:")
df_customer_session.printSchema()

print("\nFraud Transaction DataFrame Schema:")
df_fraud_transaction.printSchema()


Customer DataFrame Schema:
root
 |-- customer_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- username: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- birthdate: date (nullable = true)
 |-- first_join_date: date (nullable = true)


Category DataFrame Schema:
root
 |-- category_id: string (nullable = true)
 |-- cat_level1: string (nullable = true)
 |-- cat_level2: string (nullable = true)
 |-- cat_level3: string (nullable = true)


Browsing Behaviour DataFrame Schema:
root
 |-- session_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- traffic_source: string (nullable = true)
 |-- device_type: string (nullable = true)


Product DataFrame Schema:
root
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- baseColour: string (nullable = true)
 |-- season: string (nullable =

In [4]:
from pyspark.sql.functions import col, json_tuple

# Extract product_id from product_metadata in df_transaction
df_transaction_parsed = df_transaction.withColumn("product_id", json_tuple(col("product_metadata"), "product_id"))

# Now, we join customer, product using 'id' from product, category, and fraud transaction static datasets
# Join df_fraud_transaction on transaction_id, df_product on 'id', and df_category on category_id
static_data_df = df_transaction_parsed.join(df_fraud_transaction, on='transaction_id', how='inner') \
                                      .join(df_customer, on='customer_id', how='inner') \
                                      .join(df_product, df_transaction_parsed.product_id == df_product.id, how='inner') \
                                      .join(df_category, on='category_id', how='inner')

# Now we have included the 'is_fraud' column from df_fraud_transaction to identify fraudulent transactions

static_data_df.printSchema()


root
 |-- category_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- transaction_id: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- session_id: string (nullable = true)
 |-- product_metadata: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- payment_status: string (nullable = true)
 |-- promo_amount: float (nullable = true)
 |-- promo_code: string (nullable = true)
 |-- shipment_fee: float (nullable = true)
 |-- shipment_location_lat: float (nullable = true)
 |-- shipment_location_long: float (nullable = true)
 |-- total_amount: float (nullable = true)
 |-- clear_payment: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- is_fraud: boolean (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- username: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- birthdate: date (nullable = t

### 3. Using the Kafka topics from the producer in Task 1, ingest the streaming data into Spark Streaming, assuming all data comes in the String format. Except for the 'ts' column, you shall receive it as an Int type.


In [5]:
from pyspark.sql.functions import from_json, col, from_unixtime, expr, to_timestamp
from pyspark.sql.types import StructType, StringType, IntegerType, FloatType, TimestampType

# Kafka configurations
kafka_browsing_topic = "browsing_behaviour"
kafka_transaction_topic = "transactions"
kafka_bootstrap_servers = "localhost:9092"

# Define schema for browsing behavior
browsing_schema = StructType() \
    .add("session_id", StringType()) \
    .add("event_type", StringType()) \
    .add("event_time", StringType()) \
    .add("traffic_source", StringType()) \
    .add("device_type", StringType()) \
    .add("ts", IntegerType())

# Define schema for transactions
transaction_schema = StructType() \
    .add("created_at", StringType()) \
    .add("customer_id", StringType()) \
    .add("transaction_id", StringType()) \
    .add("session_id", StringType()) \
    .add("product_metadata", StringType()) \
    .add("payment_method", StringType()) \
    .add("payment_status", StringType()) \
    .add("promo_amount", FloatType()) \
    .add("promo_code", StringType()) \
    .add("shipment_fee", FloatType()) \
    .add("shipment_location_lat", FloatType()) \
    .add("shipment_location_long", FloatType()) \
    .add("total_amount", FloatType()) \
    .add("clear_payment", StringType()) \
    .add("ts", IntegerType())

# Ingest browsing stream data from Kafka
browsing_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_browsing_topic) \
    .load() \
    .selectExpr("CAST(value AS STRING)")

# Ingest transaction stream data from Kafka
transaction_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_transaction_topic) \
    .load() \
    .selectExpr("CAST(value AS STRING)")

# Parse the browsing and transaction data using the schemas
browsing_stream_df = browsing_stream_df.withColumn("value", from_json(col("value"), browsing_schema)) \
                                       .selectExpr("value.*") \
                                       .withColumn("event_ts", to_timestamp(from_unixtime(col("ts")))) \
                                       .drop("ts")

transaction_stream_df = transaction_stream_df.withColumn("value", from_json(col("value"), transaction_schema)) \
                                             .selectExpr("value.*") \
                                             .withColumn("event_ts", to_timestamp(from_unixtime(col("ts")))) \
                                             .drop("ts")

# Print schemas to verify
browsing_stream_df.printSchema()
transaction_stream_df.printSchema()


root
 |-- session_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_time: string (nullable = true)
 |-- traffic_source: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- event_ts: timestamp (nullable = true)

root
 |-- created_at: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- transaction_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- product_metadata: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- payment_status: string (nullable = true)
 |-- promo_amount: float (nullable = true)
 |-- promo_code: string (nullable = true)
 |-- shipment_fee: float (nullable = true)
 |-- shipment_location_lat: float (nullable = true)
 |-- shipment_location_long: float (nullable = true)
 |-- total_amount: float (nullable = true)
 |-- clear_payment: string (nullable = true)
 |-- event_ts: timestamp (nullable = true)



### 4.	Then, the streaming data format should be transformed into the proper formats following the metadata file schema, similar to assignment 2A. Perform the following tasks:  
a)	For the 'ts' column, convert it to the timestamp format, we will use it as event_ts.  
b)	If the data is late for more than 2 minutes, discard it.  


In [6]:
from pyspark.sql.functions import expr

# Current timestamp
current_timestamp_expr = expr("current_timestamp()")

# Filtering out late data (late by more than 2 minutes) with watermarking
browsing_stream_df = browsing_stream_df.withWatermark("event_ts", "2 minutes") \
    .filter(col("event_ts") >= current_timestamp_expr - expr("INTERVAL 2 MINUTES"))

transaction_stream_df = transaction_stream_df.withWatermark("event_ts", "2 minutes") \
    .filter(col("event_ts") >= current_timestamp_expr - expr("INTERVAL 2 MINUTES"))


### 5.	Aggregate the streaming data frames and create features you used in your assignment 2A model.  
(note: customer ID has already been included in the stream.) Then, join the static data frames with the streaming data frame as our final data for prediction. Perform data type/column conversion according to your ML model and print out the Schema. (Again, you can reuse code from A2A).

In [7]:
from pyspark.sql.functions import from_json, col, from_unixtime, expr, to_timestamp, window, count, sum as F_sum, json_tuple

# Join transaction stream with fraud static data
transaction_with_fraud_df = transaction_stream_df \
    .join(df_fraud_transaction, on="transaction_id", how="inner") \
    .select("transaction_id", "customer_id", "session_id", "product_metadata", "payment_method", 
            "payment_status", "promo_amount", "shipment_fee", "total_amount", "event_ts", "is_fraud")

# Verify schema after join
transaction_with_fraud_df.printSchema()

# Extract product details from 'product_metadata'
product_extracted_df = transaction_with_fraud_df.withColumn("product_id", json_tuple(col("product_metadata"), "product_id")) \
                                                .withColumn("quantity_str", json_tuple(col("product_metadata"), "quantity"))

# Cast 'quantity' to integer
product_extracted_df = product_extracted_df.withColumn("quantity", col("quantity_str").cast("int")).drop("quantity_str")

# Aggregate fraud count every 10 seconds
fraud_count_df = product_extracted_df \
    .groupBy(window(col("event_ts"), "10 seconds"), col("is_fraud")) \
    .agg(count("transaction_id").alias("fraud_count")) \
    .filter(col("is_fraud") == True) \
    .select("window", "fraud_count")

# Print schema for verification
fraud_count_df.printSchema()

# Write fraud count data to Parquet file
fraud_parquet_path = "/tmp/fraud_parquet"
fraud_checkpoint_path = "/tmp/fraud_checkpoint"

fraud_count_df.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", fraud_parquet_path) \
    .option("checkpointLocation", fraud_checkpoint_path) \
    .start()

# Calculating cumulative sales for non-fraud transactions 
cumulative_sales_df = product_extracted_df \
    .filter(col("is_fraud") == False) \
    .groupBy(window(col("event_ts"), "30 seconds"), col("product_id")) \
    .agg(F_sum(col("quantity")).alias("total_quantity"))

# Print schema for verification
cumulative_sales_df.printSchema()


root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- product_metadata: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- payment_status: string (nullable = true)
 |-- promo_amount: float (nullable = true)
 |-- shipment_fee: float (nullable = true)
 |-- total_amount: float (nullable = true)
 |-- event_ts: timestamp (nullable = true)
 |-- is_fraud: boolean (nullable = true)

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- fraud_count: long (nullable = false)

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- product_id: string (nullable = true)
 |-- total_quantity: long (nullable = true)



### 8.	Read the two parquet files from task 7 as data streams and send to Kafka topics with appropriate names.
(Note: You shall read the parquet files as a streaming data frame and send messages to the Kafka topic when new data appears in the parquet file.)

In [8]:
# Write cumulative sales data to Parquet file
sales_parquet_path = "/tmp/sales_parquet"
sales_checkpoint_path = "/tmp/sales_checkpoint"

cumulative_sales_df.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", sales_parquet_path) \
    .option("checkpointLocation", sales_checkpoint_path) \
    .start()


<pyspark.sql.streaming.query.StreamingQuery at 0xffff8755b520>

In [9]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# Define the schema for the cumulative sales Parquet file
sales_schema = StructType([
    StructField("window", StructType([
        StructField("start", TimestampType(), True),
        StructField("end", TimestampType(), True)
    ])),
    StructField("product_id", StringType(), True),
    StructField("total_quantity", IntegerType(), True)
])

# Define the path to the saved sales Parquet file
sales_parquet_path = "/tmp/sales_parquet"

# Read the cumulative sales Parquet file as a stream using the defined schema
sales_stream_df = spark \
    .readStream \
    .schema(sales_schema) \
    .format("parquet") \
    .option("path", sales_parquet_path) \
    .load()

# Verify the schema of the sales streaming DataFrame
sales_stream_df.printSchema()


root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- product_id: string (nullable = true)
 |-- total_quantity: integer (nullable = true)



In [10]:
from pyspark.sql.functions import to_json, struct

# Convert the columns into a JSON format for Kafka
sales_stream_for_kafka = sales_stream_df \
    .selectExpr("to_json(struct(*)) AS value")

# Verify the structure before sending to Kafka
sales_stream_for_kafka.printSchema()


root
 |-- value: string (nullable = true)



### 7.	Write a Parquet file and save the following data frames (tip: you may look at part 3 and think about what columns to save):  
a.	Persist the raw data from 6a in parquet format. Every student may have different features/columns in their data frames depending on their model, at the bare minimum, we need some IDs to identify those frauds later on (transaction_id and/or session_id). After that, read the parquet file and show a few rows to verify it is saved correctly.  
b.	Persist the data from 6b in another parquet file.  

In [11]:
# Kafka topic and configurations
kafka_bootstrap_servers = "localhost:9092"
kafka_sales_topic = "cumulative_sales"

# Send the stream data to Kafka
sales_stream_for_kafka \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("topic", kafka_sales_topic) \
    .option("checkpointLocation", "/tmp/sales_kafka_checkpoint") \
    .start()


<pyspark.sql.streaming.query.StreamingQuery at 0xffff87558850>

### 6.	The company is interested in the number of potential frauds as they happen and the products in customers’ shopping carts (so that they can plan their stock level ahead.) Load your ML model, and use the model to predict/process each browsing session/transaction as follows:  
a)	Every 10 seconds, show the total number of potential frauds (prediction = 1) in the last 2 minutes, and persist the raw data (see 7a).  
b)	Every 30 seconds, find the top 20 products (order by quantity descending) in the last 30 seconds, show product ID, name and total quantity. We only need the non-fraud transactions (prediction=0) by extracting customer shopping cart details (sum of all items of ADD_TO_CART(ATC) events from browsing behaviour, you can also extract it from transactions).

In [12]:
# 6b


In [13]:
# 7a


In [14]:
# 7b


In [15]:
# Stream 1


In [16]:
# Stream 2
