# Streaming application using Spark Structured Streaming

### 1. Write code to create a SparkSession, which uses four cores with a proper application name, use the Melbourne timezone, and make sure a checkpoint location has been set.


In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'

from pyspark.sql import SparkSession

check_point = "/Users/sanchita/Documents/SEM2/BigData/FIT5202/Assignment3"
melbourne_timezone = "Australia/Melbourne"
#configuring the spark session
spark = SparkSession.builder \
    .appName("MOTH Predictive Model and Analysis") \
    .config("spark.master", "local[4]") \
    .config("spark.sql.session.timeZone", melbourne_timezone) \
    .config("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")\
    .config("spark.checkpoint.dir", "stream_parquet/click_stream")\
    .config("spark.checkpoint.dir", "sales_parquet/revenue")\
    .getOrCreate()


### 2. Similar to assignment 2A, write code to define the data schema for the data files, following the data types suggested in the metadata file. Load the static datasets (e.g. customer, product, category) into data frames. 



In [2]:
# importing packages to define schemas for the datasets based on the metadata
from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    StringType,
    DoubleType,
    TimestampType,
    DateType,
)

# defining the schema for category schema
category_schema = StructType(
    [
        StructField("index", IntegerType(), False),
        StructField("category_id", IntegerType(), False),
        StructField("cat_level1", StringType(), True),
        StructField("cat_level2", StringType(), True),
        StructField("cat_level3", StringType(), True),
    ]
)

# defining for customer schema
customer_schema = StructType(
    [
        StructField("index", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("first_name", StringType(), True),
        StructField("last_name", StringType(), True),
        StructField("username", StringType(), True),
        StructField("email", StringType(), True),
        StructField("gender", StringType(), True),
        StructField("birthdate", DateType(), True),
        StructField("device_type", StringType(), True),
        StructField("device_id", StringType(), True),
        StructField("device_version", StringType(), True),
        StructField("home_location_lat", DoubleType(), True),
        StructField("home_location_long", DoubleType(), True),
        StructField("home_location", StringType(), True),
        StructField("home_country", StringType(), True),
        StructField("first_join_date", DateType(), True),
    ]
)

# defining the product schema
product_schema = StructType(
    [
        StructField("index", IntegerType(), False),
        StructField("product_id", IntegerType(), False),
        StructField("gender", StringType(), True),
        StructField("baseColour", StringType(), True),
        StructField("season", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("usage", StringType(), True),
        StructField("productDisplayName", StringType(), True),
        StructField("category_id", IntegerType(), True),
    ]
)

# defining clisk_stream schema
click_schema = StructType(
    [
        StructField("index", IntegerType(), False),
        StructField("session_id", StringType(), False),
        StructField("event_name", StringType(), True),
        StructField("event_time", TimestampType(), True),
        StructField("event_id", StringType(), True),
        StructField("traffic_source", StringType(), True),
        StructField("event_metadata", StringType(), True),
    ]
)

# defining the transaction schema
transaction_schema = StructType(
    [
        StructField("index", IntegerType(), False),
        StructField("created_at", TimestampType(), True),
        StructField("customer_id", IntegerType(), True),
        StructField("booking_id", StringType(), False),
        StructField("session_id", StringType(), True),
        StructField("product_metadata", StringType(), True),
        StructField("payment_method", StringType(), True),
        StructField("payment_status", StringType(), True),
        StructField("promo_amount", DoubleType(), True),
        StructField("promo_code", StringType(), True),
        StructField("shipment_fee", DoubleType(), True),
        StructField("shipment_date_limit", DateType(), True),
        StructField("shipment_location_lat", DoubleType(), True),
        StructField("shipment_location_long", DoubleType(), True),
        StructField("total_amount", DoubleType(), True),
    ]
)

In [3]:
#loading customer data into a dataframe with the customer schema
# defining the parts for each file
category_path = "a2a_dataset/category.csv"
customer_path = "a2a_dataset/customer.csv"
product_path = "a2a_dataset/product.csv"
cust_session_path = "a2a_dataset/customer_session.csv"
clickstream_path = "click_stream.csv"
transaction_path = "new_transactions.csv"

# reading the files based on the above schema and loading them into seperate dataframes
category_df = (
    spark.read.format("csv")
    .option("header", True)
    .schema(category_schema)
    .load(category_path)
)
customer_df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(customer_schema)
    .load(customer_path)
)
product_df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(product_schema)
    .load(product_path)
)
clickstream_df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(click_schema)
    .load(clickstream_path)
)
transaction_df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(transaction_schema)
    .load(transaction_path)
)
cust_session_df = (
    spark.read.format("csv").option("header", "true").load(cust_session_path)
)

