# 1Predicting sales data using Spark Streaming

### 2.1 Create SparkSession


In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

#os.mkdir("CheckPoint")

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql import functions as F
from pyspark.sql.types import *

conf = SparkConf() \
        .setAppName("Sales Prediction") \
        .setMaster("local[4]")

spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
sc.setCheckpointDir("CheckPoint") 
sc.setLogLevel('ERROR')

### 2.2 Define schema and load file



In [2]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DoubleType, DateType, BooleanType, TimestampType
store_schema = StructType([    
    StructField('Store', IntegerType(), True), 
    StructField('Type', StringType(), True),
    StructField('Size', IntegerType(), True)            
])

df_stores = spark.read.format("csv") \
      .option("header", True) \
      .schema(store_schema) \
      .load("stores.csv")

df_stores.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Size: integer (nullable = true)



### 2.3 Injest Kafka data




In [3]:
#configuration
hostip = "192.168.1.101" 

topic = "salesPrediction" # setting same topic used in kafka producer

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", f'{hostip}:9092') \
    .option("subscribe", topic) \
    .load()

### 2.4 Persist raw data


In [4]:
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [5]:
df.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



In [6]:
def foreach_batch_function(df, epoch_id):
    df.show(45,False)

In [7]:
# Showing a microbatch
query_raw = df.writeStream.outputMode("Update")\
        .foreachBatch(foreach_batch_function)\
        .trigger(processingTime='5 seconds')\
        .start()

+---+-----+
|key|value|
+---+-----+
+---+-----+

