# Streaming application using Spark Structured Streaming

### 1. Write code to create a SparkSession, which uses four cores with a proper application name, use the Melbourne timezone, and make sure a checkpoint location has been set.


In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.4.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0 pyspark-shell'

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql import functions as F
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("MOTH Streaming Application") \
    .config("spark.master", "local[4]") \
    .config("spark.driver.extraJavaOptions", "-Duser.timezone=Australia/Melbourne") \
    .config("spark.sql.session.timeZone", "Australia/Melbourne") \
    .config("spark.sql.streaming.checkpointLocation", "path/to/checkpoint") \
    .getOrCreate()
spark.sparkContext.setCheckpointDir("checkpoint_path")




### 2. Similar to assignment 2A, write code to define the data schema for the data files, following the data types suggested in the metadata file. Load the static datasets (e.g. customer, product, category) into data frames. 



In [2]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType
from pyspark.sql.functions import when, count, col, month , first , isnan
from pyspark.sql.functions import broadcast
from pyspark.sql.functions import year , current_date
from pyspark.sql import functions as F
import pandas as pd
category_schema = StructType([
    StructField("category_id", IntegerType(), True),
    StructField("cat_level1", StringType(), True),
    StructField("cat_level2", StringType(), True),
    StructField("cat_level3", StringType(), True),
])
customer_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("username", StringType(), True),
    StructField("email", StringType(), True),
    StructField("gender", StringType(), True), 
    StructField("device_type", StringType(), True),
    StructField("device_id", StringType(), True),
    StructField("device_version", StringType(), True),
    StructField("home_location_lat", DoubleType(), True),
    StructField("home_location_long", DoubleType(), True),
    StructField("home_location", StringType(), True),
    StructField("home_country", StringType(), True),
    StructField("first_join_date", TimestampType(), True), 
])
product_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("gender", StringType(), True),
    StructField("baseColour", StringType(), True),
    StructField("season", StringType(), True),
    StructField("year", IntegerType(), True),  
    StructField("usage", StringType(), True),
    StructField("productDisplayName", StringType(), True),
    StructField("category_id", IntegerType(), True),
])


transaction_schema = StructType([
    StructField("created_at", TimestampType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("booking_id", IntegerType(), True),
    StructField("session_id", StringType(), True),  
    StructField("product_metadata", StringType(), True),
    StructField("payment_method", StringType(), True),
    StructField("payment_status", StringType(), True),
    StructField("promo_amount", DoubleType(), True),
    StructField("promo_code", StringType(), True),
    StructField("shipment_fee", DoubleType(), True),
    StructField("shipment_date_limit", TimestampType(), True),
    StructField("shipment_location_lat", DoubleType(), True),
    StructField("shipment_location_long", DoubleType(), True),
    StructField("total_amount", DoubleType(), True),
])



In [3]:
category_df = spark.read.option("header", "true").option("inferSchema", "true").csv("category.csv")
customer_df = spark.read.option("header", "true").option("inferSchema", "true").csv("customer.csv")
product_df = spark.read.option("header", "true").option("inferSchema", "true").csv("product.csv")
transaction_df = spark.read.option("header", "true").option("inferSchema", "true").csv("new_transactions.csv")


print("Category DataFrame :")
category_df = category_df.drop("#")
category_df.printSchema()

print("Customer DataFrame :")
customer_df = customer_df.drop("#")
customer_df.printSchema()

print("Product DataFrame :")
product_df = product_df.drop("#")
product_df.printSchema()

print("Transaction DataFrame :")
transaction_df= transaction_df.drop("#")
transaction_df.printSchema()



Category DataFrame :
root
 |-- category_id: integer (nullable = true)
 |-- cat_level1: string (nullable = true)
 |-- cat_level2: string (nullable = true)
 |-- cat_level3: string (nullable = true)

Customer DataFrame :
root
 |-- customer_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- username: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- birthdate: date (nullable = true)
 |-- device_type: string (nullable = true)
 |-- device_id: string (nullable = true)
 |-- device_version: string (nullable = true)
 |-- home_location_lat: double (nullable = true)
 |-- home_location_long: double (nullable = true)
 |-- home_location: string (nullable = true)
 |-- home_country: string (nullable = true)
 |-- first_join_date: date (nullable = true)

Product DataFrame :
root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- baseColour: string (nullable 

### 3 Using the Kafka topic from the producer in Task 1, ingest the streaming data into Spark Streaming, assuming all data comes in the String format. Except for the 'ts' column, you shall receive it as an Int type.





#### Setting up a structured streaming job to consume data from a Kafka topic, apply schema and data transformations, and write the results to the console.
 a) Reading streaming data from a Kafka topic named 'ass2a' 
 b) Specifying "startingOffsets" to "latest," to read from the latest available messages 
 c)This schema describes the expected structure of the data (where "ts" field is expected as an Integer)
 d)Using from_json function to parse the JSON data in the "value" column based on the schema you defined. 
 e)Using explode function to flatten the array of structs created by from_json.
 d)Renaming columns to match the expected column names.
 e)Then configuring the streaming query to write the transformed data to the console."