print("Category Schema: ")
category_df.printSchema()
print("Customer Schema: ")
customer_df.printSchema()
print("Product Schema: ")
product_df.printSchema()
print("Click Stream Schema: ")
clickstream_df.printSchema()
print("Transaction Schema: ")
transaction_df.printSchema()
print("Customer Session Schema: ")
cust_session_df.printSchema()

Category Schema: 
root
 |-- index: integer (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- cat_level1: string (nullable = true)
 |-- cat_level2: string (nullable = true)
 |-- cat_level3: string (nullable = true)

Customer Schema: 
root
 |-- index: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- username: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- birthdate: date (nullable = true)
 |-- device_type: string (nullable = true)
 |-- device_id: string (nullable = true)
 |-- device_version: string (nullable = true)
 |-- home_location_lat: double (nullable = true)
 |-- home_location_long: double (nullable = true)
 |-- home_location: string (nullable = true)
 |-- home_country: string (nullable = true)
 |-- first_join_date: date (nullable = true)

Product Schema: 
root
 |-- index: integer (nullable = true)
 |

### 3 Using the Kafka topic from the producer in Task 1, ingest the streaming data into Spark Streaming, assuming all data comes in the String format. Except for the 'ts' column, you shall receive it as an Int type.




In [4]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType

#reading data from kafka topic
#defining the hostip
hostip = hostip = "192.168.0.250" #change the IP4 address as needed

topic = "customer_clickstream_topic"
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", f'{hostip}:9092') \
  .option("subscribe", topic) \
  .load()

In [5]:
#read data from Kafka
kafka_df = df.selectExpr("CAST(value AS STRING)")
kafka_df.printSchema()

root
 |-- value: string (nullable = true)



In [6]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType

#defining schema for the message from Kafka producer
kafka_schema = StructType([
    StructField("click_stream_data", ArrayType(StructType([
        StructField("session_id", StringType(), True),
        StructField("event_name", StringType(), True),
        StructField("event_id", StringType(), True),
        StructField("traffic_source", StringType(), True),
        StructField("event_metadata", StringType(), True),
        StructField("customer_id", StringType(), True),
        StructField("ts", IntegerType(), True)
    ]))),
    StructField("sent_timestamp", IntegerType())  
])

In [7]:
from pyspark.sql import functions as F
#parsing the string
parsed_kafka_df = kafka_df.select(F.from_json(F.col("value").cast("string"), kafka_schema).alias("data")).select("data.*")
parsed_kafka_df.printSchema()

root
 |-- click_stream_data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- session_id: string (nullable = true)
 |    |    |-- event_name: string (nullable = true)
 |    |    |-- event_id: string (nullable = true)
 |    |    |-- traffic_source: string (nullable = true)
 |    |    |-- event_metadata: string (nullable = true)
 |    |    |-- customer_id: string (nullable = true)
 |    |    |-- ts: integer (nullable = true)
 |-- sent_timestamp: integer (nullable = true)



In [8]:
#exploding the data to get it in a usable formatS
final_df = parsed_kafka_df.selectExpr("sent_timestamp", "explode(click_stream_data) as cust_clickstream")
final_df.printSchema()

root
 |-- sent_timestamp: integer (nullable = true)
 |-- cust_clickstream: struct (nullable = true)
 |    |-- session_id: string (nullable = true)
 |    |-- event_name: string (nullable = true)
 |    |-- event_id: string (nullable = true)
 |    |-- traffic_source: string (nullable = true)
 |    |-- event_metadata: string (nullable = true)
 |    |-- customer_id: string (nullable = true)
 |    |-- ts: integer (nullable = true)



In [None]:
query = final_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()


In [None]:
query.stop()

In [9]:
#second level of explode
final_click_df = final_df.select(
    "sent_timestamp",
    "cust_clickstream.session_id",
    "cust_clickstream.event_name",
    "cust_clickstream.event_id",
    "cust_clickstream.traffic_source",
    "cust_clickstream.event_metadata",
    "cust_clickstream.customer_id",
    "cust_clickstream.ts"
)

final_click_df.printSchema()

root
 |-- sent_timestamp: integer (nullable = true)
 |-- session_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- traffic_source: string (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- ts: integer (nullable = true)



In [None]:
query = final_click_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

In [None]:
query.stop()

### 4 Then, the streaming data format should be transformed into the proper formats following the metadata file schema, similar to assignment 2A.  
Perform the following tasks:  
a) For the 'ts' column, convert it to the timestamp format, we will use it as event_time.  
b) If the data is late for more than 1 minute, discard it.  



