Licun Liu  
30901235  
07-02-2021

# 2. Streaming application using Spark Structured Streaming (55%)

### 1. SparkSession is created using a SparkConf object

In [1]:
# Import SparkConf class into program
from pyspark import SparkConf
# Library required
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import col
from pyspark.sql import functions as F

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql import functions as F
from pyspark.sql.types import *

# local[*]: run Spark in local mode with as many working processors as logical cores on your machine
# If we want Spark to run locally with 'k' worker threads, we can specify as "local[k]".
master = "local[2]"
# The `appName` field is a name to be shown on the Spark cluster UI page
app_name = "Assignment_3"
# Setup configuration parameters for Spark
spark_conf = SparkConf().setMaster(master).setAppName(app_name).set("spark.sql.session.timeZone", "UTC")


import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'

spark = SparkSession \
    .builder \
    .appName("Assignment_3") \
    .getOrCreate()


### 2. Define the data schema

In [2]:
# Define the data schema for the sensor location file
schema_2 = StructType([StructField("sensor_id",IntegerType(),True),
                     StructField("sensor_description",StringType(),True),
                      StructField("sensor_name",StringType(),True),
                       StructField("installation_date",DateType(),True),
                        StructField("status",StringType(),True),
                         StructField("note",StringType(),True),
                          StructField("direction_1",StringType(),True),
                           StructField("direction_2",StringType(),True),
                            StructField("latitude",FloatType(),True),
                             StructField("longitude",FloatType(),True),
                              StructField("location",StringType(),True)])                  

In [3]:
# Load the sensor location CSV file into another dataframe
df2 = spark.read.csv('Pedestrian_Counting_System_-_Sensor_Locations.csv',dateFormat= 'yyyy/MM/dd' ,schema=schema_2,header=True)

### 3. Invest the streaming data into Spark Streaming

In [4]:
# Using the topic name from producer
topic = "Assignment"

# Read the stream data
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
    .option("subscribe", topic) \
    .load()

In [5]:
# Get the schema of df
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [6]:
# Convert to string format
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

### 4. Persist the raw streaming data in parquet format

In [7]:
# Write into parquet files the unsuccessful requests partitioned by status code
raw_streaming_data = df.writeStream.format("parquet")\
        .outputMode("append")\
        .option("path", "parquet/raw_streaming_data")\
        .option("checkpointLocation", "parquet/raw_streaming_data/checkpoint")\
        .start()

### 5. Transformed into the proper formats

#### 5.1 Create the schema

In [8]:
# Create the schema
schema_1 = StructType([StructField("ID",IntegerType(),True),
                     StructField("Date_Time",StringType(),True),
                      StructField("Year",IntegerType(),True),
                       StructField("Month",StringType(),True),
                        StructField("Mdate",IntegerType(),True),
                         StructField("Day",StringType(),True),
                          StructField("Time",IntegerType(),True),
                           StructField("Sensor_ID",IntegerType(),True),
                            StructField("Sensor_Name",StringType(),True),
                             StructField("Hourly_Counts",IntegerType(),True)])

#### 5.2 Transform the schema

In [9]:
# Transform the data
df = df.select(F.from_json(F.col("value").cast("string"), schema=schema_1).alias('parsed_value'))

In [10]:
# Print out the schema
df.printSchema()

root
 |-- parsed_value: struct (nullable = true)
 |    |-- ID: integer (nullable = true)
 |    |-- Date_Time: string (nullable = true)
 |    |-- Year: integer (nullable = true)
 |    |-- Month: string (nullable = true)
 |    |-- Mdate: integer (nullable = true)
 |    |-- Day: string (nullable = true)
 |    |-- Time: integer (nullable = true)
 |    |-- Sensor_ID: integer (nullable = true)
 |    |-- Sensor_Name: string (nullable = true)
 |    |-- Hourly_Counts: integer (nullable = true)



#### 5.3 Renamed the column
The columns need to be renamed appropriately.

In [11]:
# Rename the columns
df_formatted = df.select(
                    F.col("parsed_value.ID").alias("ID"),
                    F.col("parsed_value.Date_Time").alias("Date_Time"),
                    F.col("parsed_value.Year").alias("Year"),
                    F.col("parsed_value.Month").alias("Month"),
                    F.col("parsed_value.Mdate").alias("Mdate"),
                    F.col("parsed_value.Day").alias("Day"),
                    F.col("parsed_value.Time").alias("Time"),
                    F.col("parsed_value.Sensor_ID").alias("Sensor_ID"),
                    F.col("parsed_value.Sensor_Name").alias("Sensor_Name"),
                        F.col("parsed_value.Hourly_Counts").alias("Hourly_Counts")
                )

In [12]:
df_formatted.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Date_Time: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: string (nullable = true)
 |-- Mdate: integer (nullable = true)
 |-- Day: string (nullable = true)
 |-- Time: integer (nullable = true)
 |-- Sensor_ID: integer (nullable = true)
 |-- Sensor_Name: string (nullable = true)
 |-- Hourly_Counts: integer (nullable = true)



#### 5.4 Timestamp format

In [13]:
# Deal with Timestamp format
df = df_formatted.withColumn('Date_Time',F.to_timestamp('Date_Time', 'MM/dd/yyyy hh:mm:ss a'))

### 6. Prepare the columns for model prediction

In [14]:
# Libraries required
import time
import datetime


#### 6.a - Next calendar date of “Date_Time”


In [15]:
df = df.withColumn('next_date', df.Date_Time + F.expr('INTERVAL 1 DAYS'))

#### 6.b - Day of the month information

