# Part 2: Streaming application using Spark Structured Streaming

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 pyspark-shell'

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, BooleanType
from pyspark.sql.functions import col, unix_timestamp, from_json
from pyspark.ml import PipelineModel

### 1.	Write code to create a SparkSession, which 1) uses four cores with a proper application name; 2) use the Melbourne timezone; 3) ensure a checkpoint location has been set.


In [3]:
checkpoint_dir = "spark_streaming_checkpoint"
spark = SparkSession.builder \
    .appName("FraudDetectionStreaming") \
    .master("local[4]") \
    .config("spark.sql.session.timeZone", "Australia/Melbourne") \
    .config("spark.sql.streaming.checkpointLocation", checkpoint_dir) \
    .getOrCreate()

print("Spark Session Created")

Spark Session Created


### 2.	Write code to define the data schema for the data files, following the data types suggested in the metadata file. Load the static datasets (e.g. customer, product, category) into data frames.



In [4]:
category_schema = StructType([
    StructField("category_id", IntegerType(), True),
    StructField("cat_level1", StringType(), True),
    StructField("cat_level2", StringType(), True),
    StructField("cat_level3", StringType(), True)
])

customer_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("username", StringType(), True),
    StructField("email", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("birthdate", StringType(), True),
    StructField("first_join_date", StringType(), True)
])

product_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("gender", StringType(), True),
    StructField("baseColour", StringType(), True),
    StructField("season", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("usage", StringType(), True),
    StructField("productDisplayName", StringType(), True),
    StructField("category_id", IntegerType(), True)
])

customer_session_schema = StructType([
    StructField("session_id", StringType(), True),
    StructField("customer_id", IntegerType(), True)
])

# Fraud Transaction Schema
fraud_transaction_schema = StructType([
    StructField("transaction_id", StringType(), True),
    StructField("is_fraud", BooleanType(), True)
])

# Load static datasets into DataFrames
category_df = spark.read.csv("category.csv", header=True, schema=category_schema)
category_df.show(5)
category_df.printSchema()

customer_df = spark.read.csv("customer.csv", header=True, schema=customer_schema)
customer_df.show(5)
customer_df.printSchema()

product_df = spark.read.csv("product.csv", header=True, schema=product_schema)
product_df.show(5)
product_df.printSchema()

customer_session_df = spark.read.csv("customer_session.csv", header=True, schema=customer_session_schema)
customer_session_df.show(5)
customer_session_df.printSchema()

fraud_transaction_df = spark.read.csv("fraud_transaction.csv", header=True, schema=fraud_transaction_schema)
fraud_transaction_df.show(5)
fraud_transaction_df.printSchema()

+-----------+-------------+-------------+--------------------+
|category_id|   cat_level1|   cat_level2|          cat_level3|
+-----------+-------------+-------------+--------------------+
|          1|Personal Care|       Makeup|             Compact|
|          2|   Free Items|   Free Gifts|                Ties|
|          3|     Footwear|        Shoes|        Casual Shoes|
|          4|Personal Care|Bath and Body| Body Wash and Scrub|
|          5|Personal Care|    Skin Care|Face Wash and Cle...|
+-----------+-------------+-------------+--------------------+
only showing top 5 rows

root
 |-- category_id: integer (nullable = true)
 |-- cat_level1: string (nullable = true)
 |-- cat_level2: string (nullable = true)
 |-- cat_level3: string (nullable = true)

+-----------+----------+-----------+--------------------+--------------------+------+----------+---------------+
|customer_id|first_name|  last_name|            username|               email|gender| birthdate|first_join_date|
+-----

### 3. Using the Kafka topics from the producer in Task 1, ingest the streaming data into Spark Streaming, assuming all data comes in the String format. Except for the 'ts' column, you shall receive it as an Int type.


In [5]:
kafka_brokers = "kafka:9092"
browsing_topic = "browsing_behavior"
transactions_topic = "transactions"

# Define schema for browsing and transactions datasets (initially everything is string except ts)
browsing_schema = StructType([
    StructField("session_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("event_time", StringType(), True),  # To be converted to timestamp later
    StructField("traffic_source", StringType(), True),
    StructField("device_type", StringType(), True),
    StructField("customer_id", StringType(), True),  # Initially as StringType
    StructField("ts", IntegerType(), True)  # ts as IntegerType
])

transactions_schema = StructType([
    StructField("created_at", StringType(), True),  # To be converted to timestamp later
    StructField("customer_id", StringType(), True),  # Initially as StringType
    StructField("transaction_id", StringType(), True),
    StructField("session_id", StringType(), True),
    StructField("product_metadata", StringType(), True),
    StructField("payment_method", StringType(), True),
    StructField("payment_status", StringType(), True),
    StructField("promo_amount", StringType(), True),  # Initially as StringType
    StructField("promo_code", StringType(), True),
    StructField("shipment_fee", StringType(), True),  # Initially as StringType
    StructField("shipment_location_lat", StringType(), True),  # Initially as StringType
    StructField("shipment_location_long", StringType(), True),  # Initially as StringType
    StructField("total_amount", StringType(), True),  # Initially as StringType
    StructField("clear_payment", StringType(), True),
])

# Read browsing and transactions data from Kafka
browsing_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_brokers) \
    .option("subscribe", browsing_topic) \
    .load()

transactions_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_brokers) \
    .option("subscribe", transactions_topic) \
    .load()

# Convert Kafka data from bytes to JSON and apply schema
browsing_stream = browsing_stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .withColumn("data", from_json(col("value"), browsing_schema)) \
    .select("data.*")

transactions_stream = transactions_stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .withColumn("data", from_json(col("value"), transactions_schema)) \
    .select("data.*")

browsing_stream.printSchema()
transactions_stream.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_time: string (nullable = true)
 |-- traffic_source: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- ts: integer (nullable = true)

root
 |-- created_at: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- transaction_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- product_metadata: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- payment_status: string (nullable = true)
 |-- promo_amount: string (nullable = true)
 |-- promo_code: string (nullable = true)
 |-- shipment_fee: string (nullable = true)
 |-- shipment_location_lat: string (nullable = true)
 |-- shipment_location_long: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- clear_payment: string (nullable = true)