In [10]:
from pyspark.sql.functions import col, from_unixtime, unix_timestamp, expr
from pyspark.sql.window import Window

#converting the ts column to timestamp type and renaming it as event_time
final_click_df = final_click_df.withColumn("event_time", from_unixtime(col("ts")).cast("timestamp"))
final_click_df = final_click_df.drop("ts")
final_click_df.printSchema()

root
 |-- sent_timestamp: integer (nullable = true)
 |-- session_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- traffic_source: string (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- event_time: timestamp (nullable = true)



In [None]:
query = final_click_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

In [None]:
query.stop()

In [11]:
from pyspark.sql.functions import current_timestamp

# #defining the one minute interval
one_min_interval = current_timestamp() - expr("INTERVAL 1 MINUTE")
#defining watermark
watermark_interval = "1 minute"
#filtering the dataframe to disgard records more than 1 minute
watermark_df = final_click_df.withWatermark("event_time", watermark_interval)
#disgarding late data
filtered_df = watermark_df.filter(col("event_time") > one_min_interval)

In [None]:
query = filtered_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

In [None]:
query.stop()

In [None]:
filtered_df.printSchema()

### 5  
Aggregate the streaming data frame by session id and create features you used in your assignment 2A model. (note: customer ID has already been included in the stream.)   
Then, join the static data frames with the streaming data frame as our final data for prediction.  
Perform data type/column conversion according to your ML model, and print out the Schema.


In [12]:
#making transformations on transaction_df
from pyspark.sql.functions import expr, col, when

#total_amount_quartile
# calculating quantiles
quartiles = transaction_df.approxQuantile("total_amount", [0.25, 0.5, 0.75], 0.01)

# defining conditions for the quantiles
quartile_seg = [
    col("total_amount") <= quartiles[0],
    (col("total_amount") > quartiles[0]) & (col("total_amount") <= quartiles[1]),
    (col("total_amount") > quartiles[1]) & (col("total_amount") <= quartiles[2]),
    (col("total_amount") > quartiles[2])
]

# setting labels for the quartiles segregated above
labels = ["Q1", "Q2", "Q3", "Q4"]
#adding the qunatiles to the dataframe
transaction_df = transaction_df.withColumn(
    "total_amount_quartile",
    when(quartile_seg[0], labels[0])
    .when(quartile_seg[1], labels[1])
    .when(quartile_seg[2], labels[2])
    .otherwise(labels[3])
)

#adding shipment fee to the dataframe
transaction_df = transaction_df.withColumn("shipment_fee_paid", when(col("shipment_fee") > 0, 1).otherwise(0))
    
transaction_df.show(5)

+-----+--------------------+-----------+--------------------+--------------------+--------------------+--------------+--------------+------------+-------------+------------+-------------------+---------------------+----------------------+------------+---------------------+-----------------+
|index|          created_at|customer_id|          booking_id|          session_id|    product_metadata|payment_method|payment_status|promo_amount|   promo_code|shipment_fee|shipment_date_limit|shipment_location_lat|shipment_location_long|total_amount|total_amount_quartile|shipment_fee_paid|
+-----+--------------------+-----------+--------------------+--------------------+--------------------+--------------+--------------+------------+-------------+------------+-------------------+---------------------+----------------------+------------+---------------------+-----------------+
|    1|2022-06-17 21:20:...|       5898|346f74ca-a933-463...|4d51b69d-544f-44b...|[{'product_id': 1...|   Credit Card|      

In [13]:
#making necessary transformation is customer_df
from pyspark.sql.functions import datediff, current_date, round
from pyspark.sql import functions as f
from pyspark.sql.types import IntegerType

# calculating age in customer_df
customer_df = customer_df.withColumn(
    "age",
    (datediff(current_date(), customer_df["birthdate"]) / 365.25).cast(
        "integer"
    ),  # calculating in years
)

# calculating the period that the customer has been on the platform
# defining a function to calculate the period of customer on platform
def days_since_join(first_join_date):
    current_date = F.current_date()
    customer_since = F.datediff(current_date, first_join_date)
    return round(customer_since)


# Adding it to the feature_df
customer_df = customer_df.withColumn(
    "customer_since", days_since_join(F.col("first_join_date")).cast(IntegerType())
)

# transformation of home_location to city-tiering in Indonesia for easy categorisation
# defining location tiers in Indonation
tier1_city = ["Jakarta Raya", "Kalimantan Timur"]  # Highest GDP and metropolitan
tier2_city = [
    "Sulawesi Selatan",
    "Jawa Barat",
    "Riau",
    "Jawa Timur",
    "Jawa Tengah",
]  # Growing GDP and medium economy

# creating a new column
customer_df = customer_df.withColumn(
    "city_tier",
    when(col("home_location").isin(tier1_city), "tier 1")
    .when(col("home_location").isin(tier1_city), "tier 2")
    .otherwise("tier 3"),
)

customer_df.show(5)

+-----+-----------+----------+-----------+--------------------+--------------------+------+----------+-----------+--------------------+--------------------+-----------------+------------------+-------------------+------------+---------------+---+--------------+---------+
|index|customer_id|first_name|  last_name|            username|               email|gender| birthdate|device_type|           device_id|      device_version|home_location_lat|home_location_long|      home_location|home_country|first_join_date|age|customer_since|city_tier|
+-----+-----------+----------+-----------+--------------------+--------------------+------+----------+-----------+--------------------+--------------------+-----------------+------------------+-------------------+------------+---------------+---+--------------+---------+
|    1|       2870|      Lala|    Maryati|671a0865-ac4e-4dc...|671a0865_ac4e_4dc...|     F|1996-06-14|        iOS|c9c0de76-0a6c-4ac...|iPhone; CPU iPhon...|-1.04334537209108|  101.3605

In [14]:
from pyspark.sql.functions import when, col, sum

# adding a feature to identify if a customer has used a promotion or not
filtered_df = filtered_df.withColumn("is_promotion", when(col("event_name") == "ADD PROMO", 1)
                                    .otherwise(0))

#identifying the month of shopping by the customer
filtered_df = filtered_df.withColumn("session_month", F.month(filtered_df["event_time"]))

# assigning season based on month
filtered_df = filtered_df.withColumn(
    "session_season",
    when((col("session_month") >= 3) & (col("session_month") <= 5), "Spring")
    .when((col("session_month") >= 6) & (col("session_month") <= 8), "Summer")
    .when((col("session_month") >= 9) & (col("session_month") <= 11), "Autumn")
    .otherwise("Winter"),
)

#creating features in click_stream_df
# defining categories based on the above
biferations = {
    "Category 1": ["ADD_PROMO", "ADD_TO_CART"],  # highly likely
    "Category 2": ["VIEW_PROMO", "VIEW_ITEM", "SEARCH"],  # likey
    "Category 3": ["SCROLL", "HOMEPAGE", "CLICK"],  # browsing
}

# creating a new column
filtered_df = filtered_df.withColumn(
    "event_category",
    F.when(F.col("event_name").isin(biferations["Category 1"]), "Category 1")
    .when(F.col("event_name").isin(biferations["Category 2"]), "Category 2")
    .when(F.col("event_name").isin(biferations["Category 3"]), "Category 3")
    .otherwise("Unknown"),
)

# creating new columns based on the categories stated to aggregate actions based on session IDs
filtered_df = filtered_df.groupBy("session_id", "customer_id", "is_promotion", "session_season", "event_time", "event_name", "event_metadata").agg(
    sum(when(col("event_category") == "Category 1", 1).otherwise(0)).alias(
        "num_cat_highvalue"
    ),
    sum(when(col("event_category") == "Category 2", 1).otherwise(0)).alias(
        "num_cat_midvalue"
    ),
    sum(when(col("event_category") == "Category 3", 1).otherwise(0)).alias(
        "num_cat_lowvalue"
    ),
)


In [None]:
query = filtered_df.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

In [None]:
query.stop()

In [15]:
from pyspark.sql.functions import lit
#getting the features from transaction table to the live stream dataframe
filtered_df = filtered_df.withColumn("shipment_fee_paid", lit(0))

filtered_df = filtered_df.withColumn("payment_method", lit("Credit Card"))

filtered_df = filtered_df.withColumn("total_amount_quartile", lit("Q3"))

In [None]:
query = filtered_df.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

In [None]:
query.stop()

In [16]:
#joining the filtered_df with customer_df
features_df = filtered_df.join(customer_df, on = "customer_id", how = "left")
#defining the columns in the features table
features_df = features_df.select(
    "customer_id",
    "num_cat_highvalue",
    "num_cat_midvalue",
    "num_cat_lowvalue",
    "is_promotion",
    "session_season",
    "gender",
    "age",
    "device_type",
    "customer_since",
    "city_tier",
    "shipment_fee_paid",
    "payment_method",
    "total_amount_quartile",
    "event_name",
    "event_metadata",
    "event_time"
)

In [None]:
query = features_df.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

In [None]:
query.stop()

### 6 Load your ML model, and use the model to predict if each session will purchase according to the requirements below:
a) Every 10 seconds, show the total number of potential sales transactions (prediction = 1) in the last 1 minute.   
b) Every 30 seconds, show the total potential revenue in the last 30 seconds. “Potiential revenue” here is definded as: When prediction=1, extract customer shopping cart detail from metadata (sum of all items of ADD_TO_CART events).  
c) Every 1 minute, show the top 10 best-selling products by total quantity. (note: No historical data is required, only the top 10 in each 1 minute window.)  