In [16]:
df = df.withColumn('next_Mdate',F.dayofmonth(F.col('next_date')))

#### 6.c - Week of the year information


In [17]:
df = df.withColumn('next_day_week',F.weekofyear(F.col('next_date')))

#### 6.d - Day of the week information


In [18]:
# 6.d - Include day of the week information, assuming Monday being the first day of week
df = df.withColumn('next_day_of_week',F.dayofweek(F.col('next_date')-F.expr('INTERVAL 1 DAYS')))

#### 6.e - Rename the column


In [19]:
# 6.e - Rename the column “Hourly_Count” as “prev_count"
df = df.withColumnRenamed('Hourly_Counts','prev_count')

### 7. Predict the next day’s pedestrian count

#### 7.1 Filter time

In [20]:
# Filter time between 9am-11:59pm
df = df.filter(df["Time"] >= 9 ).filter(df["Time"] <= 23)

#### 7.2 Load the Pipeline

In [21]:
#Loading the Pipeline Model From the filesystem
from pyspark.ml import PipelineModel
pipelineModel = PipelineModel.load('count_estimation_pipeline_model')

In [22]:
# Print the detail about the model
print(pipelineModel.stages[-1]._java_obj.paramMap())

{
	GBTRegressor_f98f56e12d10-labelCol: Hourly_Counts,
	GBTRegressor_f98f56e12d10-maxBins: 100,
	GBTRegressor_f98f56e12d10-maxDepth: 10
}


#### 7.3 Use the Model

In [23]:
# Using model to predict
predictions = pipelineModel.transform(df)

#### 7.4 Write into parquet

In [24]:
# Write into parquet files the unsuccessful requests partitioned by status code
predict_result = predictions.writeStream.format("parquet")\
        .outputMode("append")\
        .option("path", "parquet/predict_result")\
        .option("checkpointLocation", "parquet/predict_result/checkpoint")\
        .start()

### 8. Process the data

### a. Get the number of hours that the predicted pedestrian count would exceed 2000

In [25]:
# Filter the result exceed 2000
predictions = predictions.filter(predictions["prediction"] > 2000 )

#### 1.Delete the time in Date_Time

In [26]:
# Create a function to delete the time in Date_Time
def func(next_date):
    st = str(next_date)
    result = st[0:10]
    return result

#convert to a UDF Function by passing in the function and return type of function
udf_func = F.udf(func, StringType())
# Call the function
df = predictions.withColumn("Date_Time", udf_func("Date_Time"))

# Deal with Timestamp format
df = df.withColumn('Date_Time',F.to_timestamp('Date_Time', 'yyyy-MM-dd'))

#### 2.Using window 

In [27]:
# Set window size as one day

# Convert datetime to date
windowedCounts = df \
    .groupBy(F.col('sensor_id'),F.window('Date_Time','1 day'))  \
    .count() \
    .select("sensor_id",F.to_date(F.col('window').end).alias('Next_date'),'count')


#### 3.Show the result

In [28]:
# Create function to show values received from input dataframe
def foreach_batch_function(df, epoch_id):
    df.show(20,False)

In [35]:
# Show them inside the notebook file
query = windowedCounts \
    .writeStream \
    .outputMode("update") \
    .foreachBatch(foreach_batch_function)\
    .trigger(processingTime='5 seconds') \
    .option("truncate","false")\
    .start()

+---------+---------+-----+
|sensor_id|Next_date|count|
+---------+---------+-----+
+---------+---------+-----+

+---------+----------+-----+
|sensor_id|Next_date |count|
+---------+----------+-----+
|4        |2020-12-02|6    |
|2        |2020-12-02|6    |
|65       |2020-12-02|2    |
|58       |2020-12-02|1    |
|67       |2020-12-02|1    |
|22       |2020-12-02|3    |
|1        |2020-12-02|5    |
|24       |2020-12-02|1    |
|63       |2020-12-02|3    |
|59       |2020-12-02|1    |
|68       |2020-12-02|2    |
|41       |2020-12-02|5    |
+---------+----------+-----+

+---------+----------+-----+
|sensor_id|Next_date |count|
+---------+----------+-----+
|68       |2020-12-03|2    |
|63       |2020-12-03|1    |
|67       |2020-12-03|1    |
|22       |2020-12-03|2    |
|35       |2020-12-03|4    |
|1        |2020-12-03|8    |
|69       |2020-12-03|1    |
|65       |2020-12-03|1    |
|2        |2020-12-03|6    |
|59       |2020-12-03|1    |
|4        |2020-12-03|6    |
|41       |2020-

#### 4.Stop the require

In [37]:
query.stop()

### b. Write the stream back to Kafka sink

#### 1.Join the data and select

In [38]:
# Join these two dataframe
df_join = windowedCounts.join(df2,"sensor_id",how='inner')

In [39]:
# Select some relevant column
df_join = df_join.select("sensor_id","Next_date","count","longitude","latitude")

#### 2.Construct key and value

In [40]:
# Construct key and value columns
df_result = df_join.withColumn('value',F.to_json(F.struct(df_join.columns)))\
            .withColumn('key',F.col('next_date')) \
            .select(F.col('key').cast("String"),F.col('value'))

#### 3.Send back to Kafka

In [41]:
# Write the stream back to Kafka sink using a different topic name

# Topic name is 2.8b_data
kafka = df_result.writeStream.format('kafka') \
              .outputMode('update')\
              .option("kafka.bootstrap.servers","127.0.0.1:9092") \
              .option("topic","2.8b_data")\
              .option("checkpointLocation", "kafka/checkpoint")\
              .trigger(processingTime = '5 seconds')\
              .start()