# Streaming application using Spark Structured Streaming

### 1. Write code to create a SparkSession that uses four cores and a proper application name, uses the Melbourne timezone, and has a checkpoint location set.

In [28]:
import os
from pyspark import SparkContext,SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import regexp_extract
import pyspark.sql.functions as F

In [29]:
# Kafka connector packages must be on the submit args before the JVM starts (i.e. before getOrCreate).
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.4.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0 pyspark-shell'
# Local session on four cores; the application name identifies this job in the Spark UI.
spark_conf = SparkConf().setMaster("local[4]").setAppName("A2B_rsin0045")
spark = (
    SparkSession.builder
    .config(conf=spark_conf)
    # Interpret and display timestamps in Melbourne local time.
    .config("spark.sql.session.timeZone", "Australia/Melbourne")
    # Default checkpoint root for streaming queries that do not set their own location.
    .config("spark.sql.streaming.checkpointLocation", 'checkpt_folder')
    # NOTE(review): disables Spark's correctness check for stateful operators;
    # confirm this is still required by the queries below.
    .config("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")
    .getOrCreate()
)
# setLogLevel() returns None, so keep the SparkContext handle and configure it separately.
sc = spark.sparkContext
sc.setLogLevel('ERROR')
print("Spark timezone:", spark.conf.get("spark.sql.session.timeZone"))

### 2. Similar to assignment 2A, write code to define the data schema for the data files, following the data types suggested in the metadata file. Load the static datasets (e.g. customer, product, category) into data frames. 


In [30]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType,DoubleType

def _nullable_schema(*fields):
    """Return a StructType of nullable StructFields built from (name, type) pairs."""
    return StructType([StructField(name, dtype, True) for name, dtype in fields])


# Every CSV starts with a placeholder column, read as "del" and dropped after loading.
# Column types follow the dataset metadata; date-like text columns stay strings here.
category_schema = _nullable_schema(
    ("del", StringType()),
    ("category_id", IntegerType()),
    ("cat_level1", StringType()),
    ("cat_level2", StringType()),
    ("cat_level3", StringType()),
)

customer_schema = _nullable_schema(
    ("del", StringType()),
    ("customer_id", StringType()),
    ("first_name", StringType()),
    ("last_name", StringType()),
    ("username", StringType()),
    ("email", StringType()),
    ("gender", StringType()),
    ("birthdate", StringType()),
    ("device_type", StringType()),
    ("device_id", StringType()),
    ("device_version", StringType()),
    ("home_location_lat", FloatType()),
    ("home_location_long", FloatType()),
    ("home_location", StringType()),
    ("home_country", StringType()),
    ("first_join_date", StringType()),
)

product_schema = _nullable_schema(
    ("del", StringType()),
    ("id", IntegerType()),
    ("gender", StringType()),
    ("baseColour", StringType()),
    ("season", StringType()),
    ("year", IntegerType()),
    ("usage", StringType()),
    ("productDisplayName", StringType()),
    ("category_id", IntegerType()),
)

click_stream_schema = _nullable_schema(
    ("del", StringType()),
    ("session_id", StringType()),
    ("event_name", StringType()),
    ("event_time", TimestampType()),
    ("event_id", StringType()),
    ("traffic_source", StringType()),
    ("event_metadata", StringType()),
)

transaction_schema = _nullable_schema(
    ("del", StringType()),
    ("created_at", StringType()),
    ("customer_id", StringType()),
    ("booking_id", StringType()),
    ("session_id", StringType()),
    ("product_metadata", StringType()),
    ("payment_method", StringType()),
    ("payment_status", StringType()),
    ("promo_amount", FloatType()),
    ("promo_code", StringType()),
    ("shipment_fee", FloatType()),
    ("shipment_date_limit", StringType()),
    ("shipment_location_lat", FloatType()),
    ("shipment_location_long", FloatType()),
    ("total_amount", FloatType()),
)

In [31]:
# Load each static dataset from its headered CSV using the explicit schema (no inference).
category_df = spark.read.schema(category_schema).csv("category.csv", header=True)
customer_df = spark.read.schema(customer_schema).csv("customer.csv", header=True)
product_df = spark.read.schema(product_schema).csv("product.csv", header=True)
click_stream_df = spark.read.schema(click_stream_schema).csv("click_stream.csv", header=True)
transaction_df = spark.read.schema(transaction_schema).csv("new_transactions.csv", header=True)

