# Streaming application using Spark Structured Streaming

### 1. Write code to create a SparkSession, which uses four cores with a proper application name, use the Melbourne timezone, and make sure a checkpoint location has been set.


In [10]:
from pyspark.sql import SparkSession
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'

spark = SparkSession.builder \
    .appName("A2B") \
    .master("local[4]") \
    .config("spark.sql.session.timeZone", "Australia/Melbourne") \
    .getOrCreate()

base_path = os.path.abspath('.')

checkpoint_directory = os.path.join(base_path, 'checkpoint_directory')
os.makedirs(checkpoint_directory, exist_ok=True)

spark.sparkContext.setCheckpointDir(checkpoint_directory)

### 2. Similar to assignment 2A, write code to define the data schema for the data files, following the data types suggested in the metadata file. Load the static datasets (e.g. customer, product, category) into data frames. 



In [11]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DateType, TimestampType

customer_schema = StructType([
    StructField("#", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("username", StringType(), True),
    StructField("email", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("birthdate", DateType(), True),
    StructField("device_type", StringType(), True),
    StructField("device_id", StringType(), True),
    StructField("device_version", StringType(), True),
    StructField("home_location_lat", FloatType(), True),
    StructField("home_location_long", FloatType(), True),
    StructField("home_location", StringType(), True),
    StructField("home_country", StringType(), True),
    StructField("first_join_date", StringType(), True)
])

category_schema = StructType([
    StructField("#", IntegerType(), True),
    StructField("category_id", IntegerType(), True),
    StructField("cat_level1", StringType(), True),
    StructField("cat_level2", StringType(), True),
    StructField("cat_level3", StringType(), True)
])

click_stream_schema = StructType([
    StructField("#", IntegerType(), True),
    StructField("session_id", StringType(), True),
    StructField("event_name", StringType(), True),
    StructField("event_time", TimestampType(), True),
    StructField("event_id", StringType(), True),
    StructField("traffic_source", StringType(), True),
    StructField("event_metadata", StringType(), True)
])

product_schema = StructType([
    StructField("#", IntegerType(), True),
    StructField("id", IntegerType(), True),
    StructField("gender", StringType(), True),
    StructField("baseColour", StringType(), True),
    StructField("season", StringType(), True),
    StructField("year", StringType(), True),
    StructField("usage", StringType(), True),
    StructField("productDisplayName", StringType(), True),
    StructField("category_id", IntegerType(), True)
])

transaction_schema = StructType([
    StructField("#", IntegerType(), True),
    StructField("created_at", TimestampType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("booking_id", StringType(), True),
    StructField("session_id", StringType(), True),
    StructField("product_metadata", StringType(), True),
    StructField("payment_method", StringType(), True),
    StructField("payment_status", StringType(), True),
    StructField("promo_amount", FloatType(), True),
    StructField("promo_code", StringType(), True),
    StructField("shipment_fee", FloatType(), True),
    StructField("shipment_date_limit", DateType(), True),
    StructField("shipment_location_lat", StringType(), True),
    StructField("shipment_location_long", StringType(), True),
    StructField("total_amount", FloatType(), True)
])

path_customer = "A2A/dataset/customer.csv"
path_category = "A2A/dataset/category.csv"
path_click_stream = "A2A/dataset/click_stream.csv"
path_customer_session = "A2A/dataset/customer_session.csv"
path_product = "A2A/dataset/product.csv"
path_transaction = "A2A/dataset/transactions.csv"

df_customer = spark.read.csv(path_customer, schema=customer_schema, header=True)
df_category = spark.read.csv(path_category, schema=category_schema, header=True)
df_click_stream = spark.read.csv(path_click_stream, schema=click_stream_schema, header=True)
df_customer_session = spark.read.csv(path_customer_session, header=True)
df_product = spark.read.csv(path_product, schema=product_schema, header=True)
df_transaction = spark.read.csv(path_transaction, schema=transaction_schema, header=True)


### 3 Using the Kafka topic from the producer in Task 1, ingest the streaming data into Spark Streaming, assuming all data comes in the String format. Except for the 'ts' column, you shall receive it as an Int type.




In [12]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql.functions import from_unixtime
from pyspark.sql.functions import *

hostip = "192.168.0.3"
topic = "click_stream_topic"

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", f"{hostip}:9092") \
    .option("subscribe", topic) \
    .load()
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")


### 4 Then, the streaming data format should be transformed into the proper formats following the metadata file schema, similar to assignment 2A.  
Perform the following tasks:  

a) For the 'ts' column, convert it to the timestamp format, we will use it as event_time.  

In [13]:
from pyspark.sql.functions import to_timestamp

schema = StructType([
    StructField("session_id", StringType(), True),
    StructField("event_name", StringType(), True),
    StructField("event_id", StringType(), True),
    StructField("traffic_source", StringType(), True),
    StructField("event_metadata", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("ts", IntegerType(), True)
])

df = df.select(F.from_json(F.col("value").cast("string"), schema).alias('parsed_value'))

df = df.select(
    col("parsed_value.session_id").alias("session_id"),
    col("parsed_value.event_name").alias("event_name"),
    col("parsed_value.event_id").alias("event_id"),
    col("parsed_value.traffic_source").alias("traffic_source"),
    col("parsed_value.event_metadata").alias("event_metadata"),
    col("parsed_value.customer_id").alias("customer_id"),
    col("parsed_value.ts").cast(TimestampType()).alias("event_time")
)

b) If the data is late for more than 1 minute, discard it.  

In [14]:
df_with_watermark = df.withWatermark("event_time", "60 seconds")

### 5  Feature Creation
Aggregate the streaming data frame by session id and create features you used in your assignment 2A model. (note: customer ID has already been included in the stream.)   
Then, join the static data frames with the streaming data frame as our final data for prediction.  
Perform data type/column conversion according to your ML model, and print out the Schema.


In [15]:
from pyspark.sql import functions as F
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType
from datetime import datetime
from pyspark.sql.functions import month, dayofmonth, datediff, current_date, to_date, col

# Define the categories based on event types
high_value_actions = ['PURCHASE', 'ADD_PROMO', 'ADD_TO_CART']
medium_value_actions = ['VIEW_PROMO', 'VIEW_ITEM', 'SEARCH']
low_value_actions = ['SCROLL', 'HOMEPAGE', 'CLICK']

# Categorize events based on their type
df_categorized = df_click_stream.withColumn(
    "category",
    F.when(F.col("event_name").isin(high_value_actions), "num_cat_highvalue")
    .when(F.col("event_name").isin(medium_value_actions), "num_cat_midvalue")
    .otherwise("num_cat_lowvalue")
)

df_category_counts = df_categorized.groupBy("session_id").pivot("category").count()

expected_columns = ["num_cat_highvalue", "num_cat_midvalue", "num_cat_lowvalue"]
for column in expected_columns:
    if column not in df_category_counts.columns:
        df_category_counts = df_category_counts.withColumn(column, F.lit(0))
        
feature_df = df_category_counts.na.fill({
    "num_cat_highvalue": 0,
    "num_cat_midvalue": 0,
    "num_cat_lowvalue": 0
})

total_counts = df_category_counts.agg(
    F.sum("num_cat_highvalue").alias("Total High Value Actions"),
    F.sum("num_cat_midvalue").alias("Total Medium Value Actions"),
    F.sum("num_cat_lowvalue").alias("Total Low Value Actions")
)

df_promotion = df_click_stream.withColumn(
    "is_promotion",
    F.when(F.col("event_name") == "ADD_PROMO", 1).otherwise(0)
)

df_promotion_summary = df_promotion.groupBy("session_id").agg(
    F.max("is_promotion").alias("is_promotion")
)

feature_df = feature_df.join(df_promotion_summary, on="session_id", how="left_outer")

feature_df = feature_df.na.fill({"is_promotion": 0})

def get_season(month):
    if 3 <= month <= 5:
        return "Spring"
    elif 6 <= month <= 8:
        return "Summer"
    elif 9 <= month <= 11:
        return "Autumn"
    elif month in [12, 1, 2]:
        return "Winter"

season_udf = F.udf(get_season, StringType())

df_click_stream = df_click_stream.withColumn(
    "month", F.month("event_time")
).withColumn(
    "season", season_udf("month")
)

df_season = df_click_stream.groupBy("session_id").agg(
    F.max("season").alias("season")  
)

df_customer = df_customer.withColumn("first_join_date", F.to_date(F.col("first_join_date")))
df_customer = df_customer.withColumn("first_join_year", F.year(F.col("first_join_date")).cast("integer"))

current_year = datetime.now().year
current_date = datetime.now()
df_customer = df_customer.withColumn("birthdate", F.to_date(col("birthdate"), "dd-MM-yyyy"))
df_customer = df_customer.withColumn("age", (F.datediff(F.current_date(), col("birthdate")) / 365.25).cast("integer"))

df_customer_after = df_customer.select(
    "customer_id", "gender", "age", "device_type", "home_location", "first_join_year"
)

df_customer_after_01 = df_customer_session.join(df_customer_after, "customer_id")
feature_df = feature_df.join(df_customer_after_01, "session_id")

df_transaction = df_transaction.withColumn("purchase", F.when(F.col("payment_status") == "Success", 1).otherwise(0))

df_transaction = df_transaction.select("session_id", "customer_id", "purchase")

feature_df = feature_df.join(df_transaction, on=["session_id", "customer_id"], how="left")

feature_df = feature_df.na.fill({"purchase": 0})

feature_df = feature_df.join(df_season,"session_id")

required_columns = [
    "session_id", "customer_id",
    "num_cat_highvalue", "num_cat_midvalue", "num_cat_lowvalue",
    "is_promotion",
    "season", "gender", "age", "device_type", "home_location", "first_join_year",
    "purchase"  
]

for column in required_columns:
    if column not in feature_df.columns:
        feature_df = feature_df.withColumn(column, F.lit(None))

default_values = {
    "gender": "Unknown", "age": -1, "device_type": "Unknown",
    "home_location": "Unknown", "first_join_year": -1
}
feature_df = feature_df.na.fill(default_values)

df_final = feature_df.join(df_with_watermark, ["session_id", "customer_id"])

df_final.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- num_cat_highvalue: long (nullable = false)
 |-- num_cat_lowvalue: long (nullable = false)
 |-- num_cat_midvalue: long (nullable = false)
 |-- is_promotion: integer (nullable = false)
 |-- #: string (nullable = true)
 |-- gender: string (nullable = false)
 |-- age: integer (nullable = false)
 |-- device_type: string (nullable = false)
 |-- home_location: string (nullable = false)
 |-- first_join_year: integer (nullable = false)
 |-- purchase: integer (nullable = false)
 |-- season: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- traffic_source: string (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- event_time: timestamp (nullable = true)



### 6 Load your ML model, and use the model to predict if a purchase is made or not in each session. Persist the prediction result in parquet format, then read the parquet result and show the results.

In [16]:
from pyspark.ml.pipeline import PipelineModel

model_path = "A2A/Moth_model" 
model = PipelineModel.load(model_path)

predictions = model.transform(df_final)

predictions_1 = predictions.select("rawPrediction", "probability", "prediction")

output_path = "checkpoint/predictions_output"
query = predictions_1.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", output_path) \
    .option("checkpointLocation", checkpoint_directory) \
    .start()

In [20]:
result_df = spark.read.parquet(output_path)
result_df.show(truncate=False)

+-------------+-----------+----------+
|rawPrediction|probability|prediction|
+-------------+-----------+----------+
+-------------+-----------+----------+



### 7.Using the prediction results, write code to process the data following the requirements below and show results.
a) Every 10 seconds, show the total number of potential sales transactions (prediction = 1) in the last 1 minute.  


In [None]:
potential_sales = predictions.withColumn("event_time", F.col("event_time").cast("timestamp")) \
    .filter(predictions.prediction == 1) \
    .withWatermark("event_time", "1 minute") \
    .groupBy(F.window(predictions.event_time, "1 minute", "10 seconds")) \
    .agg(F.count("prediction").alias("total_potential_sales")) \
    .select("window.start", "window.end", "total_potential_sales")

query_sales = potential_sales.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime='10 seconds') \
    .start()

b) Every 30 seconds, show the total potential revenue in the last 30 seconds. “Potential revenue” here is defined as When prediction=1, extract customer shopping cart detail from metadata (sum of all items of ADD_TO_CART events).