In [4]:
from pyspark.sql.functions import from_unixtime, col
hostip = "172.20.10.5" 
topic = 'ass2a'


kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", f'{hostip}:9092') \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest")\
    .option("failOnDataLoss", "false")\
    .load()


In [5]:
# "key" and "value" columns in the Kafka DataFrame are explicitly cast to strings.
kafka_stream_df = kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [6]:
schema = ArrayType(StructType([    
    StructField("id", StringType(),True),
    StructField("session_id", StringType(),True),
    StructField("event_name", StringType(),True),
    StructField("event_id", StringType(),True),
    StructField("traffic_source", StringType(),True),
    StructField("event_metadata", StringType(),True),
    StructField("customer_id", StringType(),True),
    StructField("ts", IntegerType(),True)
]))
kafka_stream_df = kafka_stream_df.select(F.from_json(F.col("value").cast("string"), schema).alias('parsed_value'))
kafka_stream_df.printSchema()


root
 |-- parsed_value: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- session_id: string (nullable = true)
 |    |    |-- event_name: string (nullable = true)
 |    |    |-- event_id: string (nullable = true)
 |    |    |-- traffic_source: string (nullable = true)
 |    |    |-- event_metadata: string (nullable = true)
 |    |    |-- customer_id: string (nullable = true)
 |    |    |-- ts: integer (nullable = true)



In [7]:
kafka_stream_df = kafka_stream_df.select(F.explode(F.col("parsed_value")).alias('unnested_value'))      
kafka_stream_df .printSchema()

root
 |-- unnested_value: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- session_id: string (nullable = true)
 |    |-- event_name: string (nullable = true)
 |    |-- event_id: string (nullable = true)
 |    |-- traffic_source: string (nullable = true)
 |    |-- event_metadata: string (nullable = true)
 |    |-- customer_id: string (nullable = true)
 |    |-- ts: integer (nullable = true)



In [8]:
kafka_stream_df_formatted = kafka_stream_df.select(
                     F.col("unnested_value.id").alias("Id"),
                    F.col("unnested_value.session_id").alias("session_id"),
                    F.col("unnested_value.event_name").alias("event_name"),
                    F.col("unnested_value.event_id").alias("event_id"),
                    F.col("unnested_value.traffic_source").alias("traffic_source"),
                    F.col("unnested_value.event_metadata").alias("event_metadata"),
                    F.col("unnested_value.customer_id").alias("customer_id"),
                    F.col("unnested_value.ts").alias("ts")
    
                )

In [9]:
kafka_stream_df_formatted.printSchema()

root
 |-- Id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- traffic_source: string (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- ts: integer (nullable = true)



In [10]:
query = kafka_stream_df_formatted\
   .writeStream \
   .outputMode("update") \
   .format("console") \
   .trigger(processingTime="5 seconds")\
   .start()

#query.awaitTermination()


In [11]:
query.stop()

### 4 Then, the streaming data format should be transformed into the proper formats following the metadata file schema, similar to assignment 2A.  
Perform the following tasks:  
a) For the 'ts' column, convert it to the timestamp format, we will use it as event_time.  
b) If the data is late for more than 1 minute, discard it.  