In [32]:
# The placeholder "del" column carries no data; remove it from every static dataset.
category_df, customer_df, product_df, click_stream_df, transaction_df = (
    frame.drop("del")
    for frame in (category_df, customer_df, product_df, click_stream_df, transaction_df)
)

### 3. Using the Kafka topic from the producer in Task 1, ingest the streaming data into Spark Streaming, assuming all data arrives in String format, except for the 'ts' column, which should be received as an Int.

In [55]:
# Subscribe to the producer's Kafka topic; key and value arrive as binary columns
# and are decoded to strings in the next step.
streamdf = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "192.168.0.140:9092")
    .option("subscribe", "Stream_Data_MOTH")
    .load()
)

In [56]:
# Inspect the raw Kafka source schema before key/value are cast to strings.
streamdf.printSchema()

In [57]:
# Decode the Kafka key/value bytes to strings so the value can be parsed as JSON.
streamdf = streamdf.select(
    F.col("key").cast("string").alias("key"),
    F.col("value").cast("string").alias("value"),
)
streamdf.printSchema()

### 4. Then, transform the streaming data into the proper formats following the metadata file schema, similar to assignment 2A.  
Perform the following tasks:  
a) Convert the 'ts' column to the timestamp format; it will be used as event_time.  
b) Discard any data that arrives more than 1 minute late.

In [36]:
from pyspark.sql.types import ArrayType, IntegerType, TimestampType

# Each Kafka value is a JSON array of event objects. Every field is read as a string
# except 'ts', which is declared as a timestamp.
# NOTE(review): the task says 'ts' is received as an Int; this relies on Spark's JSON
# reader turning a numeric value for a TimestampType field into epoch seconds —
# confirm the producer sends ts as a number.
schema = ArrayType(
    StructType(
        [
            StructField(name, StringType(), True)
            for name in ('#', 'session_id', 'event_name', 'event_id',
                         'traffic_source', 'event_metadata', 'customer_id')
        ]
        + [StructField('ts', TimestampType(), True)]
    )
)
# Parse the payload, then emit one row per element of the JSON array.
streamdf = streamdf.select(F.from_json(F.col("value").cast("string"), schema).alias('parsed_value'))
streamdff = streamdf.select(F.explode('parsed_value').alias('unnested_value'))

In [37]:
# Promote the exploded struct's fields to top-level columns; 'ts' is exposed as event_time.
streamdf = streamdff.select(
    *[
        F.col(f"unnested_value.{name}").alias(name)
        for name in ("session_id", "event_name", "event_id",
                     "traffic_source", "event_metadata", "customer_id")
    ],
    F.col("unnested_value.ts").alias("event_time"),
)

In [38]:
# Debug sink: print each 5-second micro-batch of parsed events without truncation.
q11 = (
    streamdf.writeStream
    .format("console")
    .outputMode("append")
    .option("truncate", "false")
    .trigger(processingTime='5 seconds')
    .start()
)

In [39]:
# Stop the parsed-event console preview query.
q11.stop()

In [40]:
# Declare event_time as the event-time column with a 1-minute lateness bound.
# A watermark only limits state in stateful operators (aggregations, joins);
# it never drops rows from a stateless projection like this one.
streamdf = streamdf.withWatermark("event_time", "1 minute")
# Explicitly discard events whose event time is more than 1 minute behind the
# current processing time. In a streaming query current_timestamp() is fixed
# per micro-batch.
streamdf = streamdf.filter(
    F.col("event_time") >= F.expr("current_timestamp() - INTERVAL 1 MINUTE")
)

In [41]:
from pyspark.sql.functions import when, col, month

# Event-name predicates, keyed by the 0/1 indicator column each one populates.
# Note that ADD_PROMO counts both as "add_promo" and as a high-value action.
category_conditions = {
    "add_promo": col("event_name") == "ADD_PROMO",
    "num_cat_highvalue": col("event_name").isin("ADD_PROMO", "ADD_TO_CART"),
    "num_cat_midvalue": col("event_name").isin("VIEW_PROMO", "VIEW_ITEM", "SEARCH"),
    "num_cat_lowvalue": col("event_name").isin("SCROLL", "HOMEPAGE", "CLICK"),
}