In [7]:
query_browsing = browsing_stream.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("QueryBrowsing") \
    .start()

In [8]:
spark.sql("SELECT * FROM QueryBrowsing").show()

+--------------------+----------+--------------------+--------------+-----------+-----------+----------+
|          session_id|event_type|          event_time|traffic_source|device_type|customer_id|        ts|
+--------------------+----------+--------------------+--------------+-----------+-----------+----------+
|8a4ea821-02ea-427...|        CL|2024-01-01 00:00:...|        MOBILE|    Android|      61923|1729214848|
|e224a60c-f02c-4e1...|        CL|2024-01-01 00:00:...|        MOBILE|    Android|      52328|1729214848|
|8083bc82-d495-463...|        VI|2024-01-01 00:00:...|        MOBILE|        iOS|      33854|1729214848|
|3d4fc2ed-23b3-41b...|        CL|2024-01-01 00:00:...|        MOBILE|    Android|       1030|1729214848|
|29cab3eb-17b3-472...|        VP|2024-01-01 00:00:...|           WEB|    Android|      12525|1729214848|
|7aaab404-e148-484...|        HP|2024-01-01 00:00:...|        MOBILE|    Android|      53754|1729214848|
|75666d59-bfd7-42b...|       SCR|2024-01-01 00:00:...| 

In [9]:
query_browsing.stop()

In [10]:
query_transactions = transactions_stream.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("QueryTransactions") \
    .start()

In [12]:
spark.sql("SELECT * FROM QueryTransactions").show()

+--------------------+-----------+--------------------+--------------------+--------------------+--------------+--------------+------------+-------------+------------+---------------------+----------------------+------------+-------------+
|          created_at|customer_id|      transaction_id|          session_id|    product_metadata|payment_method|payment_status|promo_amount|   promo_code|shipment_fee|shipment_location_lat|shipment_location_long|total_amount|clear_payment|
+--------------------+-----------+--------------------+--------------------+--------------------+--------------+--------------+------------+-------------+------------+---------------------+----------------------+------------+-------------+
|2024-01-01 00:01:...|      98123|97b193ec-7b52-40d...|f85f2f9f-df4a-4ae...|[{'product_id': 9...|           OVO|          Fail|           0|             |           0|    -6.17197281657169|      106.790424765641|      271986|            0|
|2024-01-01 00:03:...|      64835|6d7eb6

In [13]:
query_transactions.stop()

### 4.	Then, the streaming data format should be transformed into the proper formats following the metadata file schema, similar to assignment 2A. Perform the following tasks:  
a)	For the 'ts' column, convert it to the timestamp format, we will use it as event_ts.  
b)	If the data is late for more than 2 minutes, discard it.  


In [14]:
from pyspark.sql.functions import from_unixtime
from pyspark.sql.functions import to_timestamp

# Convert 'ts' column (in Unix timestamp) to timestamp and filter late data (older than 2 minutes)
browsing_stream = browsing_stream.withColumn("event_ts", from_unixtime(col("ts")).cast("timestamp")) \
    .filter("event_ts >= current_timestamp() - interval 2 minutes")

# Cast the columns to DoubleType
transactions_stream = transactions_stream \
    .withColumn("promo_amount", col("promo_amount").cast("double")) \
    .withColumn("shipment_fee", col("shipment_fee").cast("double")) \
    .withColumn("shipment_location_lat", col("shipment_location_lat").cast("double")) \
    .withColumn("shipment_location_long", col("shipment_location_long").cast("double")) \
    .withColumn("total_amount", col("total_amount").cast("double"))

browsing_stream.printSchema()
transactions_stream.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_time: string (nullable = true)
 |-- traffic_source: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- ts: integer (nullable = true)
 |-- event_ts: timestamp (nullable = true)

root
 |-- created_at: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- transaction_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- product_metadata: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- payment_status: string (nullable = true)
 |-- promo_amount: double (nullable = true)
 |-- promo_code: string (nullable = true)
 |-- shipment_fee: double (nullable = true)
 |-- shipment_location_lat: double (nullable = true)
 |-- shipment_location_long: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- clear_payment: string (nullable = true)



In [13]:
query_browsing = browsing_stream.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("QueryBrowsing") \
    .start()

In [14]:
spark.sql("SELECT * FROM QueryBrowsing").show()

+--------------------+----------+--------------------+--------------+-----------+-----------+----------+-------------------+
|          session_id|event_type|          event_time|traffic_source|device_type|customer_id|        ts|           event_ts|
+--------------------+----------+--------------------+--------------+-----------+-----------+----------+-------------------+
|8a4ea821-02ea-427...|        CL|2024-01-01 00:00:...|        MOBILE|    Android|      61923|1728640644|2024-10-11 20:57:24|
|e224a60c-f02c-4e1...|        CL|2024-01-01 00:00:...|        MOBILE|    Android|      52328|1728640644|2024-10-11 20:57:24|
|8083bc82-d495-463...|        VI|2024-01-01 00:00:...|        MOBILE|        iOS|      33854|1728640644|2024-10-11 20:57:24|
|3d4fc2ed-23b3-41b...|        CL|2024-01-01 00:00:...|        MOBILE|    Android|       1030|1728640644|2024-10-11 20:57:24|
|29cab3eb-17b3-472...|        VP|2024-01-01 00:00:...|           WEB|    Android|      12525|1728640644|2024-10-11 20:57:24|


In [15]:
query_browsing.stop()

In [16]:
query_transactions = transactions_stream.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("QueryTransactions") \
    .start()

In [17]:
spark.sql("SELECT * FROM QueryTransactions").show()

