# Part 2: Streaming application using Spark Structured Streaming

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 pyspark-shell'
import time
import shutil
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.types import *

# Spark and PySpark imports
from pyspark.sql import SparkSession
from pyspark import SparkConf

from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, FloatType, 
    TimestampType, BooleanType, DoubleType
)
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    col, when, count, sum, min, max, mean, hour, expr, udf
)

# Spark and PySpark imports
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, FloatType, 
    TimestampType, BooleanType, DoubleType
)
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    col, when, count, sum, min, max, mean, hour, expr, udf
)
from pyspark.sql.functions import from_unixtime, col
from pyspark.sql.types import IntegerType, TimestampType
from pyspark.sql.utils import AnalysisException

from pyspark.sql.functions import regexp_extract
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql.functions import col, from_json
from pyspark.sql import functions as F
# PySpark MLlib imports
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator, BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.functions import vector_to_array



### 1.	Write code to create a SparkSession, which 1) uses four cores with a proper application name; 2) uses the Melbourne timezone; 3) ensures a checkpoint location has been set.


In [2]:
# Directory used as the checkpoint root for this notebook
checkpoint_location = "../A2B"

# Build the session: four local cores, Melbourne session timezone, and a default
# checkpoint root for Structured Streaming queries that do not set their own
# `checkpointLocation` option.
spark = (
    SparkSession.builder
    .appName("Assignment 2B part 2")
    .master("local[4]")
    .config("spark.sql.session.timeZone", "Australia/Melbourne")
    .config("spark.sql.streaming.checkpointLocation", checkpoint_location)
    .getOrCreate()
)

# setCheckpointDir only covers RDD/DataFrame.checkpoint(); it is kept so any
# explicit checkpoint() call still has a directory to write to.
spark.sparkContext.setCheckpointDir(checkpoint_location)

print("Spark Session created successfully!")

Spark Session created successfully!


### 2.	Similar to assignment 2A, write code to define the data schema for the data files, following the data types suggested in the metadata file. Load the static datasets (e.g. customer, product, category) into data frames. (You can use your code from 2A.)



In [3]:
# Customer table: identifiers, profile strings and two timestamps
customer_schema = (
    StructType()
    .add("customer_id", FloatType(), False)
    .add("first_name", StringType(), True)
    .add("last_name", StringType(), True)
    .add("username", StringType(), True)
    .add("email", StringType(), True)
    .add("gender", StringType(), True)
    .add("birthdate", TimestampType(), True)
    .add("first_join_date", TimestampType(), True)
)

# Category table: id plus three category levels
category_schema = (
    StructType()
    .add("category_id", FloatType(), False)
    .add("cat_level1", StringType(), True)
    .add("cat_level2", StringType(), True)
    .add("cat_level3", StringType(), True)
)

# Browsing behaviour events; `ts` is the only integer field
browsing_behaviour_schema = (
    StructType()
    .add("session_id", StringType(), False)
    .add("event_type", StringType(), True)
    .add("event_time", TimestampType(), True)
    .add("traffic_source", StringType(), True)
    .add("device_type", StringType(), True)
    .add("ts", IntegerType(), True)
)

# Product table
product_schema = (
    StructType()
    .add("id", StringType(), False)
    .add("gender", StringType(), True)
    .add("baseColour", StringType(), True)
    .add("season", StringType(), True)
    .add("year", IntegerType(), True)
    .add("usage", StringType(), True)
    .add("productDisplayName", StringType(), True)
    .add("category_id", FloatType(), True)
)

# Shopping-cart payload: an array of (product_id, quantity, item_price) structs
product_metadata_schema = ArrayType(
    StructType()
    .add("product_id", IntegerType())
    .add("quantity", IntegerType())
    .add("item_price", IntegerType())
)

# Session-to-customer mapping
customer_session_schema = (
    StructType()
    .add("session_id", StringType(), False)
    .add("customer_id", FloatType(), False)
)

# Fraud labels per transaction
fraud_transaction_schema = (
    StructType()
    .add("transaction_id", StringType(), False)
    .add("is_fraud", BooleanType(), False)
)

In [4]:
# Read the three static lookup tables with their explicit schemas; each file has a header row.
df_category = spark.read.schema(category_schema).option("header", True).csv("category.csv")
df_customer = spark.read.schema(customer_schema).option("header", True).csv("customer.csv")
df_product = spark.read.schema(product_schema).option("header", True).csv("product.csv")

### 3. Using the Kafka topics from the producer in Task 1, ingest the streaming data into Spark Streaming, assuming all data comes in the String format, except for the 'ts' column, which you shall receive as an Int type.


## Browsing

In [5]:
# Kafka topic names for the two streams used in this notebook
trans_topic_name = 'trans_topic'
browsing_topic_name = 'browsing_behaviour_topic'

# Subscribe to the browsing behaviour topic
df_browsing_urls = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", browsing_topic_name)
    .load()
)

# Keep only the message payload, as text
browsing_lines = df_browsing_urls.selectExpr("CAST(value AS STRING)")

# Parse the JSON payload with the browsing schema, then flatten the struct
# so every field becomes a top-level column.
browsing_struct = from_json(col("value"), browsing_behaviour_schema).alias("browsing_value")
df_browsing_behaviour = browsing_lines.select(browsing_struct).select("browsing_value.*")

In [6]:
# Print the parsed browsing schema to confirm the JSON fields were expanded into columns.
df_browsing_behaviour.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- traffic_source: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- ts: integer (nullable = true)



In [7]:
def foreach_batch_function(df, epoch_id):
    """Show the first five rows of a micro-batch without truncating cell values.

    `epoch_id` is passed in by `foreachBatch` and is not used here.
    """
    df.show(n=5, truncate=False)


# Preview the parsed browsing stream every 5 seconds
df_browsing_behaviour_query = (
    df_browsing_behaviour.writeStream
    .outputMode("append")
    .foreachBatch(foreach_batch_function)
    .trigger(processingTime='5 seconds')
    .start()
)

+----------+----------+----------+--------------+-----------+---+
|session_id|event_type|event_time|traffic_source|device_type|ts |
+----------+----------+----------+--------------+-----------+---+
+----------+----------+----------+--------------+-----------+---+

+------------------------------------+----------+-----------------------+--------------+-----------+----------+
|session_id                          |event_type|event_time             |traffic_source|device_type|ts        |
+------------------------------------+----------+-----------------------+--------------+-----------+----------+
|6691e6b4-6558-4cd1-b3e3-2b078edf5919|SCR       |2024-05-09 10:01:43.864|MOBILE        |Android    |1728818397|
|e441523d-4464-4e52-9a4e-86c07c877332|CO        |2024-05-09 10:01:50.943|MOBILE        |Android    |1728818397|
|a42e2c54-c770-49fd-8754-bed1eeea235c|VP        |2024-05-09 10:01:54.151|MOBILE        |Android    |1728818397|
|b4182d6b-d547-4585-a275-49495792b24a|CO        |2024-05-09 10:

In [8]:
# Stop the browsing preview query started above.
df_browsing_behaviour_query.stop()

## Transaction

In [9]:
# Subscribe to the transaction topic
df_trans_urls = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", trans_topic_name)
    .load()
)

# Show the raw Kafka record layout (key, value, topic, ...)
df_trans_urls.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [10]:
from pyspark.sql.functions import col, regexp_extract

# Cast the Kafka key and value as string
df_kafka_values = df_trans_urls.select(col("key").cast("string"), col("value").cast("string"))

# Output columns, in the order they should appear in df_transactions
transaction_columns = [
    "created_at",
    "customer_id",
    "transaction_id",
    "session_id",
    "product_metadata",
    "payment_method",
    "payment_status",
    "promo_amount",
    "promo_code",
    "shipment_fee",
    "shipment_location_lat",
    "shipment_location_long",
    "total_amount",
    "clear_payment",
    "trans_ts",
]

# Fields whose value is not a plain quoted string.
# Raw strings keep `\s` / `\[` as regex escapes instead of (invalid) Python escapes.
special_patterns = {
    # Capture the surrounding [ ] as well, so the text is a complete JSON array
    # and from_json with an array schema keeps every cart item, not just the first.
    "product_metadata": r'"product_metadata":\s*"(\[[^"]+\])"',
    # trans_ts is sent as a bare integer, not a quoted string
    "trans_ts": r'"trans_ts":\s*([0-9]+)',
}


def _field_pattern(name):
    """Return the regex that captures `name`'s value from the JSON-like payload."""
    return special_patterns.get(name, rf'"{name}":\s*"([^"]+)"')


# Extract every field with its pattern; a field that does not match yields "".
df_transactions = df_kafka_values.select(
    *[
        regexp_extract(col("value"), _field_pattern(name), 1).alias(name)
        for name in transaction_columns
    ]
)



In [11]:
# Preview the extracted transaction fields every 5 seconds
query_trans = (
    df_transactions.writeStream
    .outputMode("append")
    .foreachBatch(foreach_batch_function)
    .trigger(processingTime='5 seconds')
    .start()
)

+----------+-----------+--------------+----------+----------------+--------------+--------------+------------+----------+------------+---------------------+----------------------+------------+-------------+--------+
|created_at|customer_id|transaction_id|session_id|product_metadata|payment_method|payment_status|promo_amount|promo_code|shipment_fee|shipment_location_lat|shipment_location_long|total_amount|clear_payment|trans_ts|
+----------+-----------+--------------+----------+----------------+--------------+--------------+------------+----------+------------+---------------------+----------------------+------------+-------------+--------+
+----------+-----------+--------------+----------+----------------+--------------+--------------+------------+----------+------------+---------------------+----------------------+------------+-------------+--------+

+-----------------------+-----------+------------------------------------+------------------------------------+------------------------

In [12]:
# Stop the transaction preview query started above.
query_trans.stop()

### 4.	Then, the streaming data format should be transformed into the proper formats following the metadata file schema, similar to assignment 2A. Perform the following tasks:  
a)	For the 'ts' column, convert it to the timestamp format, we will use it as event_ts.  
b)	If the data is late for more than 2 minutes, discard it.  


In [13]:
# Derive event_ts from the epoch-seconds `ts` column, add the hour of event_time,
# and declare a 2-minute watermark on event_ts.
df_browsing_behaviour_watermark = (
    df_browsing_behaviour
    # timestamp_seconds converts epoch seconds directly to a timestamp. Going through
    # from_unixtime() produces local wall-clock text first, which is ambiguous during
    # the daylight-saving fall-back hour of the session timezone.
    .withColumn("event_ts", F.timestamp_seconds(F.col("ts")))
    .drop("ts")
    .withColumn("event_hour", F.hour(F.col("event_time")))
    .withWatermark("event_ts", "2 minutes")
)
df_browsing_behaviour_watermark.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- traffic_source: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- event_ts: timestamp (nullable = true)
 |-- event_hour: integer (nullable = true)



In [14]:
# Preview the watermarked browsing stream every 5 seconds
df_browsing_behaviour_watermark_query = (
    df_browsing_behaviour_watermark.writeStream
    .outputMode("append")
    .foreachBatch(foreach_batch_function)
    .trigger(processingTime='5 seconds')
    .start()
)

+------------------------------------+----------+-----------------------+--------------+-----------+-------------------+----------+
|session_id                          |event_type|event_time             |traffic_source|device_type|event_ts           |event_hour|
+------------------------------------+----------+-----------------------+--------------+-----------+-------------------+----------+
|fd702ad0-fb4b-426c-b5cf-374b94e03561|HP        |2024-05-10 05:02:38.522|WEB           |Android    |2024-10-13 22:20:19|5         |
|e552c3db-5dfb-4743-b84c-b1043fbcb283|HP        |2024-05-10 05:02:40.27 |MOBILE        |iOS        |2024-10-13 22:20:19|5         |
|b2679c57-2a0b-4b5e-b0b5-af885a8cefb6|ATC       |2024-05-10 05:02:48.239|MOBILE        |iOS        |2024-10-13 22:20:19|5         |
|400997c3-0dfb-435a-a4f1-55e3e82dcc49|HP        |2024-05-10 05:02:48.38 |MOBILE        |iOS        |2024-10-13 22:20:19|5         |
|8dd00e4f-57e8-4b99-b94c-6b737fcb90a8|HP        |2024-05-10 05:02:57.122|MOB

In [15]:
# Stop the watermarked browsing preview query started above.
df_browsing_behaviour_watermark_query.stop()

In [16]:
# Convert the extracted transaction strings to their working types.
# NOTE: this cell rebinds df_transactions and drops trans_ts, so running it a second
# time without re-running the extraction cell fails on the missing trans_ts column.
df_transactions = (
    df_transactions
    .withColumn("created_at", F.col("created_at").cast(TimestampType()))
    .withColumn("customer_id", F.col("customer_id").cast(IntegerType()))
    .withColumn("promo_amount", F.col("promo_amount").cast(DoubleType()))
    .withColumn("shipment_fee", F.col("shipment_fee").cast(DoubleType()))
    .withColumn("shipment_location_lat", F.col("shipment_location_lat").cast(DoubleType()))
    .withColumn("shipment_location_long", F.col("shipment_location_long").cast(DoubleType()))
    .withColumn("total_amount", F.col("total_amount").cast(IntegerType()))
    .withColumn("clear_payment", F.col("clear_payment").cast(IntegerType()))
    # Parse the cart text into an array of (product_id, quantity, item_price) structs
    .withColumn("product_metadata", from_json(col("product_metadata"), product_metadata_schema))
    # Convert epoch seconds straight to a timestamp. from_unixtime() would go through
    # local wall-clock text, which is ambiguous in the daylight-saving fall-back hour.
    .withColumn("event_trans_ts", F.timestamp_seconds(F.col("trans_ts").cast("long")))
    .drop("trans_ts")
)

In [17]:
# Print the stream for transaction after converting data type
trans_query = (
    df_transactions.writeStream
    .outputMode("append")
    .foreachBatch(foreach_batch_function)
    .trigger(processingTime='5 seconds')
    .start()
)

+----------+-----------+--------------+----------+----------------+--------------+--------------+------------+----------+------------+---------------------+----------------------+------------+-------------+--------------+
|created_at|customer_id|transaction_id|session_id|product_metadata|payment_method|payment_status|promo_amount|promo_code|shipment_fee|shipment_location_lat|shipment_location_long|total_amount|clear_payment|event_trans_ts|
+----------+-----------+--------------+----------+----------------+--------------+--------------+------------+----------+------------+---------------------+----------------------+------------+-------------+--------------+
+----------+-----------+--------------+----------+----------------+--------------+--------------+------------+----------+------------+---------------------+----------------------+------------+-------------+--------------+

+-----------------------+-----------+------------------------------------+------------------------------------+

In [18]:
# Stop the converted-transaction preview query started above.
trans_query.stop()

### 5.	Aggregate the streaming data frames and create features you used in your assignment 2A model.  
(note: customer ID has already been included in the stream.) Then, join the static data frames with the streaming data frame as our final data for prediction. Perform data type/column conversion according to your ML model and print out the Schema. (Again, you can reuse code from A2A).

In [19]:
from pyspark.sql.functions import col, count, when, min, max, hour, udf
from pyspark.sql.types import StringType, TimestampType, IntegerType, DoubleType
from pyspark.sql import functions as F

# Event-type codes grouped by action level
L1_actions = {"AP", "ATC", "CO"}
L2_actions = {"VC", "VP", "VI", "SER"}
L3_actions = {"SCR", "HP", "CL"}


def categorize_action(event_type):
    """Return "L1", "L2" or "L3" for a known event type, otherwise "Unknown"."""
    levels = (("L1", L1_actions), ("L2", L2_actions), ("L3", L3_actions))
    for label, members in levels:
        if event_type in members:
            return label
    return "Unknown"

# Wrap categorize_action as a Spark UDF that returns a string column.
categorize_action_udf = udf(categorize_action, StringType())

# Time-of-day bucketing used by the time_of_day UDF
def time_of_day(hour):
    """Map an hour of day to 'morning', 'afternoon', 'evening' or 'night'.

    Args:
        hour: Hour value (int or float). [6, 12) is morning, [12, 18) is afternoon,
            [18, 24) is evening; every other number is night.

    Returns:
        The bucket name, or None when `hour` is None so that a null input stays
        null instead of raising TypeError inside the UDF.
    """
    if hour is None:
        return None
    if 6 <= hour < 12:
        return 'morning'
    elif 12 <= hour < 18:
        return 'afternoon'
    elif 18 <= hour < 24:
        return 'evening'
    else:
        return 'night'

# Wrap time_of_day as a Spark UDF that returns a string column.
time_of_day_udf = udf(time_of_day, StringType())


In [20]:
# Schema of the static transactions file
transaction_schema_2A = (
    StructType()
    .add("created_at", TimestampType(), True)
    .add("customer_id", StringType(), True)
    .add("transaction_id", StringType(), False)
    .add("session_id", StringType(), True)
    .add("product_metadata", StringType(), True)
    .add("payment_method", StringType(), True)
    .add("payment_status", StringType(), True)
    .add("promo_amount", FloatType(), True)
    .add("promo_code", StringType(), True)
    .add("shipment_fee", FloatType(), True)
    .add("shipment_location_lat", FloatType(), True)
    .add("shipment_location_long", FloatType(), True)
    .add("total_amount", FloatType(), True)
    .add("clear_payment", BooleanType(), True)
)

# Static transactions, read with a header row
customer_trans_LTV_2A = (
    spark.read
    .schema(transaction_schema_2A)
    .option("header", True)
    .csv("transactions.csv")
)

# Per-customer totals: all transactions, plus those whose payment_status is
# "Success" or "Fail"
succeeded = F.col("payment_status") == "Success"
failed = F.col("payment_status") == "Fail"
customer_trans_agg_2A = customer_trans_LTV_2A.groupBy("customer_id").agg(
    F.count("transaction_id").alias("num_trans"),
    F.count(F.when(succeeded, True)).alias("num_success_trans"),
    F.count(F.when(failed, True)).alias("num_failed_trans"),
)

# Add age (current year minus birth year) and the year the customer first joined
current_year = F.year(F.current_date())
df_customer = (
    df_customer
    .withColumn("age", F.round(current_year - F.year("birthdate")))
    .withColumn("first_join_year", F.year("first_join_date"))
)

In [21]:
# Preview five rows of the per-customer transaction counts.
customer_trans_agg_2A.show(5)

+-----------+---------+-----------------+----------------+
|customer_id|num_trans|num_success_trans|num_failed_trans|
+-----------+---------+-----------------+----------------+
|      95519|       52|               42|              10|
|      28426|       73|               62|              11|
|      62481|       33|               27|               6|
|      27108|       39|               28|              11|
|      51244|      102|               74|              28|
+-----------+---------+-----------------+----------------+
only showing top 5 rows



In [22]:
# Build the per-session feature frame: windowed action counts from the browsing
# stream, joined with the transaction stream and the static customer tables.
feature_df = (
    df_browsing_behaviour_watermark
    # Resolve event_type on this frame rather than through df_browsing_behaviour
    .withColumn("action_level", categorize_action_udf(F.col("event_type")))
    .withColumn("event_hour", F.hour(F.col("event_time")))
    # 2-minute windows sliding every 10 seconds, per session
    .groupBy(F.window("event_ts", "2 minutes", "10 seconds"), "session_id")
    .agg(
        F.count(F.when(F.col("action_level") == "L1", 1)).alias("L1_count"),
        F.count(F.when(F.col("action_level") == "L2", 1)).alias("L2_count"),
        F.count(F.when(F.col("action_level") == "L3", 1)).alias("L3_count"),
        # F.min / F.max are explicit so the builtins cannot be picked up by mistake
        F.min("event_hour").alias("min_hour"),
        F.max("event_hour").alias("max_hour"),
    )
    # Share of L1 / L2 actions among all categorized actions, in percent
    .withColumn("total_actions", F.col("L1_count") + F.col("L2_count") + F.col("L3_count"))
    .withColumn("L1_ratio", (F.col("L1_count") / F.col("total_actions")) * 100)
    .withColumn("L2_ratio", (F.col("L2_count") / F.col("total_actions")) * 100)
    .drop("total_actions")
    # Time-of-day bucket of the midpoint between first and last event hour
    .withColumn("medium_hour", (F.col("min_hour") + F.col("max_hour")) / 2)
    .withColumn("time_of_day_medium", time_of_day_udf(F.col("medium_hour")))
    .join(
        df_transactions.select(
            "session_id", "payment_status", "promo_code", "customer_id",
            "product_metadata", "total_amount", "promo_amount", "shipment_fee",
            "shipment_location_lat", "shipment_location_long", "payment_method",
        ),
        on="session_id",
        how="inner",
    )
    .join(
        df_customer.select("customer_id", "gender", "first_join_year", "age"),
        on="customer_id",
        how="inner",
    )
    .join(customer_trans_agg_2A, on="customer_id", how="left")
    .drop("min_hour", "max_hour", "medium_hour")
    # Drop any row that still has a null in any column
    .na.drop()
)

In [23]:
from pyspark.sql.functions import col

# Target Spark type for each column of feature_df that is cast below
required_dtypes = dict(
    customer_id="float",
    session_id="string",
    payment_method="string",
    payment_status="string",
    promo_amount="float",
    promo_code="string",
    shipment_fee="float",
    shipment_location_lat="float",
    shipment_location_long="float",
    total_amount="float",
    L1_count="long",
    L2_count="long",
    L3_count="long",
    L1_ratio="double",
    L2_ratio="double",
    time_of_day_medium="string",
    gender="string",
    age="int",
    first_join_year="int",
    num_trans="long",
    num_success_trans="long",
    num_failed_trans="long",
)

# Replace every listed column with its cast version in one projection;
# column positions are unchanged.
feature_df = feature_df.withColumns(
    {name: col(name).cast(target) for name, target in required_dtypes.items()}
)

# Print the schema to verify the resulting types
feature_df.printSchema()

root
 |-- customer_id: float (nullable = true)
 |-- session_id: string (nullable = true)
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- L1_count: long (nullable = false)
 |-- L2_count: long (nullable = false)
 |-- L3_count: long (nullable = false)
 |-- L1_ratio: double (nullable = true)
 |-- L2_ratio: double (nullable = true)
 |-- time_of_day_medium: string (nullable = true)
 |-- payment_status: string (nullable = true)
 |-- promo_code: string (nullable = true)
 |-- product_metadata: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- product_id: integer (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |    |    |-- item_price: integer (nullable = true)
 |-- total_amount: float (nullable = true)
 |-- promo_amount: float (nullable = true)
 |-- shipment_fee: float (nullable = true)
 |-- shipment_location_lat: float (nullable = true)
 |-- shipment_loc

In [24]:
# Preview the joined feature stream every 5 seconds
feature_session = (
    feature_df.writeStream
    .outputMode("append")
    .foreachBatch(foreach_batch_function)
    .trigger(processingTime='5 seconds')
    .start()
)

+-----------+----------+------+--------+--------+--------+--------+--------+------------------+--------------+----------+----------------+------------+------------+------------+---------------------+----------------------+--------------+------+---------------+---+---------+-----------------+----------------+
|customer_id|session_id|window|L1_count|L2_count|L3_count|L1_ratio|L2_ratio|time_of_day_medium|payment_status|promo_code|product_metadata|total_amount|promo_amount|shipment_fee|shipment_location_lat|shipment_location_long|payment_method|gender|first_join_year|age|num_trans|num_success_trans|num_failed_trans|
+-----------+----------+------+--------+--------+--------+--------+--------+------------------+--------------+----------+----------------+------------+------------+------------+---------------------+----------------------+--------------+------+---------------+---+---------+-----------------+----------------+
+-----------+----------+------+--------+--------+--------+--------+---

In [25]:
# Stop the feature preview query started above.
feature_session.stop()

### 6.	The company is interested in the number of potential frauds as they happen and the products in customers’ shopping carts (so that they can plan their stock level ahead.) Load your ML model, and use the model to predict/process each browsing session/transaction as follows:  
a)	Every 10 seconds, show the total number of potential frauds (prediction = 1) in the last 2 minutes, and persist the raw data (see 7a).  
b)	Every 30 seconds, find the top 20 products (order by quantity descending) in the last 30 seconds, show product ID, name and total quantity. We only need the non-fraud transactions (prediction=0) by extracting customer shopping cart details (sum of all items of ADD_TO_CART(ATC) events from browsing behaviour, you can also extract it from transactions).

In [26]:
from pyspark.ml import PipelineModel  # Use PipelineModel if the model is part of a pipeline


# Path where the model was saved (relative to the current directory)
model_path = "best_model"

# Load the saved pipeline. A failure is reported and then re-raised: later cells
# need `loaded_model`, so continuing would only surface as a NameError further down.
try:
    loaded_model = PipelineModel.load(model_path)
except Exception as e:
    print(f"Error loading model: {e}")
    raise
else:
    print("Model loaded successfully.")

Model loaded successfully.


In [27]:
# Apply the loaded model to the streaming feature frame. A failure is reported and
# re-raised because later cells need `predictions` to exist.
try:
    predictions = loaded_model.transform(feature_df)
except Exception as e:
    print(f"Error during prediction: {e}")
    raise
else:
    print("Predictions successfully. Below is schema of prediction")
    predictions.printSchema()

Predictions successfully. Below is schema of prediction
root
 |-- customer_id: float (nullable = true)
 |-- session_id: string (nullable = true)
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- L1_count: long (nullable = false)
 |-- L2_count: long (nullable = false)
 |-- L3_count: long (nullable = false)
 |-- L1_ratio: double (nullable = true)
 |-- L2_ratio: double (nullable = true)
 |-- time_of_day_medium: string (nullable = true)
 |-- payment_status: string (nullable = true)
 |-- promo_code: string (nullable = true)
 |-- product_metadata: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- product_id: integer (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |    |    |-- item_price: integer (nullable = true)
 |-- total_amount: float (nullable = true)
 |-- promo_amount: float (nullable = true)
 |-- shipment_fee: float (nullable = true)
 |-- shipment

In [28]:
# Output root for the persisted predictions, and the checkpoint kept inside it.
# Both are defined outside the try block so they exist even if the cleanup fails.
parquet_directory_path = "./parquet"
checkpoint_path = "./parquet/predictions/checkpoint"

# Start from a clean slate: remove any previous output and checkpoint.
try:
    if os.path.exists(parquet_directory_path):
        shutil.rmtree(parquet_directory_path)
        print(f"Removed existing parquet folder: {parquet_directory_path}")
    else:
        print(f"No existing parquet folder to remove: {parquet_directory_path}")
except OSError as e:
    # Re-raise: starting the sink on top of stale output would be misleading.
    print(f"An error occurred: {e}")
    raise


Deleted parquet folder and created checkpoint path successfully.


In [29]:
# Persist every prediction row as Parquet, with a checkpoint under the output folder
fraud_stream = (
    predictions.writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", "./parquet/predictions")
    .option("checkpointLocation", checkpoint_path)
    .start()
)

In [30]:
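# fraud_stream is left running on purpose: the cells below read the files it writes to ./parquet/predictions. Run fraud_stream.stop() once finished.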

In [31]:
# Stream the persisted predictions back in, reusing the schema of `predictions`
read_predictions_parquet = (
    spark.readStream
    .schema(predictions.schema)
    .format("parquet")
    .load("./parquet/predictions")
)

read_predictions_parquet.printSchema()

root
 |-- customer_id: float (nullable = true)
 |-- session_id: string (nullable = true)
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- L1_count: long (nullable = true)
 |-- L2_count: long (nullable = true)
 |-- L3_count: long (nullable = true)
 |-- L1_ratio: double (nullable = true)
 |-- L2_ratio: double (nullable = true)
 |-- time_of_day_medium: string (nullable = true)
 |-- payment_status: string (nullable = true)
 |-- promo_code: string (nullable = true)
 |-- product_metadata: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- product_id: integer (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |    |    |-- item_price: integer (nullable = true)
 |-- total_amount: float (nullable = true)
 |-- promo_amount: float (nullable = true)
 |-- shipment_fee: float (nullable = true)
 |-- shipment_location_lat: float (nullable = true)
 |-- shipment_locati

In [32]:
def print_batch_count(batch_df, batch_id):
    """Print the row count and the number of rows with prediction == 1 for a micro-batch.

    Both numbers come from a single aggregation, so the batch is evaluated once
    instead of once per check.

    Args:
        batch_df: Micro-batch DataFrame; must contain a `prediction` column.
        batch_id: Micro-batch identifier supplied by `foreachBatch`.
    """
    stats = batch_df.agg(
        F.count(F.lit(1)).alias("total"),
        F.count(F.when(F.col("prediction") == 1, 1)).alias("frauds"),
    ).first()
    total = stats["total"]
    fraud_count = stats["frauds"]

    # A global aggregate over an empty batch still returns one row, with total == 0
    if total == 0:
        print(f"Batch ID: {batch_id} - No records in this batch.")
    else:
        print(f"Batch {batch_id} has {total} records with total potential frauds is: {fraud_count}.")

    
# Report the fraud count for each micro-batch, triggered every 10 seconds
read_predictions_parquet_query = (
    read_predictions_parquet.writeStream
    .outputMode("append")
    .trigger(processingTime='10 seconds')
    .foreachBatch(print_batch_count)
    .start()
)

Batch ID: 0 - No records in this batch.
Batch ID: 1 - No records in this batch.
Batch ID: 2 - No records in this batch.
Batch ID: 3 - No records in this batch.
Batch ID: 4 - No records in this batch.
Batch ID: 5 - No records in this batch.
Batch 6 has 1853 records with total potential frauds is: 5.
Batch 7 has 14813 records with total potential frauds is: 40.
Batch 8 has 14272 records with total potential frauds is: 35.


In [33]:
# Stop the fraud-count query started above.
read_predictions_parquet_query.stop()

In [34]:
#6b
# Quantity sold per product in transactions predicted as non-fraud,
# aggregated over 30-second windows keyed on the start time of the incoming
# `window` column (read_predictions_parquet is expected to carry that struct).
non_fraud_products_df = (
    read_predictions_parquet
    .where(F.col("prediction") == 0)  # keep only rows predicted as non-fraud
    # One row per purchased item, carrying the source window's start time.
    .select(
        F.explode("product_metadata").alias("item"),
        F.col("window.start").alias("window_start"),
    )
    .select("item.product_id", "item.quantity", "window_start")
    # Attach the display name from the product table, then discard its key.
    .join(
        df_product.select("id", F.col("productDisplayName").alias("product_name")),
        on=F.col("product_id") == df_product.id,
        how="inner",
    )
    .drop("id")
    # The watermark is declared ahead of the windowed aggregation it bounds.
    .withWatermark("window_start", "2 minutes")
    .groupBy(
        F.window("window_start", "30 seconds"),
        "product_id",
        "product_name",
    )
    .agg(F.sum("quantity").alias("total_quantity"))
)
# Display the schema of the aggregated stream.
non_fraud_products_df.printSchema()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- total_quantity: long (nullable = true)



In [35]:
from pyspark.sql.window import Window
def print_top_product(batch_df, batch_id):
    """Show up to 20 rows of a micro-batch, highest ``total_quantity`` first.

    Written to be passed to ``foreachBatch``.

    Args:
        batch_df: DataFrame holding the micro-batch; the non-empty path reads
            its ``product_id``, ``product_name`` and ``total_quantity`` columns.
        batch_id: Identifier of the micro-batch, used only in the empty-batch
            message.

    Returns:
        None. Output goes to standard output.
    """
    # Guard clause: report and leave when the micro-batch carries no rows.
    if batch_df.isEmpty():
        print(f"Batch ID: {batch_id} - No records in this batch.")
        return

    ranked = batch_df.orderBy(F.desc("total_quantity"))
    report = ranked.select("product_id", "product_name", "total_quantity")
    # truncate=False keeps long product names fully visible.
    report.show(n=20, truncate=False)

In [50]:
# Every 10 seconds, pass the newly emitted aggregate rows to
# print_top_product, which prints them ranked by total quantity.
non_fraud_products_query = (
    non_fraud_products_df.writeStream
    .foreachBatch(print_top_product)
    .outputMode("append")
    .trigger(processingTime="10 seconds")
    .start()
)

In [37]:
# Stop the streaming query that prints the per-batch product quantity ranking.
non_fraud_products_query.stop()

### 7.	Write a Parquet file and save the following data frames (tip: you may look at part 3 and think about what columns to save):  
a.	Persist the raw data from 6a in parquet format. Every student may have different features/columns in their data frames depending on their model, at the bare minimum, we need some IDs to identify those frauds later on (transaction_id and/or session_id). After that, read the parquet file and show a few rows to verify it is saved correctly.  
b.	Persist the data from 6b in another parquet file.  

In [38]:
# 7a
# Keep only the rows predicted as fraud (prediction == 1) and append them,
# as Parquet files, to ./parquet/potential_fraud_folder.
potential_fraud = read_predictions_parquet.where(F.col("prediction") == 1)
potential_fraud_dectect_query = (
    potential_fraud.writeStream
    .outputMode("append")
    .format("parquet")
    .option("checkpointLocation", "./parquet/potential_fraud_folder/checkpoint")
    .option("path", "./parquet/potential_fraud_folder")
    .start()
)

In [42]:
# Read the 7a Parquet output back as a static DataFrame and show a few rows
# to verify that it was persisted.
try:
    potential_fraud_7a_df = spark.read.parquet("./parquet/potential_fraud_folder")
    potential_fraud_7a_df.show(5)
except AnalysisException as e:
    # Presumably raised while the streaming sink has not written any data yet.
    # The underlying reason is printed as well, so that an unrelated analysis
    # error is not mistaken for "output not ready yet".
    print("Please re-run this after few mins.")
    print(f"Reason: {e}")

+-----------+--------------------+--------------------+--------+--------+--------+-----------------+-----------------+------------------+--------------+----------+--------------------+------------+------------+------------+---------------------+----------------------+--------------+------+---------------+---+---------+-----------------+----------------+------------------------+--------------------+----------------------+------------------+--------------------+--------------------+--------------------+----------+
|customer_id|          session_id|              window|L1_count|L2_count|L3_count|         L1_ratio|         L2_ratio|time_of_day_medium|payment_status|promo_code|    product_metadata|total_amount|promo_amount|shipment_fee|shipment_location_lat|shipment_location_long|payment_method|gender|first_join_year|age|num_trans|num_success_trans|num_failed_trans|time_of_day_medium_index|payment_method_index|time_of_day_medium_vec|payment_method_vec|            features|       rawPredicti

In [None]:
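# Left commented out so the 7a Parquet sink keeps running — presumably because
# Task 8 streams from this folder. Uncomment to stop the query when finished.
# potential_fraud_dectect_query.stop()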

In [43]:
# 7b
# Append the windowed per-product quantities from 6b, as Parquet files,
# to ./parquet/top_product_non_fraud.
top_product_7b_query = (
    non_fraud_products_df.writeStream
    .outputMode("append")
    .format("parquet")
    .option("checkpointLocation", "./parquet/top_product_non_fraud/checkpoint")
    .option("path", "./parquet/top_product_non_fraud")
    .start()
)

In [49]:
# Read the 7b Parquet output back as a static DataFrame and show a few rows
# to verify that it was persisted.
try:
    top_product_non_fraud_df = spark.read.parquet("./parquet/top_product_non_fraud")
    top_product_non_fraud_df.show(5)
except AnalysisException as e:
    # Presumably raised while the streaming sink has not written any data yet.
    # The underlying reason is printed as well, so that an unrelated analysis
    # error is not mistaken for "output not ready yet".
    print("Please re-run this after few mins.")
    print(f"Reason: {e}")

+--------------------+----------+--------------------+--------------+
|              window|product_id|        product_name|total_quantity|
+--------------------+----------+--------------------+--------------+
|{2024-10-13 22:25...|      2978|Puma Women's Punk...|             9|
|{2024-10-13 22:25...|     34136|Turtle Men Leathe...|             2|
|{2024-10-13 22:24...|     13517|Chimp Men Dracull...|             4|
|{2024-10-13 22:25...|     49152|Deborah Extra Lip...|            15|
|{2024-10-13 22:26...|     20234|Wrangler Women Ba...|             3|
+--------------------+----------+--------------------+--------------+
only showing top 5 rows



In [None]:
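# Left commented out so the 7b Parquet sink keeps running — presumably because
# Task 8 streams from this folder. Uncomment to stop the query when finished.
# top_product_7b_query.stop()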

### 8.	Read the two parquet files from task 7 as data streams and send to Kafka topics with appropriate names.
(Note: You shall read the parquet files as a streaming data frame and send messages to the Kafka topic when new data appears in the parquet file.)

In [47]:
# Stream 1
# Read the Parquet folder written in 7a as a streaming source, reusing the
# schema of `potential_fraud`.
potential_fraud_streamline = (
    spark.readStream
    .schema(potential_fraud.schema)
    .format("parquet")
    .load("./parquet/potential_fraud_folder")
)

# Serialise every row to a JSON string in the `value` column and publish it
# to Kafka topic 'potential_fraud_topic'.
potential_fraud_query = (
    potential_fraud_streamline
    .selectExpr("to_json(struct(*)) as value")
    .writeStream
    .outputMode("append")
    .format("kafka")
    .option("topic", "potential_fraud_topic")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("checkpointLocation", "./parquet/potential_fraud_folder/kafka_checkpoint")
    .start()
)

In [None]:
# Stop the query that publishes potential-fraud rows to Kafka.
potential_fraud_query.stop()

In [48]:
# Stream 2
# Read the Parquet folder written in 7b as a streaming source, reusing the
# schema of `non_fraud_products_df`.
top_product_non_fraud_streamline = (
    spark.readStream
    .schema(non_fraud_products_df.schema)
    .format("parquet")
    .load("./parquet/top_product_non_fraud")
)

# Serialise every row to a JSON string in the `value` column and publish it
# to Kafka topic 'top_product_non_fraud_topic'.
top_product_non_fraud_query = (
    top_product_non_fraud_streamline
    .selectExpr("to_json(struct(*)) as value")
    .writeStream
    .outputMode("append")
    .format("kafka")
    .option("topic", "top_product_non_fraud_topic")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("checkpointLocation", "./parquet/top_product_non_fraud/kafka_checkpoint")
    .start()
)

In [None]:
# Stop the query that publishes the non-fraud product quantities to Kafka.
top_product_non_fraud_query.stop()