a) Transformations are applied to kafka_stream_df_formatted DataFrame also Columns are renamed and
    cast to their desired data types.
b) The "ts" column is converted to a timestamp format. The result is a new column named "event_time."

c) Filtering Late Data:
        Data  more than 1 minute is discarded by calculating the time difference 
        between the current timestamp and the "event_time,"  and
        filtering out rows where the difference is greater than 60 seconds.


Windowed Aggregation:

The code sets up a windowed aggregation using a 1-minute window on the "event_time" column.
Counting no of sessions within each window 


A processing time trigger is set for the streaming query to execute every 2 minutes.
Schema Output:


In [12]:
hostip = "172.20.10.5" 
topic = 'ass2a'


kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", f'{hostip}:9092') \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest")\
    .option("failOnDataLoss", "false")\
    .load()

kafka_stream_df = kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")


In [13]:
kafka_stream_df.printSchema()


root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



In [14]:
kafka_stream_df_formatted = kafka_stream_df_formatted \
    .withColumn("id", kafka_stream_df_formatted["Id"].cast(IntegerType())) \
    .withColumn("session_id", kafka_stream_df_formatted["session_id"]) \
    .withColumn("event_name", kafka_stream_df_formatted["event_name"]) \
    .withColumn("event_id", kafka_stream_df_formatted["event_id"]) \
    .withColumn("traffic_source", kafka_stream_df_formatted["traffic_source"]) \
    .withColumn("event_metadata", kafka_stream_df_formatted["event_metadata"]) \
    .withColumn("customer_id", kafka_stream_df_formatted["customer_id"].cast(IntegerType())) \
    .withColumn("ts", kafka_stream_df_formatted["ts"].cast(LongType())) \
    .select("id", "session_id", "event_name", "event_id", "traffic_source", "event_metadata", "customer_id", "ts")