+--------------------+-----------+--------------------+--------------------+--------------------+--------------+--------------+------------+-------------+------------+---------------------+----------------------+------------+-------------+
|          created_at|customer_id|      transaction_id|          session_id|    product_metadata|payment_method|payment_status|promo_amount|   promo_code|shipment_fee|shipment_location_lat|shipment_location_long|total_amount|clear_payment|
+--------------------+-----------+--------------------+--------------------+--------------------+--------------+--------------+------------+-------------+------------+---------------------+----------------------+------------+-------------+
|2024-01-01 00:01:...|      98123|97b193ec-7b52-40d...|f85f2f9f-df4a-4ae...|[{'product_id': 9...|           OVO|          Fail|         0.0|             |         0.0|    -6.17197281657169|      106.790424765641|    271986.0|            0|
|2024-01-01 00:03:...|      64835|6d7eb6

In [18]:
query_transactions.stop()

### 5.	Aggregate the streaming data frames and create features we used in the model.  
(note: customer ID has already been included in the stream.) Then, join the static data frames with the streaming data frame as our final data for prediction. Perform data type/column conversion according to your ML model and print out the Schema. (Again, you can reuse code from A2A).

In [16]:
from pyspark.sql.functions import explode, col, from_json
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType, DoubleType

# Define schema for product metadata (array of products)
product_metadata_schema = ArrayType(StructType([
    StructField("product_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("item_price", DoubleType(), True)
]))

# 1. Drop the customer_id from the browsing stream
browsing_joined = browsing_stream.drop("customer_id")

# 2. Parse the product metadata field in the transactions stream
transactions_stream = transactions_stream.withColumn(
    "product_list", from_json(col("product_metadata"), product_metadata_schema)
)

# 3. Explode the product list to create one row per product
transactions_exploded = transactions_stream.withColumn("product", explode(col("product_list")))

# 4. Extract fields from exploded product metadata
transactions_exploded = transactions_exploded \
    .withColumn("product_id", col("product.product_id")) \
    .withColumn("quantity", col("product.quantity")) \
    .withColumn("item_price", col("product.item_price"))

# 5. Drop redundant columns like product_list and product
transactions_cleaned = transactions_exploded.drop("product_list", "product")

# 6. Join the cleaned transactions stream with the product_df on product_id
transactions_joined = transactions_cleaned \
    .join(product_df.withColumnRenamed("id", "product_id"), "product_id", "left")

# Print schema to verify the joined transactions
transactions_joined.printSchema()
browsing_joined.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- created_at: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- transaction_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- product_metadata: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- payment_status: string (nullable = true)
 |-- promo_amount: double (nullable = true)
 |-- promo_code: string (nullable = true)
 |-- shipment_fee: double (nullable = true)
 |-- shipment_location_lat: double (nullable = true)
 |-- shipment_location_long: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- clear_payment: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- item_price: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- baseColour: string (nullable = true)
 |-- season: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- usage: string (nullable = true)
 |-- productDisplayName: string (nulla

In [20]:
query_browsing = browsing_joined.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("QueryBrowsing") \
    .start()

In [21]:
spark.sql("SELECT * FROM QueryBrowsing").show()

+--------------------+----------+--------------------+--------------+-----------+----------+-------------------+
|          session_id|event_type|          event_time|traffic_source|device_type|        ts|           event_ts|
+--------------------+----------+--------------------+--------------+-----------+----------+-------------------+
|8a4ea821-02ea-427...|        CL|2024-01-01 00:00:...|        MOBILE|    Android|1728640677|2024-10-11 20:57:57|
|e224a60c-f02c-4e1...|        CL|2024-01-01 00:00:...|        MOBILE|    Android|1728640677|2024-10-11 20:57:57|
|8083bc82-d495-463...|        VI|2024-01-01 00:00:...|        MOBILE|        iOS|1728640677|2024-10-11 20:57:57|
|3d4fc2ed-23b3-41b...|        CL|2024-01-01 00:00:...|        MOBILE|    Android|1728640677|2024-10-11 20:57:57|
|29cab3eb-17b3-472...|        VP|2024-01-01 00:00:...|           WEB|    Android|1728640677|2024-10-11 20:57:57|
|7aaab404-e148-484...|        HP|2024-01-01 00:00:...|        MOBILE|    Android|1728640677|2024

In [22]:
query_browsing.stop()

In [23]:
query_transactions = transactions_joined.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("QueryTransactions") \
    .start()

In [24]:
spark.sql("SELECT * FROM QueryTransactions").show()

+----------+--------------------+-----------+--------------------+--------------------+--------------------+--------------+--------------+------------+-------------+------------+---------------------+----------------------+------------+-------------+--------+----------+------+----------+------+----+------+--------------------+-----------+
|product_id|          created_at|customer_id|      transaction_id|          session_id|    product_metadata|payment_method|payment_status|promo_amount|   promo_code|shipment_fee|shipment_location_lat|shipment_location_long|total_amount|clear_payment|quantity|item_price|gender|baseColour|season|year| usage|  productDisplayName|category_id|
+----------+--------------------+-----------+--------------------+--------------------+--------------------+--------------+--------------+------------+-------------+------------+---------------------+----------------------+------------+-------------+--------+----------+------+----------+------+----+------+-----------

In [25]:
query_transactions.stop()

#### Explanation of Converting Stream-Stream or Static-Stream Join to Static-Static Join using Mini-Batches

The code effectively converts a stream-stream or static-stream join into a static-static join by using mini-batches stored in intermediate Parquet files. Here's a breakdown of how this works:

##### 1. Writing Streaming Data as Mini-Batches:
The `browsing_joined` and `transactions_joined` streams are written to disk in Parquet format with a trigger set to every minute (`processingTime="1 minute"`). This means that every minute, the incoming stream data is stored as static mini-batches in the directories `browsing_data` and `transactions_data`.

##### 2. Reading the Mini-Batches as Static Data:
After the streams have written the data to disk, we then read the stored mini-batches from Parquet files as static DataFrames.

In [17]:
# Browsing stream write to disk 
browsing_stream_query = browsing_joined.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "browsing_data") \
    .trigger(processingTime="30 seconds") \
    .start()

# Transactions stream write to disk with checkpointing
transactions_stream_query = transactions_joined.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "transactions_data") \
    .trigger(processingTime="30 seconds") \
    .start()

In [18]:
# Read browsing mini-batches as static data
browsing_behaviour_df = spark.read.parquet("browsing_data")

# Read transactions mini-batches as static data
transaction_df = spark.read.parquet("transactions_data")

In [19]:
browsing_behaviour_df.show()  # Check if data exists
transaction_df.show()  # Check if data exists

+--------------------+----------+--------------------+--------------+-----------+----------+-------------------+
|          session_id|event_type|          event_time|traffic_source|device_type|        ts|           event_ts|
+--------------------+----------+--------------------+--------------+-----------+----------+-------------------+
|8a4ea821-02ea-427...|        CL|2024-01-01 00:00:...|        MOBILE|    Android|1728814800|2024-10-13 21:20:00|
|e224a60c-f02c-4e1...|        CL|2024-01-01 00:00:...|        MOBILE|    Android|1728814800|2024-10-13 21:20:00|
|8083bc82-d495-463...|        VI|2024-01-01 00:00:...|        MOBILE|        iOS|1728814800|2024-10-13 21:20:00|
|3d4fc2ed-23b3-41b...|        CL|2024-01-01 00:00:...|        MOBILE|    Android|1728814800|2024-10-13 21:20:00|
|29cab3eb-17b3-472...|        VP|2024-01-01 00:00:...|           WEB|    Android|1728814800|2024-10-13 21:20:00|
|7aaab404-e148-484...|        HP|2024-01-01 00:00:...|        MOBILE|    Android|1728814800|2024

In [20]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Categorizing events into levels
browsing_behaviour_df = browsing_behaviour_df.withColumn(
    "event_level",
    F.when(F.col("event_type").isin(["AP", "ATC", "CO"]), "L1")
    .when(F.col("event_type").isin(["VC", "VP", "VI", "SER"]), "L2")
    .when(F.col("event_type").isin(["SCR", "HP", "CL"]), "L3")
    .otherwise("Other")
)

# Counting the number of actions in each level for each session
level_counts = browsing_behaviour_df.groupBy("session_id").pivot("event_level").count().na.fill(0)

# Renaming columns for clarity
feature_df = level_counts.withColumnRenamed("L1", "L1_count") \
                         .withColumnRenamed("L2", "L2_count") \
                         .withColumnRenamed("L3", "L3_count")

# Selecting only the required columns: transaction_id, session_id, L1_count, L2_count, L3_count
feature_df = feature_df.select("session_id", "L1_count", "L2_count", "L3_count")

feature_df.show()

+--------------------+--------+--------+--------+
|          session_id|L1_count|L2_count|L3_count|
+--------------------+--------+--------+--------+
|64e5d3a2-e9b4-49e...|       0|     210|       0|
|593a54c9-8642-4b7...|       0|     256|       0|
|7c479942-2f31-454...|     118|     139|       0|
|5afd5fe8-c9bf-4ba...|       0|       0|     190|
|7e552a85-8e47-4b8...|       0|       0|     256|
|274041ed-c121-468...|       0|     242|       0|
|7621faed-7db2-47f...|       0|       0|      58|
|1fda6d20-d4cb-48c...|       0|       0|     256|
|5d768487-5d97-4a2...|       0|       0|      66|
|ede8725e-fe86-444...|       0|       0|     171|
|ea6ec7f8-2c2e-40e...|     101|       0|       0|
|e6b20437-b49c-485...|       0|       0|     256|
|58623af3-5f8e-43b...|       0|       0|     256|
|c3104e9e-ea3d-4a4...|       0|       0|     256|
|d311c334-0b01-471...|       0|     256|     282|
|3ea3a7d8-109d-492...|       0|       0|     256|
|8b37dfd9-548c-4fa...|       0|       0|     256|


In [21]:
from pyspark.sql import functions as F

# Calculating event ratios
feature_df = feature_df.withColumn(
    "L1_ratio",
    F.round((F.col("L1_count") / (F.col("L1_count") + F.col("L2_count") + F.col("L3_count")) * 100), 2)
).withColumn(
    "L2_ratio",
    F.round((F.col("L2_count") / (F.col("L1_count") + F.col("L2_count") + F.col("L3_count")) * 100), 2)
)

feature_df.show()

+--------------------+--------+--------+--------+--------+--------+
|          session_id|L1_count|L2_count|L3_count|L1_ratio|L2_ratio|
+--------------------+--------+--------+--------+--------+--------+
|64e5d3a2-e9b4-49e...|       0|     210|       0|     0.0|   100.0|
|593a54c9-8642-4b7...|       0|     256|       0|     0.0|   100.0|
|7c479942-2f31-454...|     118|     139|       0|   45.91|   54.09|
|5afd5fe8-c9bf-4ba...|       0|       0|     190|     0.0|     0.0|
|7e552a85-8e47-4b8...|       0|       0|     256|     0.0|     0.0|
|274041ed-c121-468...|       0|     242|       0|     0.0|   100.0|
|7621faed-7db2-47f...|       0|       0|      58|     0.0|     0.0|
|1fda6d20-d4cb-48c...|       0|       0|     256|     0.0|     0.0|
|5d768487-5d97-4a2...|       0|       0|      66|     0.0|     0.0|
|ede8725e-fe86-444...|       0|       0|     171|     0.0|     0.0|
|ea6ec7f8-2c2e-40e...|     101|       0|       0|   100.0|     0.0|
|e6b20437-b49c-485...|       0|       0|     256

In [22]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Ensuring event_time is in timestamp format
browsing_behaviour_df = browsing_behaviour_df.withColumn("event_time", F.to_timestamp("event_time"))

# Calculating the median time for each session
# We are approximating median by taking the midpoint between min and max times within each session
median_time_df = browsing_behaviour_df.groupBy("session_id").agg(
    ((F.unix_timestamp(F.max("event_time")) + F.unix_timestamp(F.min("event_time"))) / 2).cast("timestamp").alias("medium_time")
)

# Defining a function and UDF to classify the time of day
def classify_time_of_day(hour):
    if 6 <= hour < 12:
        return "morning"
    elif 12 <= hour < 18:
        return "afternoon"
    elif 18 <= hour < 24:
        return "evening"
    else:
        return "night"

# Registering the function as a UDF
time_of_day_udf = F.udf(lambda x: classify_time_of_day(x.hour), StringType())

# Joining the median time with the main feature DataFrame and classify time of day
try:
    # Check if the column 'time_of_day' already exists
    if "time_of_day" not in feature_df.columns:
        # Perform the join operation
        feature_df = feature_df.join(median_time_df, "session_id", "left")
        # Add the 'time_of_day' column
        feature_df = feature_df.withColumn("time_of_day", time_of_day_udf(F.col("medium_time")))
except Exception as e:
    print(f"An error occurred during the join or while adding 'time_of_day' column: {e}")

feature_df.show()

+--------------------+--------+--------+--------+--------+--------+--------------------+-----------+
|          session_id|L1_count|L2_count|L3_count|L1_ratio|L2_ratio|         medium_time|time_of_day|
+--------------------+--------+--------+--------+--------+--------+--------------------+-----------+
|64e5d3a2-e9b4-49e...|       0|     210|       0|     0.0|   100.0| 2024-01-01 00:59:19|  afternoon|
|593a54c9-8642-4b7...|       0|     256|       0|     0.0|   100.0| 2024-01-01 00:39:31|  afternoon|
|7c479942-2f31-454...|     118|     139|       0|   45.91|   54.09| 2024-01-01 01:17:28|  afternoon|
|5afd5fe8-c9bf-4ba...|       0|       0|     190|     0.0|     0.0| 2024-01-01 01:04:44|  afternoon|
|7e552a85-8e47-4b8...|       0|       0|     256|     0.0|     0.0| 2024-01-01 00:11:40|  afternoon|
|274041ed-c121-468...|       0|     242|       0|     0.0|   100.0| 2024-01-01 00:54:46|  afternoon|
|7621faed-7db2-47f...|       0|       0|      58|     0.0|     0.0| 2024-01-01 01:33:20|  a

In [23]:
from pyspark.sql import functions as F

# Extracting customer_id from browsing sessions and add to feature_df
try:
    if "geolocation" not in feature_df.columns:
        
        # Joining with customer_session to get customer_id
        customer_session_df = customer_session_df.withColumnRenamed("session_id", "session_id_link")
        feature_df = feature_df.join(customer_session_df, feature_df.session_id == customer_session_df.session_id_link, "left") \
                               .drop("session_id_link")
        
        # Extracting first join year and calculate age from customer DataFrame
        current_year = F.year(F.current_date())
        customer_df = customer_df.withColumn("first_join_year", F.year("first_join_date").cast("string")) \
                                 .withColumn("birth_year", F.year("birthdate")) \
                                 .withColumn("age", F.round(current_year - F.col("birth_year")).cast("integer"))
        
        # Joining customer information with feature_df using customer_id
        feature_df = feature_df.join(customer_df.select("customer_id", "gender", "age", "first_join_year"), 
                                     on="customer_id", 
                                     how="left")
        
        # Joining geolocation information from transaction_df using session_id
        geolocation_df = transaction_df.select("session_id", "shipment_location_lat", "shipment_location_long") \
                                       .withColumnRenamed("shipment_location_lat", "latitude") \
                                       .withColumnRenamed("shipment_location_long", "longitude")
        
        # Combining latitude and longitude into a single geolocation column
        geolocation_df = geolocation_df.withColumn("geolocation", 
                                                   F.concat(F.lit("("), 
                                                            F.col("latitude").cast("string"), 
                                                            F.lit(", "), 
                                                            F.col("longitude").cast("string"), 
                                                            F.lit(")")))
        
        # Joining the geolocation column with feature_df
        feature_df = feature_df.join(geolocation_df.select("session_id", "geolocation"), on="session_id", how="left")
        
        # Dropping the customer_id column from feature_df
        feature_df = feature_df.drop("customer_id")

except Exception as e:
    print(f"An error occurred during the joining process or column manipulation: {e}")

feature_df.show()

+--------------------+--------+--------+--------+--------+--------+--------------------+-----------+------+---+---------------+--------------------+
|          session_id|L1_count|L2_count|L3_count|L1_ratio|L2_ratio|         medium_time|time_of_day|gender|age|first_join_year|         geolocation|
+--------------------+--------+--------+--------+--------+--------+--------------------+-----------+------+---+---------------+--------------------+
|1fda6d20-d4cb-48c...|       0|       0|     256|     0.0|     0.0| 2024-01-01 00:26:16|  afternoon|     F| 29|           2021|                NULL|
|274041ed-c121-468...|       0|     242|       0|     0.0|   100.0| 2024-01-01 00:54:46|  afternoon|     F| 22|           2021|                NULL|
|3ea3a7d8-109d-492...|       0|       0|     256|     0.0|     0.0| 2024-01-01 00:31:36|  afternoon|     F| 33|           2019|                NULL|
|5302b653-3ad2-481...|       0|       0|     256|     0.0|     0.0| 2024-01-01 00:15:04|  afternoon|     M

In [24]:
from pyspark.sql import functions as F

# Extracting customer_id from browsing sessions and add to feature_df
try:
    if "num_purchases" not in feature_df.columns:
        
        # Joining with customer_session to get customer_id
        customer_session_df = customer_session_df.withColumnRenamed("session_id", "session_id_link")
        feature_df = feature_df.join(customer_session_df, feature_df.session_id == customer_session_df.session_id_link, "left") \
                               .drop("session_id_link")

        # Counting the number of purchases per customer using transactions_df
        purchase_counts = transaction_df.groupBy("customer_id").agg(
            F.count("transaction_id").alias("num_purchases")
        )

        # Joining the purchase count with feature_df using customer_id
        feature_df = feature_df.join(purchase_counts, on="customer_id", how="left")

        # Filling any missing values in num_purchases with 0 (for customers with no recorded purchases)
        feature_df = feature_df.fillna({"num_purchases": 0})

        # Dropping the customer_id column from feature_df
        feature_df = feature_df.drop("customer_id")

except Exception as e:
    print(f"An error occurred during the join or while manipulating 'num_purchases': {e}")

feature_df.show()

+--------------------+--------+--------+--------+--------+--------+--------------------+-----------+------+---+---------------+--------------------+-------------+
|          session_id|L1_count|L2_count|L3_count|L1_ratio|L2_ratio|         medium_time|time_of_day|gender|age|first_join_year|         geolocation|num_purchases|
+--------------------+--------+--------+--------+--------+--------+--------------------+-----------+------+---+---------------+--------------------+-------------+
|1fda6d20-d4cb-48c...|       0|       0|     256|     0.0|     0.0| 2024-01-01 00:26:16|  afternoon|     F| 29|           2021|                NULL|            0|
|274041ed-c121-468...|       0|     242|       0|     0.0|   100.0| 2024-01-01 00:54:46|  afternoon|     F| 22|           2021|                NULL|            0|
|3ea3a7d8-109d-492...|       0|       0|     256|     0.0|     0.0| 2024-01-01 00:31:36|  afternoon|     F| 33|           2019|                NULL|            0|
|5302b653-3ad2-481...|

In [25]:
from pyspark.sql import functions as F

# Attaching fraud labels to transactions and join with feature_df
try:
    if "is_fraud" not in feature_df.columns:
        
        # Joining transactions_df with fraud_transaction_df to attach fraud labels
        transactions_with_fraud = transaction_df.join(fraud_transaction_df, "transaction_id", "left") \
                                                .withColumn("is_fraud", F.when(F.col("is_fraud").isNull(), False).otherwise(True))

        # Joining transactions_with_fraud with feature_df using session_id
        feature_df = feature_df.join(transactions_with_fraud.select("session_id", "is_fraud").distinct(), 
                                     on="session_id", 
                                     how="left")

except Exception as e:
    print(f"An error occurred during the join or while attaching the 'is_fraud' label: {e}")

feature_df.show()

+--------------------+--------+--------+--------+--------+--------+--------------------+-----------+------+---+---------------+--------------------+-------------+--------+
|          session_id|L1_count|L2_count|L3_count|L1_ratio|L2_ratio|         medium_time|time_of_day|gender|age|first_join_year|         geolocation|num_purchases|is_fraud|
+--------------------+--------+--------+--------+--------+--------+--------------------+-----------+------+---+---------------+--------------------+-------------+--------+
|1fda6d20-d4cb-48c...|       0|       0|     256|     0.0|     0.0| 2024-01-01 00:26:16|  afternoon|     F| 29|           2021|                NULL|            0|    NULL|
|274041ed-c121-468...|       0|     242|       0|     0.0|   100.0| 2024-01-01 00:54:46|  afternoon|     F| 22|           2021|                NULL|            0|    NULL|
|3ea3a7d8-109d-492...|       0|       0|     256|     0.0|     0.0| 2024-01-01 00:31:36|  afternoon|     F| 33|           2019|             

In [26]:
from pyspark.sql.functions import expr

# Converting Boolean to Integer for Label Column
feature_df = feature_df.withColumn("is_fraud_int", expr("CASE WHEN is_fraud = 'True' THEN 1 ELSE 0 END"))
feature_df.show()

+--------------------+--------+--------+--------+--------+--------+--------------------+-----------+------+---+---------------+--------------------+-------------+--------+------------+
|          session_id|L1_count|L2_count|L3_count|L1_ratio|L2_ratio|         medium_time|time_of_day|gender|age|first_join_year|         geolocation|num_purchases|is_fraud|is_fraud_int|
+--------------------+--------+--------+--------+--------+--------+--------------------+-----------+------+---+---------------+--------------------+-------------+--------+------------+
|1fda6d20-d4cb-48c...|       0|       0|     256|     0.0|     0.0| 2024-01-01 00:26:16|  afternoon|     F| 29|           2021|                NULL|            0|    NULL|           0|
|274041ed-c121-468...|       0|     242|       0|     0.0|   100.0| 2024-01-01 00:54:46|  afternoon|     F| 22|           2021|                NULL|            0|    NULL|           0|
|3ea3a7d8-109d-492...|       0|       0|     256|     0.0|     0.0| 2024-01

In [27]:
# Add event_ts column from browsing_behaviour_df to feature_df
feature_df = feature_df.join(
    browsing_behaviour_df.select("session_id", "ts", "event_ts"),
    on="session_id",
    how="left"
)

# Show the final DataFrame
feature_df.show()

+--------------------+--------+--------+--------+--------+--------+-------------------+-----------+------+---+---------------+-----------+-------------+--------+------------+----------+-------------------+
|          session_id|L1_count|L2_count|L3_count|L1_ratio|L2_ratio|        medium_time|time_of_day|gender|age|first_join_year|geolocation|num_purchases|is_fraud|is_fraud_int|        ts|           event_ts|
+--------------------+--------+--------+--------+--------+--------+-------------------+-----------+------+---+---------------+-----------+-------------+--------+------------+----------+-------------------+
|1fda6d20-d4cb-48c...|       0|       0|     256|     0.0|     0.0|2024-01-01 00:26:16|  afternoon|     F| 29|           2021|       NULL|            0|    NULL|           0|1728813960|2024-10-13 21:06:00|
|1fda6d20-d4cb-48c...|       0|       0|     256|     0.0|     0.0|2024-01-01 00:26:16|  afternoon|     F| 29|           2021|       NULL|            0|    NULL|           0|17

In [28]:
feature_df.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- L1_count: long (nullable = true)
 |-- L2_count: long (nullable = true)
 |-- L3_count: long (nullable = true)
 |-- L1_ratio: double (nullable = true)
 |-- L2_ratio: double (nullable = true)
 |-- medium_time: timestamp (nullable = true)
 |-- time_of_day: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- first_join_year: string (nullable = true)
 |-- geolocation: string (nullable = true)
 |-- num_purchases: long (nullable = false)
 |-- is_fraud: boolean (nullable = true)
 |-- is_fraud_int: integer (nullable = false)
 |-- ts: integer (nullable = true)
 |-- event_ts: timestamp (nullable = true)



### 6.	The company is interested in the number of potential frauds as they happen and the products in customers’ shopping carts (so that they can plan their stock level ahead.) Load your ML model, and use the model to predict/process each browsing session/transaction as follows:  
a)	Every 10 seconds, show the total number of potential frauds (prediction = 1) in the last 2 minutes, and persist the raw data (see 7a).  
b)	Every 30 seconds, find the top 20 products (order by quantity descending) in the last 30 seconds, show product ID, name and total quantity. We only need the non-fraud transactions (prediction=0) by extracting customer shopping cart details (sum of all items of ADD_TO_CART(ATC) events from browsing behaviour, you can also extract it from transactions).