In [17]:
#loading the model
from pyspark.ml import PipelineModel
#loading the model
model = PipelineModel.load("FinalModelA2A/GBT_Model")
#transforming the data
prediction = model.transform(features_df)
#selecting columns from the resulting df
result_df = prediction.select(
    "customer_id",
    "num_cat_highvalue",
    "num_cat_midvalue",
    "num_cat_lowvalue",
    "is_promotion",
    "session_season",
    "gender",
    "age",
    "device_type",
    "customer_since",
    "city_tier",
    "shipment_fee_paid",
    "payment_method",
    "total_amount_quartile",
    "event_time",
    "event_name",
    "event_metadata",
    "prediction"
)

In [None]:
query = result_df.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

In [None]:
query.stop()

In [18]:
# 6a
from pyspark.sql.functions import *
#predicting or total number of sales predicted in 1 minute triggered every 10 seconds
window_prediction = result_df \
    .withWatermark("event_time", "1 minute") \
    .groupBy(window("event_time", "10 seconds"), "customer_id") \
    .agg(sum("prediction").alias("total")) \
    .select("window", "customer_id", "total")

In [None]:
query = window_prediction \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime='10 seconds') \
    .option("truncate","false")\
    .start()

In [None]:
query.stop()

In [19]:
# 6b
from pyspark.sql.functions import *
from pyspark.sql.types import FloatType, MapType, StringType, IntegerType
#filtering for add to cart data only
add_to_cart_df = result_df.filter(result_df.event_name == "ADD_TO_CART")
#defining the schema
metadata_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("item_price", IntegerType(), True),
])
#adding the metadata column
add_to_cart_df = add_to_cart_df.withColumn("metadata", from_json(col("event_metadata"), metadata_schema))
#calculating revenue
add_to_cart_df = add_to_cart_df.withColumn("revenue", col("metadata.quantity") * col("metadata.item_price"))
#getting data as per the requested format
windowed_data = add_to_cart_df.groupBy(window("event_time", "30 seconds", "30 seconds")) \
    .agg(sum("revenue").alias("potential_revenue"))\
    .select("window", "potential_revenue")