In [42]:
# Month number (1-12) of each historical event, used later to derive the session season.
click_stream_df = click_stream_df.withColumn("event_month", F.month("event_time"))

In [43]:
from pyspark.sql import functions as F

# Materialise every condition as a 0/1 indicator column on the static click stream.
click_stream_df = click_stream_df.withColumns(
    {name: when(cond, 1).otherwise(0) for name, cond in category_conditions.items()}
)

# Per session: how many events fell in each indicator, plus the maximum event month.
feature_df = click_stream_df.groupBy("session_id").agg(
    *[
        F.sum(indicator).alias(indicator)
        for indicator in ("add_promo", "num_cat_highvalue", "num_cat_midvalue", "num_cat_lowvalue")
    ],
    F.max("event_month").alias("event_month"),
)


In [44]:
# total_actions counts every classified event in the session; the two ratios are the
# percentage of those actions that were high-value and low-value respectively.
# With a zero total the division yields null (unless Spark ANSI mode is enabled).
feature_df = (
    feature_df
    .withColumn("total_actions",
                col("num_cat_highvalue") + col("num_cat_midvalue") + col("num_cat_lowvalue"))
    .withColumn("high_value_ratio", col("num_cat_highvalue") / col("total_actions") * 100)
    .withColumn("low_value_ratio", col("num_cat_lowvalue") / col("total_actions") * 100)
)

In [45]:
# 1 when the session contains at least one ADD_PROMO event, otherwise 0.
feature_df = feature_df.withColumn("is_promotion", F.expr("CASE WHEN add_promo > 0 THEN 1 ELSE 0 END"))

In [46]:
from pyspark.sql.functions import to_timestamp, month, when,udf
def map_to_season1(month):
    """Return the season name for a month number.

    Months 3-5 give "Spring", 6-8 "Summer", 9-11 "Autumn"; every other value
    (12, 1, 2, None, out-of-range numbers) gives "Winter".

    NOTE(review): these are Northern-Hemisphere boundaries even though the session
    uses the Melbourne timezone; presumably they must match the mapping the saved
    model was trained with — confirm before changing.
    """
    season_table = (
        ((3, 4, 5), "Spring"),
        ((6, 7, 8), "Summer"),
        ((9, 10, 11), "Autumn"),
    )
    for months, season in season_table:
        if month in months:
            return season
    return "Winter"
# Python UDF wrapper around map_to_season1, kept for any code that still calls it.
map_to_season = udf(lambda z: map_to_season1(z))
# Same month -> season mapping as map_to_season1, expressed with built-in column
# functions so it runs inside the JVM instead of shipping rows to a Python worker.
# A null event_month yields a null season.
feature_df = feature_df.withColumn(
    "season",
    when(col("event_month").isNull(), None)
    .when(col("event_month").isin(3, 4, 5), "Spring")
    .when(col("event_month").isin(6, 7, 8), "Summer")
    .when(col("event_month").isin(9, 10, 11), "Autumn")
    .otherwise("Winter"),
)

In [47]:
from pyspark.sql.functions import year,current_date
# Attach each session's transaction, then the customer profile for that transaction.
# NOTE(review): if a session has several transactions this inner join yields one
# row per transaction — confirm sessions are unique in transaction_df.
joined_df1 = feature_df.join(transaction_df, on="session_id", how="inner")
joined_df = (
    joined_df1
    .join(customer_df, on="customer_id", how="inner")
    # Calendar-year difference only; it ignores whether the birthday has passed yet.
    .withColumn("age", year(current_date()) - year(col("birthdate")))
    .withColumn("first_join_year", year(col("first_join_date")))
)
customer_info_columns = ["gender", "device_type", "home_location", "first_join_year", "age"]
# Keep the session features computed so far plus the selected customer attributes.
feature_df = joined_df.select(*feature_df.columns, *customer_info_columns)