+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|key |value                                                                                                                                                                                                                                                                                                     |
+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|null|{"Store": "1", "Date": "201

In [8]:
query_raw.stop()

#### Parquet formation

In [9]:
# Setting schema for parquet format keeping all values as string except timestamp as long datatype
schema_produceData = StructType() \
      .add("Store",StringType(),True) \
      .add("Date",StringType(),True) \
      .add("Temperature",StringType(),True) \
      .add("Fuel_Price",StringType(),True) \
      .add("MarkDown1",StringType(),True) \
      .add("MarkDown2",StringType(),True) \
      .add("MarkDown3",StringType(),True) \
      .add("MarkDown4",StringType(),True) \
      .add("MarkDown5",StringType(),True) \
      .add("CPI",StringType(),True) \
      .add("Unemployment",StringType(),True) \
      .add("IsHoliday",StringType(),True) \
      .add("last_weekly_sales", StringType(),True) \
      .add("ts",LongType(),True) 

In [10]:
# using schema in from_json function to get data which will be parsed
df_parquet=df.select(F.from_json(F.col("value").cast("string"), schema_produceData).alias('parsed_value'))

In [11]:
# Unpacking columns from parsed condition
df_parquet = df_parquet.select(
                    F.col("parsed_value.Store").alias("Store"),
                    F.col("parsed_value.Date").alias("Date"),
                    F.col("parsed_value.Temperature").alias("Temperature"),
                    F.col("parsed_value.Fuel_Price").alias("Fuel_Price"),
                    F.col("parsed_value.MarkDown1").alias("MarkDown1"),
                    F.col("parsed_value.MarkDown2").alias("MarkDown2"),
                    F.col("parsed_value.MarkDown3").alias("MarkDown3"),
                    F.col("parsed_value.MarkDown4").alias("MarkDown4"),
                    F.col("parsed_value.MarkDown5").alias("MarkDown5"),
                    F.col("parsed_value.CPI").alias("CPI"),
                    F.col("parsed_value.Unemployment").alias("Unemployment"),
                    F.col("parsed_value.IsHoliday").alias("IsHoliday"),
                    F.col("parsed_value.last_weekly_sales").alias("last_weekly_sales"),
                    F.col("parsed_value.ts").alias("ts")
                )

In [12]:
# Data unpacked
df_parquet.printSchema()

root
 |-- Store: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Temperature: string (nullable = true)
 |-- Fuel_Price: string (nullable = true)
 |-- MarkDown1: string (nullable = true)
 |-- MarkDown2: string (nullable = true)
 |-- MarkDown3: string (nullable = true)
 |-- MarkDown4: string (nullable = true)
 |-- MarkDown5: string (nullable = true)
 |-- CPI: string (nullable = true)
 |-- Unemployment: string (nullable = true)
 |-- IsHoliday: string (nullable = true)
 |-- last_weekly_sales: string (nullable = true)
 |-- ts: long (nullable = true)



In [13]:
# Showing a micro-batch
query_parquet = df_parquet.writeStream.outputMode("Update")\
        .foreachBatch(foreach_batch_function)\
        .trigger(processingTime='5 seconds')\
        .start()

+-----+----+-----------+----------+---------+---------+---------+---------+---------+---+------------+---------+-----------------+---+
|Store|Date|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|CPI|Unemployment|IsHoliday|last_weekly_sales|ts |
+-----+----+-----------+----------+---------+---------+---------+---------+---------+---+------------+---------+-----------------+---+
+-----+----+-----------+----------+---------+---------+---------+---------+---------+---+------------+---------+-----------------+---+

+-----+----------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+---------+------------------+----------+
|Store|Date      |Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|CPI      |Unemployment|IsHoliday|last_weekly_sales |ts        |
+-----+----------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+---------+------------------+------

In [14]:
query_parquet.stop()

##### the query can be viewed using terminal command: docker logs --follow [container_id]

### 2.5 Transform data formats


In [15]:
df_transformed = df_parquet.select(
                            F.col('Store').cast('integer').alias('Store'),
                            F.col('Date').cast('date').alias('Date'),
                            F.col('Temperature').cast('float').alias('Temperature'),
                            F.col('Fuel_Price').cast('float').alias('Fuel_Price'),
                            F.col('MarkDown1').cast('float').alias('MarkDown1'),
                            F.col('MarkDown2').cast('float').alias('MarkDown2'),
                            F.col('MarkDown3').cast('float').alias('MarkDown3'),
                            F.col('MarkDown4').cast('float').alias('MarkDown4'),
                            F.col('MarkDown5').cast('float').alias('MarkDown5'),
                            F.col('CPI').cast('float').alias('CPI'),
                            F.col('Unemployment').cast('float').alias('Unemployment'),
                            F.col('IsHoliday').cast('boolean').alias('IsHoliday'),
                            F.col('last_weekly_sales').cast('float').alias('last_weekly_sales'),
                            F.col('ts').cast('timestamp').alias('ts')
                        )

In [16]:
df_transformed.printSchema() # appropriate column transformation

root
 |-- Store: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Temperature: float (nullable = true)
 |-- Fuel_Price: float (nullable = true)
 |-- MarkDown1: float (nullable = true)
 |-- MarkDown2: float (nullable = true)
 |-- MarkDown3: float (nullable = true)
 |-- MarkDown4: float (nullable = true)
 |-- MarkDown5: float (nullable = true)
 |-- CPI: float (nullable = true)
 |-- Unemployment: float (nullable = true)
 |-- IsHoliday: boolean (nullable = true)
 |-- last_weekly_sales: float (nullable = true)
 |-- ts: timestamp (nullable = true)



In [19]:
# Showing a micro-batch of transformed datframe
query_transformed = df_transformed.writeStream.outputMode("Update")\
        .foreachBatch(foreach_batch_function)\
        .trigger(processingTime='5 seconds')\
        .start()

+-----+----+-----------+----------+---------+---------+---------+---------+---------+---+------------+---------+-----------------+---+
|Store|Date|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|CPI|Unemployment|IsHoliday|last_weekly_sales|ts |
+-----+----+-----------+----------+---------+---------+---------+---------+---------+---+------------+---------+-----------------+---+
+-----+----+-----------+----------+---------+---------+---------+---------+---------+---+------------+---------+-----------------+---+

+-----+----------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+---------+-----------------+-------------------+
|Store|Date      |Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|CPI      |Unemployment|IsHoliday|last_weekly_sales|ts                 |
+-----+----------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+---------+---------

+-----+----------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+---------+-----------------+-------------------+
|Store|Date      |Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|CPI      |Unemployment|IsHoliday|last_weekly_sales|ts                 |
+-----+----------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+---------+-----------------+-------------------+
|1    |2011-02-04|42.27      |2.989     |null     |null     |null     |null     |null     |212.56688|7.742       |false    |1316899.2        |2023-02-10 07:37:25|
|2    |2011-02-04|38.25      |2.989     |null     |null     |null     |null     |null     |212.22406|8.028       |false    |1695371.6        |2023-02-10 07:37:25|
|3    |2011-02-04|45.95      |2.989     |null     |null     |null     |null     |null     |215.88634|7.551       |false    |364866.25        |2023-02-10 07:37:25|
|4    |2011-02-04|34.6

In [20]:
# stopping query
query_transformed.stop()

### 2.6 Prepare feature columns


In [21]:
# creating more columns using withColumn function
df_features = df_transformed.withColumn('Month', F.month('Date')) \
                          .withColumn('day_of_month', F.dayofmonth('Date')) \
                          .withColumn('day_of_year', F.dayofyear('Date')) \
                          .withColumn('week_of_year', F.weekofyear('Date'))                         

In [22]:
df_features.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Temperature: float (nullable = true)
 |-- Fuel_Price: float (nullable = true)
 |-- MarkDown1: float (nullable = true)
 |-- MarkDown2: float (nullable = true)
 |-- MarkDown3: float (nullable = true)
 |-- MarkDown4: float (nullable = true)
 |-- MarkDown5: float (nullable = true)
 |-- CPI: float (nullable = true)
 |-- Unemployment: float (nullable = true)
 |-- IsHoliday: boolean (nullable = true)
 |-- last_weekly_sales: float (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- Month: integer (nullable = true)
 |-- day_of_month: integer (nullable = true)
 |-- day_of_year: integer (nullable = true)
 |-- week_of_year: integer (nullable = true)



### 2.7 Join the local data


####            In spark, the left outer join is supported when the stream DF acts as the left table during the join.
####             So, stream-dataframe (df_features) is placed in left and static dataframe (df_stores) is placed in right for left joining.
##### Reference : ProjectPro  , https://www.projectpro.io/recipes/perform-stream-batch-or-static-joins-spark-structured-streaming#mcetoc_1g4rg2kjqa

####


In [23]:
joined_df = df_features.join(df_stores, df_features.Store == df_stores.Store, how = "left").drop(df_stores.Store)

In [24]:
joined_df.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Temperature: float (nullable = true)
 |-- Fuel_Price: float (nullable = true)
 |-- MarkDown1: float (nullable = true)
 |-- MarkDown2: float (nullable = true)
 |-- MarkDown3: float (nullable = true)
 |-- MarkDown4: float (nullable = true)
 |-- MarkDown5: float (nullable = true)
 |-- CPI: float (nullable = true)
 |-- Unemployment: float (nullable = true)
 |-- IsHoliday: boolean (nullable = true)
 |-- last_weekly_sales: float (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- Month: integer (nullable = true)
 |-- day_of_month: integer (nullable = true)
 |-- day_of_year: integer (nullable = true)
 |-- week_of_year: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Size: integer (nullable = true)



### 2.8 Perform predictions


In [25]:
#importing library
from pyspark.ml import PipelineModel

In [26]:
# loading the model
gbtModel = PipelineModel.load('sales_estimation_pipeline_model')

In [27]:
# creating prediction on joined dataframe
test_case = joined_df
predictionsGbt = gbtModel.transform(test_case)

In [28]:
# showing a microbatch on prediction 
query_prediction = predictionsGbt.writeStream.outputMode("Update")\
        .foreachBatch(foreach_batch_function)\
        .trigger(processingTime='5 seconds')\
        .start()

+-----+----+-----------+----------+---------+---------+---------+---------+---------+---+------------+---------+-----------------+---+-----+------------+-----------+------------+----+----+--------+--------+--------+----------+
|Store|Date|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|CPI|Unemployment|IsHoliday|last_weekly_sales|ts |Month|day_of_month|day_of_year|week_of_year|Type|Size|Type_idx|Type_vec|features|prediction|
+-----+----+-----------+----------+---------+---------+---------+---------+---------+---+------------+---------+-----------------+---+-----+------------+-----------+------------+----+----+--------+--------+--------+----------+
+-----+----+-----------+----------+---------+---------+---------+---------+---------+---+------------+---------+-----------------+---+-----+------------+-----------+------------+----+----+--------+--------+--------+----------+

+-----+----------+-----------+----------+---------+---------+---------+---------+---------+

In [29]:
query_prediction.stop()

### 2.9 write code to process the data following requirements


In [30]:
# Creating 'Achieved_Goal' column based on 8.5 threshold value
predictionsGbt = predictionsGbt. \
                        withColumn("Achieved_Goal", 
                                   F.when((F.col('prediction')/F.col('Size'))>8.5, 1).otherwise(0))

#### Medium, Apache Spark Structured Streaming — Operations (5 of 6), viewed from : https://medium.com/expedia-group-tech/apache-spark-structured-streaming-operations-5-of-6-40d907866fa7

#### ProjectPro , https://www.projectpro.io/recipes/perform-window-operations-during-spark-structured-streaming

#### Spark, Structured Programming Guide, https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

In [31]:
#Step1: First line: withWatermark, setting old data dropout time
#Step2 :Second and 3rd line: with window function, setting window size (10 seconds) on timestamp with sliding time (5 seconds)
# step3 : 4th line : aggregating by counting achoeved goal
# step 4 :5th line: dropping any null values

df_sales = predictionsGbt.withWatermark("ts", "3 seconds") \
                            .groupBy(F.window(F.col('ts'), "10 seconds", 
                                    "5 seconds"),F.col("Type")) \
                            .agg(F.count(F.col('Achieved_Goal')==1).alias("Count")) \
                            .na.drop("any")

In [32]:
# Showing a micro-batch of the count of stores that meet the threshold with window time
sales_prediction = df_sales.writeStream.outputMode("Update")\
        .foreachBatch(foreach_batch_function)\
        .trigger(processingTime='5 seconds')\
        .start()

+------+----+-----+
|window|Type|Count|
+------+----+-----+
+------+----+-----+

+------------------------------------------+----+-----+
|window                                    |Type|Count|
+------------------------------------------+----+-----+
|{2023-02-10 07:38:40, 2023-02-10 07:38:50}|B   |17   |
|{2023-02-10 07:38:40, 2023-02-10 07:38:50}|C   |6    |
|{2023-02-10 07:38:45, 2023-02-10 07:38:55}|B   |17   |
|{2023-02-10 07:38:40, 2023-02-10 07:38:50}|A   |22   |
|{2023-02-10 07:38:45, 2023-02-10 07:38:55}|C   |6    |
|{2023-02-10 07:38:45, 2023-02-10 07:38:55}|A   |22   |
+------------------------------------------+----+-----+

+------+----+-----+
|window|Type|Count|
+------+----+-----+
+------+----+-----+

+------------------------------------------+----+-----+
|window                                    |Type|Count|
+------------------------------------------+----+-----+
|{2023-02-10 07:38:50, 2023-02-10 07:39:00}|B   |17   |
|{2023-02-10 07:38:45, 2023-02-10 07:38:55}|B   |34  

In [33]:
sales_prediction.stop()

### 2.10 average weekly sales predictions of different types of stores and write the stream back to Kafka sink using a different topic name

The data you sended should be like this:

|  key   | value  |
|  ----  | ----  |
| timestamp of window start | JSON of store type and avg sales |
| '1673233646'  | '{"Type":"A","predict_weekly_sales":20000}' |

In [34]:
# Creating average weekly sales column named as 'predict_weekly_sales'
df_sales = predictionsGbt.withWatermark("ts", "3 seconds") \
                         .groupBy(F.window(F.col('ts'), "10 seconds", 
                                    "5 seconds"),F.col("Type")) \
                         .agg(F.round(F.mean(F.col('prediction')),2).alias("predict_weekly_sales"))

In [35]:
# taking of window start time and convert to unix timestamp, named as key
df_sales = df_sales.withColumn('key',F.unix_timestamp(F.col("window.start")))

In [45]:
# converting Type and predict_weekly_sales as json format and named as value
df_sales_final = df_sales.withColumn('value', 
                                    F.to_json(F.struct("key","Type", "predict_weekly_sales"))) \
                                    .select('key','value')

In [46]:
# selecting only key and value column after converting to string
df_sales_final = df_sales_final.select(
                            F.col('key').cast('string').alias('key'),
                            F.col('value').cast('string').alias('value'))

In [47]:
df_sales_final.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



In [54]:
# Showing the json format value and key in memory sink
prediction_query = df_sales_final \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("key_value") \
    .trigger(processingTime='5 seconds') \
    .start()

In [55]:
spark.sql("select * from key_value").show(truncate=False)

+----------+---------------------------------------------------------------+
|key       |value                                                          |
+----------+---------------------------------------------------------------+
|1676014975|{"key":1676014975,"Type":"A","predict_weekly_sales":1338065.17}|
|1676014980|{"key":1676014980,"Type":"B","predict_weekly_sales":762763.68} |
|1676014975|{"key":1676014975,"Type":"B","predict_weekly_sales":762763.68} |
|1676014980|{"key":1676014980,"Type":"A","predict_weekly_sales":1338065.17}|
|1676014980|{"key":1676014980,"Type":"C","predict_weekly_sales":454720.22} |
|1676014975|{"key":1676014975,"Type":"C","predict_weekly_sales":454720.22} |
+----------+---------------------------------------------------------------+



In [56]:
prediction_query.stop()

In [58]:
#Step: 
#    - confirming key and value are in string
#    - writin stream data in kafka sink using writeStream and format method correspondingly
#    - Mentioning a topic name, here, topic1
#    - Setting the checkpoint
#    - Setting output mode as complete

hostip = "192.168.1.101"
ds = df_sales_final \
    .selectExpr("CAST(key AS STRING)","CAST(value AS STRING)") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", f'{hostip}:9092') \
    .option("topic", "topic1") \
    .option("checkpointLocation", "checkpoint") \
    .option("truncate", False) \
    .outputMode("Complete").start()

In [53]:
ds.stop()


##### For cleaning up the quries and files

In [53]:
# queries are cleaned right after making the queries.