In [None]:
windowed_data.printSchema()

In [20]:
query = windowed_data \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .trigger(processingTime = "30 seconds") \
    .option("truncate", "false") \
    .start()

In [None]:
query.stop()

In [21]:
#selecting data needed for further calculations
add_to_cart_df = add_to_cart_df.select(
    "num_cat_highvalue",
    "num_cat_midvalue",
    "num_cat_lowvalue",
    "is_promotion",
    "session_season",
    "gender",
    "age",
    "device_type",
    "customer_since",
    "city_tier",
    "shipment_fee_paid",
    "payment_method",
    "total_amount_quartile",
    "event_time",
    "event_name",
    "event_metadata",
    "prediction",
    "metadata.product_id",
    "metadata.quantity",
    "metadata.item_price",
    "revenue"
)

add_to_cart_df.printSchema()

root
 |-- num_cat_highvalue: long (nullable = true)
 |-- num_cat_midvalue: long (nullable = true)
 |-- num_cat_lowvalue: long (nullable = true)
 |-- is_promotion: integer (nullable = false)
 |-- session_season: string (nullable = false)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- device_type: string (nullable = true)
 |-- customer_since: integer (nullable = true)
 |-- city_tier: string (nullable = true)
 |-- shipment_fee_paid: integer (nullable = false)
 |-- payment_method: string (nullable = false)
 |-- total_amount_quartile: string (nullable = false)
 |-- event_time: timestamp (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- prediction: double (nullable = false)
 |-- product_id: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- item_price: integer (nullable = true)
 |-- revenue: integer (nullable = true)



In [22]:
# 6c
from pyspark.sql.functions import *
#identifying the top products
top_products_df = add_to_cart_df \
    .withWatermark("event_time", "1 minute") \
    .groupBy(window("event_time", "1 minute"), "product_id") \
    .agg(sum("quantity").alias("total_quantity"))
#defining the windoe
top_10 = top_products_df\
    .orderBy("window", col("total_quantity").desc())
#sorting
top_10 = top_10.limit(10)

In [None]:
query = top_10 \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

In [None]:
query.stop()

### 7  
a) Persist the prediction result along with cart metadata in parquet format; after that, read the parquet file and show the results to verify it is saved properly.  
b) Persist the 30-second sales prediction in another parquet file.