In [48]:
# Re-join transactions to recover customer_id and derive the purchase label
# (1 when payment_status is "Success", else 0).
# NOTE(review): this is the second join against transaction_df; a session with n
# transactions ends up with n*n rows — confirm this is acceptable.
joined_df = (
    feature_df
    .join(transaction_df, on="session_id", how="inner")
    .withColumn("purchase_status", when(col("payment_status") == "Success", 1).otherwise(0))
)
feature_df = joined_df.select(
    "session_id", "customer_id",
    "num_cat_highvalue", "num_cat_midvalue", "num_cat_lowvalue",
    "high_value_ratio", "low_value_ratio", "is_promotion", "season",
    "gender", "age", "device_type", "home_location", "first_join_year",
    "purchase_status",
)

In [49]:
# Historical per-customer features to be joined onto the live event stream.
extracted_features = feature_df.select(
    "gender", "device_type", "home_location", "first_join_year", "age",
    "customer_id", "season",
    'num_cat_highvalue', 'num_cat_lowvalue', 'num_cat_midvalue',
    'high_value_ratio', 'is_promotion', 'low_value_ratio',
)

In [50]:
# Stream-static left join: every live event keeps its row, and customers without
# history get null feature columns.
# NOTE(review): a customer with several rows in extracted_features matches each of
# them, so one incoming event can be duplicated — confirm this is intended.
streamdf_df = streamdf.join(extracted_features, on="customer_id", how="left")

In [51]:
# Debug sink: print the enriched stream every 5 seconds without truncating columns.
q12 = (
    streamdf_df.writeStream
    .format("console")
    .outputMode("append")
    .option("truncate", "false")
    .trigger(processingTime='5 seconds')
    .start()
)

In [52]:
# Stop the enriched-stream console preview query.
q12.stop()

In [54]:
# Show the schema of the enriched stream (live events joined with customer features).
streamdf_df.printSchema()

In [24]:
from pyspark.sql.functions import window
from pyspark.ml import PipelineModel

# Load the saved pipeline from the "better2" directory and apply it to every enriched
# event; the result carries the model's "prediction" column used below.
final_model = PipelineModel.load("better2")
pred = final_model.transform(streamdf_df)

In [26]:
# Events the model labels 1.0 (presumably a predicted purchase).
sales_no = pred.filter(F.col("prediction") == 1.0)
# Sliding 1-minute windows every 10 seconds. Each kept row has prediction 1.0, so the
# sum equals the number of such events in the window.
windowsales = (
    sales_no
    .withWatermark("event_time", "10 seconds")
    .groupBy(window("event_time", "1 minute", "10 seconds"))
    .agg(F.sum("prediction").alias("potential_sales"))
)

In [27]:
# Print the full windowed potential-sales table every 10 seconds.
q1 = (
    windowsales.writeStream
    .format("console")
    .outputMode("complete")
    .option("truncate", "false")
    .trigger(processingTime='10 seconds')
    .start()
)

In [66]:
# Stop the potential-sales console query.
q1.stop()

In [32]:
# ADD_TO_CART events the model labels 1.0, with the item price pulled from the JSON metadata.
rev = (
    pred
    .filter((F.col("prediction") == 1.0) & (F.col("event_name") == "ADD_TO_CART"))
    # get_json_object returns a string; cast explicitly so the sum is numeric instead of
    # relying on implicit string-to-number coercion (which differs under ANSI mode).
    .withColumn("item_price", F.get_json_object("event_metadata", '$.item_price').cast("double"))
)
# Total predicted revenue per 30-second tumbling window.
rev_window = (
    rev
    .withWatermark("event_time", "30 seconds")
    .groupBy(window("event_time", "30 seconds", "30 seconds"))
    .agg(F.sum("item_price").alias("potential_revenue"))
)

In [33]:
# Print the full windowed revenue table every 30 seconds.
q2 = (
    rev_window.writeStream
    .format("console")
    .outputMode("complete")
    .option("truncate", "false")
    .trigger(processingTime='30 seconds')
    .start()
)

In [34]:
# Stop the potential-revenue console query.
q2.stop()