kafka_stream_df_formatted=kafka_stream_df_formatted.drop("id")
kafka_stream_df_formatted.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- traffic_source: string (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- ts: long (nullable = true)



In [15]:
#converting 'ts' column to timestamp and rename it to 'event_time'

In [16]:
from pyspark.sql.functions import from_unixtime, col


kafka_stream_df_formatted = kafka_stream_df_formatted.withColumn(
    'event_time',
    from_unixtime(col('ts').cast('double')).cast('timestamp')
)
kafka_stream_df_formatted = kafka_stream_df_formatted.drop('ts')
kafka_stream_df_formatted.printSchema()


root
 |-- session_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- traffic_source: string (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- event_time: timestamp (nullable = true)



In [17]:
from pyspark.sql.functions import current_timestamp
current_time = current_timestamp()
kafka_stream_df_formatted = kafka_stream_df_formatted.filter((current_time - F.col("event_time")).cast("int") <= 60)

In [18]:
from pyspark.sql.functions import window, sum
from pyspark.sql.window import Window

windowedCounts = kafka_stream_df_formatted \
    .withWatermark("event_time", "1 minute") \
    .groupBy(window(kafka_stream_df_formatted.event_time, "1 minute"))\
    .agg(F.count("session_id").alias("total"))\
    .select("window","total")

In [19]:
query = kafka_stream_df_formatted \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

 
#query.awaitTermination()

In [20]:
query.stop()

In [21]:
kafka_stream_df_formatted .printSchema()

root
 |-- session_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- traffic_source: string (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- event_time: timestamp (nullable = true)



### 5  
Aggregate the streaming data frame by session id and create features you used in your assignment 2A model. (note: customer ID has already been included in the stream.)   
Then, join the static data frames with the streaming data frame as our final data for prediction.  
Perform data type/column conversion according to your ML model, and print out the Schema.


processesing streaming data, to create features from it, and joins it with static customer information.
The features include counts of different event categories, season,num_cat_highvalue,num_cat_midvaluenum_cat_lowvalue,total_Actions. 

Additionally, a "made_purchase" feature is added based on payment status.

In [22]:
from pyspark.sql.functions import col, count, when, lit, month

# Define the conditions for each season based on the month

season_conditions = {
    "Spring": (month("event_time").between(3, 5)),
    "Summer": (month("event_time").between(6, 8)),
    "Autumn": (month("event_time").between(9, 11)),
    "Winter": (month("event_time").isin([12, 1, 2]))
}



#defining conditions for category
category_conditions = {
    "Category 1": (col("event_name").isin(["ADD_PROMO", "ADD_TO_CART"])),
    "Category 2": (col("event_name").isin(["VIEW_PROMO", "VIEW_ITEM", "SEARCH"])),
    "Category 3": (col("event_name").isin(["SCROLL", "HOMEPAGE", "CLICK"]))
}



#creating features 
feature_df = kafka_stream_df_formatted.select("session_id", "event_name", "event_time","customer_id","event_metadata") \
    .groupBy("session_id","event_name", "event_time","customer_id","event_metadata") \
    .agg(
        count(when(category_conditions["Category 1"], True)).alias("num_cat_highvalue"),
        count(when(category_conditions["Category 2"], True)).alias("num_cat_midvalue"),
        count(when(category_conditions["Category 3"], True)).alias("num_cat_lowvalue"),
        count(when(col("event_name") == "ADD_PROMO", True)).alias("is_promotion"),
        when(season_conditions["Spring"], "Spring")
            .when(season_conditions["Summer"], "Summer")
            .when(season_conditions["Autumn"], "Autumn")
            .when(season_conditions["Winter"], "Winter")
            .otherwise("Unknown").alias("season")
    ) \
    .withColumn("total_actions", col("num_cat_highvalue") + col("num_cat_midvalue") + col("num_cat_lowvalue")) \
    .withColumn("high_value_ratio", (col("num_cat_highvalue") / col("total_actions")) * 100) \
    .withColumn("low_value_ratio", (col("num_cat_lowvalue") / col("total_actions")) * 100)



feature_df.printSchema()


root
 |-- session_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- num_cat_highvalue: long (nullable = false)
 |-- num_cat_midvalue: long (nullable = false)
 |-- num_cat_lowvalue: long (nullable = false)
 |-- is_promotion: long (nullable = false)
 |-- season: string (nullable = false)
 |-- total_actions: long (nullable = false)
 |-- high_value_ratio: double (nullable = true)
 |-- low_value_ratio: double (nullable = true)



In [23]:
#joining customer_df and feature_df


new_customer_df = customer_df.select(
    "customer_id",
    "gender",
    "device_type",
    "home_location",
    year("first_join_date").alias("first_join_year"),
    (year(current_date()) - year("birthdate")).alias("age")
)
feature_df = feature_df.join(new_customer_df, "customer_id", "left")
feature_df.na.drop()
feature_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- session_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- num_cat_highvalue: long (nullable = false)
 |-- num_cat_midvalue: long (nullable = false)
 |-- num_cat_lowvalue: long (nullable = false)
 |-- is_promotion: long (nullable = false)
 |-- season: string (nullable = false)
 |-- total_actions: long (nullable = false)
 |-- high_value_ratio: double (nullable = true)
 |-- low_value_ratio: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- home_location: string (nullable = true)
 |-- first_join_year: integer (nullable = true)
 |-- age: integer (nullable = true)



In [24]:
#adding made_purchase into features based on payment status



purchased_df = transaction_df.withColumn(
    "made_purchase",
    when(col("payment_status") == "Success", 1).otherwise(0)
).select("session_id", "made_purchase")

feature_df = feature_df.join(purchased_df, "session_id", "left").fillna(0, subset=["made_purchase"])
feature_df.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- num_cat_highvalue: long (nullable = false)
 |-- num_cat_midvalue: long (nullable = false)
 |-- num_cat_lowvalue: long (nullable = false)
 |-- is_promotion: long (nullable = false)
 |-- season: string (nullable = false)
 |-- total_actions: long (nullable = false)
 |-- high_value_ratio: double (nullable = true)
 |-- low_value_ratio: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- home_location: string (nullable = true)
 |-- first_join_year: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- made_purchase: integer (nullable = true)



In [25]:
query = feature_df\
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .trigger(processingTime="10 seconds") \
    .start()
    


#query.stop()

In [26]:
query.stop()

### 6 Load your ML model, and use the model to predict if each session will purchase according to the requirements below:
a) Every 10 seconds, show the total number of potential sales transactions (prediction = 1) in the last 1 minute.   
b) Every 30 seconds, show the total potential revenue in the last 30 seconds. “Potiential revenue” here is definded as: When prediction=1, extract customer shopping cart detail from metadata (sum of all items of ADD_TO_CART events).  
c) Every 1 minute, show the top 10 best-selling products by total quantity. (note: No historical data is required, only the top 10 in each 1 minute window.)

 Loading the model and using it to make predictions on a feature data 