In [29]:
# Load the pre-trained Random Forest model
model_path = "best_rf_model"
loaded_rf_model = PipelineModel.load(model_path)

# Example to show how to use it in streaming
predictions = loaded_rf_model.transform(feature_df)

In [30]:
# 6a
from pyspark.sql.functions import col, window, count

# Filtering for potential frauds (prediction = 1)
fraud_df = predictions.filter(col('rf_prediction') == 1)

# Show total number of potential frauds per session_id every 10 seconds in the last 2 minutes
fraud_count = fraud_df \
    .groupBy("session_id", window(col("event_ts"), "2 minutes", "10 seconds")) \
    .count() \
    .select("session_id", "window", "count")

In [31]:
# Show the fraud count 
fraud_count.show(truncate=False)

+------------------------------------+------------------------------------------+-----+
|session_id                          |window                                    |count|
+------------------------------------+------------------------------------------+-----+
|135d09b6-2ee3-40e2-b08f-f7fb61fbf480|{2024-10-13 21:50:00, 2024-10-13 21:52:00}|5    |
|206852b6-dc65-438f-8aae-11dd2c40de50|{2024-10-13 21:22:00, 2024-10-13 21:24:00}|2    |
|206852b6-dc65-438f-8aae-11dd2c40de50|{2024-10-13 21:20:00, 2024-10-13 21:22:00}|4    |
|24aeaee0-4a0e-4c41-ab1e-d98f98323058|{2024-10-13 21:22:00, 2024-10-13 21:24:00}|186  |
|24aeaee0-4a0e-4c41-ab1e-d98f98323058|{2024-10-13 21:20:50, 2024-10-13 21:22:50}|62   |
|24aeaee0-4a0e-4c41-ab1e-d98f98323058|{2024-10-13 21:13:30, 2024-10-13 21:15:30}|310  |
|470a2012-8e1c-4d30-93a0-97c18dffb4d1|{2024-10-13 21:25:10, 2024-10-13 21:27:10}|40   |
|71af3c94-6edb-4b26-8eee-6e67a500911d|{2024-10-13 21:26:10, 2024-10-13 21:28:10}|205  |
|71af3c94-6edb-4b26-8eee-6e67a50