In [37]:
# ADD_TO_CART events with product id and quantity extracted from the JSON metadata
# (cast from the strings get_json_object returns), enriched with product details.
quan = (
    pred
    .filter(F.col("event_name") == "ADD_TO_CART")
    .withColumn("id", F.get_json_object("event_metadata", '$.product_id').cast("int"))
    .withColumn("quantity", F.get_json_object("event_metadata", '$.quantity').cast("int"))
    .join(product_df, "id", how='left')
)
# Quantity added to cart per product in each 60-second tumbling window, largest first.
# Grouping by product, not just by window, keeps each total tied to the product
# named on the same row.
quan_window = (
    quan
    .withWatermark("event_time", "30 seconds")
    .groupBy(window("event_time", "60 seconds"), "id", "productDisplayName")
    .agg(F.sum("quantity").alias("product_quantity"))
    .sort(F.desc("product_quantity"))
)

In [58]:
# Print the full per-window product-quantity table every 60 seconds.
q3 = (
    quan_window.writeStream
    .format("console")
    .outputMode("complete")
    .option("truncate", "false")
    .trigger(processingTime='60 seconds')
    .start()
)

In [67]:
# Stop the product-quantity console query.
q3.stop()

In [53]:
# Only the model output and the raw event metadata are persisted to Parquet.
df_parquet = pred.select(["prediction", "event_metadata"])

In [93]:
# Persist predictions as Parquet files. The sink format is set exactly once: a later
# .format(...) call on the same writer replaces it, which would silently turn this
# into a console sink that writes no files.
# NOTE(review): the checkpoint directory lives inside the output path; consider
# moving it alongside instead of inside.
query_file_sink = (
    df_parquet.writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", "parquet/clickstream_df")
    .option("checkpointLocation", "parquet/clickstream_df/checkpoint")
    .start()
)

In [84]:
# Stop the Parquet file-sink query.
query_file_sink.stop()

In [85]:
# Expected columns of the Parquet sink output: model prediction and raw event metadata.
schema = (
    StructType()
    .add("prediction", DoubleType(), True)
    .add("event_metadata", StringType(), True)
)

In [92]:
# Batch-read whatever the Parquet sink has written so far, using the explicit schema.
query_file_sink_df = (
    spark.read.format("parquet")
    .schema(schema)
    .load("parquet/clickstream_df")
)
query_file_sink_df.printSchema()
query_file_sink_df.show()

root
 |-- prediction: double (nullable = true)
 |-- event_metadata: string (nullable = true)

+----------+--------------+
|prediction|event_metadata|
+----------+--------------+
+----------+--------------+



In [87]:
# Write the complete prediction stream every 30 seconds. No .format() is given, so
# Spark's default data source (spark.sql.sources.default, Parquet unless reconfigured)
# is used.
# NOTE(review): this reuses the output path and checkpoint of query_file_sink for a
# different query; Spark does not support sharing a checkpoint between different
# queries — confirm this is intended.
qq1 = (
    pred.writeStream
    .outputMode("append")
    .option("checkpointLocation", "parquet/clickstream_df/checkpoint")
    .option("path", "parquet/clickstream_df")
    .trigger(processingTime='30 seconds')
    .start()
)

In [88]:
# Stop the full-prediction file sink query.
qq1.stop()

In [89]:
from pyspark.sql.functions import to_json, struct
kafka_broker = "192.168.0.140:9092"
kafka_topic = "predictions"
# Schema of the Parquet files being tailed. prediction is stored as a double, so it
# must be declared DoubleType; declaring StringType makes the Parquet reader fail
# with a column-type mismatch.
schema = StructType([
    StructField("prediction", DoubleType(), True),
    StructField("event_metadata", StringType(), True)
])
# Tail the Parquet output as a stream and pack each row into a JSON string column
# named "value", the column the Kafka sink expects.
query_file_sink_df = (
    spark.readStream.schema(schema).parquet("parquet/clickstream_df")
    .withColumn("value", to_json(struct("prediction", "event_metadata")))
    .select("value")
)

In [91]:
# Publish the JSON-encoded predictions to Kafka, using the broker/topic settings defined
# with the source stream. No checkpointLocation option is set, so Spark places this
# query's checkpoint under the session-level spark.sql.streaming.checkpointLocation.
kafka_stream = (
    query_file_sink_df.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_broker)
    .option("topic", kafka_topic)
    .outputMode("append")
    .start()
)