In [27]:


from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("gbt_model")



if isinstance(loaded_model, PipelineModel):
    print("Model loaded successfully.")
else:
    print("Failed to load the model.")


Model loaded successfully.


In [28]:
feature_df = feature_df.dropna()
predictions = loaded_model.transform(feature_df)
predictions.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- num_cat_highvalue: long (nullable = false)
 |-- num_cat_midvalue: long (nullable = false)
 |-- num_cat_lowvalue: long (nullable = false)
 |-- is_promotion: long (nullable = false)
 |-- season: string (nullable = false)
 |-- total_actions: long (nullable = false)
 |-- high_value_ratio: double (nullable = true)
 |-- low_value_ratio: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- home_location: string (nullable = true)
 |-- first_join_year: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- made_purchase: integer (nullable = true)
 |-- season_index: double (nullable = false)
 |-- gender_index: double (nullable = false)
 |-- device_type_index: double (nullable = false)
 |-- home_locatio

In [29]:
query = predictions\
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime="10 seconds") \
    .start()
    
#query.awaitTermination()

creating a new DataFrame sales_transaction by filtering the predictions DataFrame where prediction==1
using window(column, windowDuration, slideDuration) to define the time windows
calculating the count of records where the "prediction" column has a value of 1.

In [33]:
#6A
sales_transaction = predictions.filter(col("prediction") == 1) \
.groupBy(window(col("event_time"), "60 seconds", "10 seconds")) \
.agg(count(col("prediction")).alias("total_sales_transactions")) \
.select("total_sales_transactions")

In [34]:
spark.conf.set("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")
query = sales_transaction\
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime="10 seconds") \
    .start()
    


In [35]:
sales_transaction.printSchema()

root
 |-- total_sales_transactions: long (nullable = false)



Calculating the total potential revenue in the last 30 seconds for sessions where the prediction is equal to 1.

a)Defining  schema for the event_metadata column, specifying the expected structure of the JSON data within it.

b)When the event_name is "ADD_TO_CART," parsing the data in the event_metadata column.
 
c)Calculates the revenue for each session by multiplying the quantity and item_price


In [36]:
#6B


from pyspark.sql.functions import col, when, from_json, sum
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.window import Window
schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("item_price", IntegerType(), True)
])

predictions = predictions.withColumn("cart_items", when(col("event_name") == "ADD_TO_CART", from_json(col("event_metadata"), schema)))

predictions = predictions.withColumn("revenue",
    when(col("event_name") == "ADD_TO_CART",
        col("cart_items.quantity") * col("cart_items.item_price")
    ).otherwise(0)
)

revenue_df = predictions \
    .withWatermark("event_time", "30 seconds") \
    .groupBy(window("event_time", "30 seconds", "30 seconds")) \
    .agg(sum(col("revenue")).alias("potential_revenue"))


In [37]:
revenue_df.printSchema()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- potential_revenue: long (nullable = true)



In [38]:
spark.conf.set("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")
query = revenue_df.writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime="60 seconds") \
    .start()


a)"product_quantities" is created by selecting product IDs and quantities from "predictions."
b)It calculates the total quantity per product by grouping and summing their quantities, resulting in a "total_quantity" .

In [39]:
#6C


from pyspark.sql import SparkSession
from pyspark.sql.functions import sum
from pyspark.sql.types import StructType, StructField, IntegerType

schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("item_price", IntegerType(), True)
])

product_quantities = predictions.select("cart_items.product_id", "cart_items.quantity")
product_quantities = product_quantities.groupBy("product_id").agg(sum("quantity").alias("total_quantity"))



In [40]:
spark.conf.set("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")
query =product_quantities.writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime="60 seconds") \
    .start()