In [32]:
# 6b
from pyspark.sql.functions import col, sum

# Filter non-fraud transactions (where prediction = 0)
non_fraud_transactions = predictions.filter(col('rf_prediction') == 0)

# Join with transactions_joined on session_id, keeping only product details from transactions_joined
non_fraud_with_product_details = non_fraud_transactions.join(
    transaction_df.select("session_id", "product_id", "productDisplayName", "quantity"),  # Ensuring product_id is included
    on="session_id",
    how="inner"
)

# Join browsing behavior to get the event_type
non_fraud_with_product_details = non_fraud_with_product_details.join(
    browsing_behaviour_df.select("session_id", "event_type"),  # Ensure session_id and event_type are included
    on="session_id",
    how="inner"
)

# Ensure 'event_ts' is properly handled, renaming it to avoid ambiguity if necessary
non_fraud_with_product_details = non_fraud_with_product_details.withColumnRenamed("event_ts", "non_fraud_event_ts")

# Extract product details for each ADD_TO_CART (ATC) event
product_sales = non_fraud_with_product_details.filter(col('event_type') == 'ATC')

# Group by distinct product_id and productDisplayName for each 30-second window
top_products = product_sales.groupBy(window(col("non_fraud_event_ts"), "30 seconds"), "product_id", "productDisplayName") \
    .agg(sum("quantity").alias("total_quantity")) \
    .orderBy(col("total_quantity").desc()) \
    .select("product_id", "productDisplayName", "total_quantity") \
    .limit(20)  # Limit to top 20 products