In [None]:
from pyspark.sql.types import DoubleType
import json

def extract_cart_total(metadata):
    try:
        data = json.loads(metadata)
        items = data.get("items", [])
        return sum(item.get("price", 0) for item in items)
    except:
        return 0.0

extract_cart_total_udf = F.udf(extract_cart_total, DoubleType())

potential_revenue = predictions.filter(predictions.prediction == 1) \
    .withColumn("cart_total", extract_cart_total_udf(predictions.event_metadata)) \
    .withWatermark("event_time", "30 seconds") \
    .groupBy(F.window(predictions.event_time, "30 seconds", "30 seconds")) \
    .agg(F.sum("cart_total").alias("total_potential_revenue")) \
    .select("window.start", "window.end", "total_potential_revenue")

query_revenue = potential_revenue.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime='30 seconds') \
    .start()

c) Every 1 minute, show the top 10 best-selling products by total quantity. (note: No historical data is required, only the top 10 in each 1-minute window.).

In [None]:
def extract_item_quantities(metadata):
    try:
        data = json.loads(metadata)
        items = data.get("items", [])
        return [(item.get("id", "unknown"), item.get("quantity", 0)) for item in items]
    except:
        return []

extract_item_quantities_udf = F.udf(extract_item_quantities, ArrayType(StructType([
    StructField("id", StringType(), True),
    StructField("quantity", IntegerType(), True)
])))

exploded_items = predictions.filter(predictions.prediction == 1) \
    .withColumn("items", F.explode(extract_item_quantities_udf(predictions.event_metadata))) \
    .select("event_time", "items.id", "items.quantity") \
    .withWatermark("event_time", "1 minute")

top_selling_products = exploded_items.groupBy(F.window(exploded_items.event_time, "1 minute"), "id") \
    .agg(F.sum("quantity").alias("total_quantity")) \
    .withColumn("rank", F.rank().over(Window.partitionBy("window").orderBy(F.desc("total_quantity")))) \
    .filter("rank <= 10") \
    .select("window.start", "window.end", "id", "total_quantity")

query_top_products = top_selling_products.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime='1 minute') \
    .start()