### 7  
a) Persist the prediction result along with cart metadata in parquet format; after that, read the parquet file and show the results to verify it is saved properly.  
b) Persist the 30-second sales prediction in another parquet file.

In [41]:
# 7a
prediction1 =predictions.filter(col("event_name") == "ADD_TO_CART") \
    .select("session_id", "customer_id", "event_time", "event_metadata", "made_purchase", "prediction")
prediction1=prediction1.dropna()

In [42]:

query_file_sink = kafka_stream_df_formatted.writeStream.format("parquet")\
        .outputMode("append")\
        .option("path", "parquet/clickstream_df")\
        .option("checkpointLocation", "parquet/clickstream_df/checkpoint")\
        .start()

In [43]:
query_file_sink.stop()

In [44]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# Define the schema for the data
schema_1 = StructType([
    StructField("session_id", StringType(), True),
    StructField("event_name", StringType(), True),
    StructField("event_id", StringType(), True),
    StructField("traffic_source", StringType(), True),
    StructField("event_metadata", StringType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("event_time", TimestampType(), True)
])


In [45]:
query_file_sink_df = spark.read.schema(schema_1).parquet("parquet/clickstream_df")
query_file_sink_df.printSchema()
#query_file_sink_df.show()

root
 |-- session_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- traffic_source: string (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- event_time: timestamp (nullable = true)



In [46]:
query_file_sink_df.show(20)

+--------------------+-----------+--------------------+--------------+--------------------+-----------+-------------------+
|          session_id| event_name|            event_id|traffic_source|      event_metadata|customer_id|         event_time|
+--------------------+-----------+--------------------+--------------+--------------------+-----------+-------------------+
|4046178d-c332-465...|      CLICK|0b56a505-60ec-470...|        MOBILE|                    |      43174|2023-10-19 09:47:54|
|4046178d-c332-465...|   HOMEPAGE|cbd6642e-b475-4e6...|        MOBILE|                    |      43174|2023-10-19 09:47:55|
|40487207-8d37-46f...|ADD_TO_CART|ad9fbf26-8ff5-4a9...|        MOBILE|{'product_id': 91...|       4885|2023-10-19 09:47:56|
|40487207-8d37-46f...|      CLICK|8f5cf8cc-8ece-41e...|        MOBILE|                    |       4885|2023-10-19 09:47:57|
|40487207-8d37-46f...|   PURCHASE|2f50c8f6-f9fc-4e9...|        MOBILE|{'payment_status'...|       4885|2023-10-19 09:47:58|
|4048720

In [47]:
#7B
predictions = predictions.withColumn("revenue",
    when(col("event_name") == "ADD_TO_CART",
        col("cart_items.quantity") * col("cart_items.item_price")
    ).otherwise(0)
)

revenue_df = predictions \
    .withWatermark("event_time", "30 seconds") \
    .groupBy(window("event_time", "30 seconds", "30 seconds")) \
    .agg(sum(col("revenue")).alias("potential_revenue"))



In [48]:
# Define the Parquet path for sales predictions
parquet_path = "parquet/sales_predictions"

# Write the windowed results to Parquet
query = revenue_df.writeStream.format("parquet") \
    .outputMode("append") \
    .option("path", parquet_path) \
    .option("checkpointLocation", "parquet/sales_predictions/checkpoint") \
    .start()

In [49]:
query.stop()

In [50]:
from pyspark.sql.types import StructType, StructField, TimestampType, DoubleType

schema_3 = StructType([
    StructField("window", StructType([
        StructField("start", TimestampType(), True),
        StructField("end", TimestampType(), True)
    ]), True),
    StructField("potential_revenue", DoubleType(), True)
])


In [51]:
query_file_sink_df = spark.read.schema(schema_3).parquet("parquet/clickstream_df")
query_file_sink_df.printSchema()

root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- potential_revenue: double (nullable = true)



### 8  
Read the parquet files as a data stream, for 7a) join customer information and send to a Kafka topic with an appropriate name to the data visualisation. For 7b) Send the message directly to another Kafka topic.

In [None]:
# Stream 1


In [None]:
# Stream 2