# Print the schema to verify the structure
top_products.printSchema()


root
 |-- product_id: integer (nullable = true)
 |-- productDisplayName: string (nullable = true)
 |-- total_quantity: long (nullable = true)



In [33]:
# Show the Top Products
top_products.show(truncate=False)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/conda/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

### 7.	Write a Parquet file and save the following data frames (tip: you may look at part 3 and think about what columns to save):  
a.	Persist the raw data from 6a in parquet format. Every student may have different features/columns in their data frames depending on their model, at the bare minimum, we need some IDs to identify those frauds later on (transaction_id and/or session_id). After that, read the parquet file and show a few rows to verify it is saved correctly.  
b.	Persist the data from 6b in another parquet file.  

In [34]:
# 7a

# Persist fraud_count data to Parquet
fraud_count.write \
    .format("parquet") \
    .mode("append") \
    .option("path", "fraud_predictions_count_parquet") \
    .save()


In [44]:
# Read the Parquet data
fraud_count_df = spark.read \
    .format("parquet") \
    .option("path", "fraud_predictions_count_parquet") \
    .load()

# Show the data 
fraud_count_df.show()

+--------------------+--------------------+-----+
|          session_id|              window|count|
+--------------------+--------------------+-----+
|aee50500-e67d-4cf...|{2024-10-11 20:57...|   16|
|b8086f32-d838-4c6...|{2024-10-11 20:58...|    8|
|e29be054-e160-455...|{2024-10-11 20:57...|    2|
|135d09b6-2ee3-40e...|{2024-10-11 20:58...|    4|
|206852b6-dc65-438...|{2024-10-11 20:58...|    1|
|7b373982-d715-484...|{2024-10-11 20:57...|   16|
|b2fb3286-6414-402...|{2024-10-11 20:57...|   12|
|470a2012-8e1c-4d3...|{2024-10-11 20:59...|    1|
|f0f5264d-e26b-4af...|{2024-10-11 20:58...|    1|
|206852b6-dc65-438...|{2024-10-11 20:57...|    1|
|d1af6233-eecf-498...|{2024-10-11 20:58...|   16|
|0e11c7c3-8cb0-43d...|{2024-10-11 20:57...|   16|
|204678d8-a714-415...|{2024-10-11 20:57...|    2|
|e46bc807-be6c-439...|{2024-10-11 20:58...|    5|
|3a737027-89d4-461...|{2024-10-11 20:58...|   16|
|235c6de7-9548-4d8...|{2024-10-11 20:58...|    3|
|f85f2f9f-df4a-4ae...|{2024-10-11 20:58...|   16|


In [45]:
# 7b

# Persist fraud_count data to Parquet
top_products.write \
    .format("parquet") \
    .mode("append") \
    .option("path", "top_products_parquet") \
    .save()


In [46]:
# Read the Parquet data
top_products_df = spark.read \
    .format("parquet") \
    .option("path", "top_products_parquet") \
    .load()

# Show the data (optional)
top_products_df.show()

+----------+--------------------+--------------+
|product_id|  productDisplayName|total_quantity|
+----------+--------------------+--------------+
|      3231|Puma Men's Speed ...|          4288|
|     11895|Franco Leone Men ...|          2736|
|     11895|Franco Leone Men ...|           624|
|      3231|Puma Men's Speed ...|            64|
|     40996|Gini and Jony Gir...|            14|
+----------+--------------------+--------------+



### 8.	Read the two parquet files from task 7 as data streams and send to Kafka topics with appropriate names.
(Note: You shall read the parquet files as a streaming data frame and send messages to the Kafka topic when new data appears in the parquet file.)

In [47]:
# Stream 1

In [62]:
# Step 1: Infer schema by loading parquet data statically
static_fraud_df = spark.read \
    .format("parquet") \
    .option("path", "fraud_predictions_count_parquet") \
    .load()

# Print the inferred schema
static_fraud_df.printSchema()

# Step 2: Use this schema for the streaming dataframe
fraud_schema = static_fraud_df.schema


root
 |-- session_id: string (nullable = true)
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = true)



In [63]:
from pyspark.sql.functions import to_json, struct

# Use the schema inferred from the static DataFrame
fraud_count_stream_df = spark.readStream \
    .format("parquet") \
    .schema(fraud_schema) \
    .option("path", "fraud_predictions_count_parquet") \
    .load()

# Transform the DataFrame into a Kafka-friendly format (JSON)
fraud_count_kafka_df = fraud_count_stream_df \
    .select(to_json(struct("session_id", "window", "count")).alias("value"))

# Write the data to Kafka topic "fraud_count_topic"
fraud_count_kafka_df.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("topic", "fraud_count_topic") \
    .start()


<pyspark.sql.streaming.query.StreamingQuery at 0xffff626d5540>

In [65]:
# Memory sink to view the fraud_count stream in-memory
fraud_count_query = fraud_count_kafka_df.writeStream \
    .format("memory") \
    .queryName("fraud_count_in_memory") \
    .outputMode("append") \
    .start()

In [66]:
# Query the in-memory table to see the streaming data
fraud_count_data = spark.sql("SELECT * FROM fraud_count_in_memory")
fraud_count_data.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                    |
+---------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"session_id":"aee50500-e67d-4cf3-8b70-983dea9a4ea6","window":{"start":"2024-10-11T20:57:40.000+11:00","end":"2024-10-11T20:59:40.000+11:00"},"count":16}|
|{"session_id":"b8086f32-d838-4c65-81d6-3fa44d7e6545","window":{"start":"2024-10-11T20:58:50.000+11:00","end":"2024-10-11T21:00:50.000+11:00"},"count":8} |
|{"session_id":"e29be054-e160-4550-b2aa-d9bb0f1924e4","window":{"start":"2024-10-11T20:57:00.000+11:00","end":"2024-10-11T20:59:00.000+11:00"},"count":2} |
|{"session_id":"135d09b6-2ee3-40e2-b08f-f7fb61fbf480","window":{

In [67]:
fraud_count_query.stop()

In [68]:
# Stream 2

In [69]:
# Step 1: Infer schema by loading parquet data statically
static_top_products_df = spark.read \
    .format("parquet") \
    .option("path", "top_products_parquet") \
    .load()

# Print the inferred schema
static_top_products_df.printSchema()

# Step 2: Use this schema for the streaming dataframe
top_products_schema = static_top_products_df.schema

root
 |-- product_id: integer (nullable = true)
 |-- productDisplayName: string (nullable = true)
 |-- total_quantity: long (nullable = true)



In [70]:
# Use the schema inferred from the static DataFrame
top_products_stream_df = spark.readStream \
    .format("parquet") \
    .schema(top_products_schema) \
    .option("path", "top_products_parquet") \
    .load()

# Transform the DataFrame into a Kafka-friendly format (JSON)
top_products_kafka_df = top_products_stream_df \
    .select(to_json(struct("product_id", "productDisplayName", "total_quantity")).alias("value"))

# Write the data to Kafka topic "top_products_topic"
top_products_kafka_df.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("topic", "top_products_topic") \
    .start()

<pyspark.sql.streaming.query.StreamingQuery at 0xffff61dcdd50>

In [71]:
# Memory sink to view the top_products stream in-memory
top_products_query = top_products_kafka_df.writeStream \
    .format("memory") \
    .queryName("top_products_in_memory") \
    .outputMode("append") \
    .start()

In [72]:
# Query the in-memory table to see the streaming data
top_products_data = spark.sql("SELECT * FROM top_products_in_memory")
top_products_data.show(truncate=False)

+------------------------------------------------------------------------------------------------------------+
|value                                                                                                       |
+------------------------------------------------------------------------------------------------------------+
|{"product_id":3231,"productDisplayName":"Puma Men's Speed Cat White Green Shoe","total_quantity":4288}      |
|{"product_id":11895,"productDisplayName":"Franco Leone Men Formal Black Formal Shoes","total_quantity":2736}|
|{"product_id":11895,"productDisplayName":"Franco Leone Men Formal Black Formal Shoes","total_quantity":624} |
|{"product_id":3231,"productDisplayName":"Puma Men's Speed Cat White Green Shoe","total_quantity":64}        |
|{"product_id":40996,"productDisplayName":"Gini and Jony Girls Woven Pink Pedal Pusher","total_quantity":14} |
+------------------------------------------------------------------------------------------------------------+



In [73]:
top_products_query.stop()