In [23]:
# 7a
#securing data for writing the first parquet file
parquet1_df = result_df.select(
    "event_time",
    "prediction",
    "event_metadata",
    "customer_id"
)

In [24]:
#defining a write stream for the first file
windowed_df = parquet1_df.writeStream.format("parquet")\
    .outputMode("append")\
    .option("path", "stream_parquet/parquet")\
    .option("checkpointLocation", "stream_parquet/click_stream")\
    .trigger(processingTime = "5 seconds")\
    .start()

In [None]:
windowed_df.stop()

In [25]:
#writing the first file using the schema as the reading data
parquet1_read_df = spark.read.schema(parquet1_df.schema).parquet("stream_parquet/parquet")
parquet1_read_df.printSchema()
parquet1_read_df.show()

root
 |-- event_time: timestamp (nullable = true)
 |-- prediction: double (nullable = false)
 |-- event_metadata: string (nullable = true)
 |-- customer_id: string (nullable = true)

+-------------------+----------+--------------------+-----------+
|         event_time|prediction|      event_metadata|customer_id|
+-------------------+----------+--------------------+-----------+
|2023-10-18 23:02:12|       0.0|               "NaN"|      75437|
|2023-10-18 23:02:32|       1.0|               "NaN"|      93707|
|2023-10-18 23:02:11|       1.0|               "NaN"|      62179|
|2023-10-18 23:01:44|       1.0|               "NaN"|       6308|
|2023-10-18 23:02:26|       1.0|{'product_id': 44...|       4037|
|2023-10-18 23:01:46|       1.0|               "NaN"|      15252|
|2023-10-18 23:02:20|       1.0|{'product_id': 10...|      70519|
|2023-10-18 23:02:31|       0.0|               "NaN"|      30105|
|2023-10-18 23:02:06|       1.0|{'product_id': 54...|      40436|
|2023-10-18 23:02:16|    

In [26]:
# 7b
#defining data for the second parquet file
parquet2_df = windowed_data.withColumn("start", col("window.start"))\
    .withColumn("end", col("window.end"))\
    .withColumn("sales", col("potential_revenue"))\
    .select("start", "end", "sales")
#wrting the second parquet file
windowed_sales_df = parquet2_df.writeStream.format("parquet")\
    .outputMode("append")\
    .option("path", "sales_parquet/revenue_parquet")\
    .option("checkpointLocation", "sales_parquet/revenue")\
    .trigger(processingTime = "1 second")\
    .start()

In [None]:
windowed_sales_df.stop()

In [27]:
#reading the second parquet file
parquet2_read_df = spark.read.schema(parquet2_df.schema).parquet("sales_parquet/revenue_parquet")
parquet2_read_df.printSchema()
parquet2_read_df.show()

root
 |-- start: timestamp (nullable = true)
 |-- end: timestamp (nullable = true)
 |-- sales: long (nullable = true)

+-------------------+-------------------+---------+
|              start|                end|    sales|
+-------------------+-------------------+---------+
|2023-10-18 23:00:00|2023-10-18 23:00:30|207151450|
|2023-10-18 23:01:00|2023-10-18 23:01:30| 12587190|
|2023-10-18 22:59:30|2023-10-18 23:00:00|143672580|
|2023-10-18 23:01:30|2023-10-18 23:02:00|212926400|
|2023-10-18 23:02:00|2023-10-18 23:02:30|240820842|
|2023-10-18 23:02:30|2023-10-18 23:03:00| 10846872|
|2023-10-18 23:03:30|2023-10-18 23:04:00|126389884|
|2023-10-18 23:00:30|2023-10-18 23:01:00| 98021985|
|2023-10-18 23:04:00|2023-10-18 23:04:30|206800774|
|2023-10-18 23:04:30|2023-10-18 23:05:00|239272222|
|2023-10-18 23:05:00|2023-10-18 23:05:30|219125678|
+-------------------+-------------------+---------+



### 8  
Read the parquet files as a data stream, for 7a) join customer information and send to a Kafka topic with an appropriate name to the data visualisation. For 7b) Send the message directly to another Kafka topic.

In [28]:
from kafka3 import KafkaProducer
from json import dumps
from time import sleep

#configuation
hostip = "192.168.0.250" #change the IP4 address as needed
#defining the topic
pred_topic = "prediction_final"
sale_topic = "sales_final"
#defining the broker
bootstrap_servers = f'{hostip}:9092'

# defining the Kafka producer instance
# using value serializer to serialise the data efore sending it to a kafka topic
def connect_kafka():
    producer = None
    try:
        producer = KafkaProducer(bootstrap_servers=bootstrap_servers, value_serializer=lambda r: dumps(r).encode('utf-8'))
        return producer
    except Exception as e:
        print("Exception while connecting to Kafka")
        print(str(e))
        return None

def publish_message(producer_instance, topic_name, data):
    try:
        #sending the read data to the kafka topic
        producer_instance.send(topic_name, data)
        #successfully sent
        print('Message published successfully. Data: ' + str(data))
    except Exception as ex:
        #handling exception
        print('Exception in publishing message.')
        print(str(ex))

In [29]:
from pyspark.sql.functions import col
from pyspark.sql.types import StringType

# Stream 1
#defining schema for the parquet file
parquet_1_schema = parquet1_df.schema
# creating a stream for reading data from file
prediction_stream_df = spark.readStream.schema(parquet_1_schema).parquet("stream_parquet/parquet")
#joining with customer_data
prediction_final_df = prediction_stream_df.join(customer_df, "customer_id", "left")
prediction_final_df = prediction_final_df.withColumn("event_time", col("event_time").cast(StringType()))
prediction_final_df = prediction_final_df.withColumn("birthdate", col("birthdate").cast(StringType()))
prediction_final_df = prediction_final_df.withColumn("first_join_date", col("first_join_date").cast(StringType()))

In [None]:
prediction_final_df.printSchema()

In [None]:
query = prediction_final_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

In [None]:
query.stop()

In [30]:
#defining a function to convert row to dict and send it to kafka
def send_kafka(row):
    data = row.asDict()
    pred_producer = connect_kafka()
    publish_message(pred_producer, pred_topic, data)
    
query = prediction_final_df.writeStream\
    .foreach(send_kafka)\
    .outputMode("append")\
    .start()

In [None]:
query.stop()

In [31]:
# Stream 2
#defining schema for the parquet file
parquet_2_schema = parquet2_df.schema
# creating a stream for reading data from file
sales_stream_df = spark.readStream.schema(parquet_2_schema).parquet("sales_parquet/revenue_parquet")


sales_stream_df = sales_stream_df.withColumn("start", col("start").cast(StringType()))
sales_stream_df = sales_stream_df.withColumn("end", col("end").cast(StringType()))

sales_stream_df.printSchema()

root
 |-- start: string (nullable = true)
 |-- end: string (nullable = true)
 |-- sales: long (nullable = true)



In [32]:
#defining a function to convert row to dict and send it to kafka
def send_kafka(row):
    data = row.asDict()
    sale_producer = connect_kafka()
    publish_message(sale_producer, sale_topic, data)
    
query = sales_stream_df.writeStream\
    .foreach(send_kafka)\
    .outputMode("append")\
    .start()

In [None]:
query.stop()