# FIT5202 Assessment 2-B

### Producer File for Task-2

#### Student Name: Akash Goyal
#### Student ID: 30749964

**Libraries used:** the main libraries/modules used in my assignment are:
* `os` To set the `PYSPARK_SUBMIT_ARGS` environment variable, so that the Spark Kafka packages are loaded
* `SparkConf` So that the Spark configurations can be set
* `SparkContext` To be able to create a Spark context object
* `SparkSession` To be able to build a Spark session object and set the timezone
* `pyspark.sql.functions` package to be able to use Spark functions to manipulate data
* `sleep` So that we can use the sleep function to introduce delay
* `pyspark.sql.types` Import different types of DataTypes
* `PipelineModel` To load the pre-trained pipeline model used for prediction

## Table of Contents

* [Importing Libraries](#lib)
* [Introduction](#intro)
* [1. Creation of SparkSession](#sp)
* [2. Sensor Data Ingestion](#sdinges)
* [3. Reading the Pedestrian Data](#rdped)
* [4. Persist the raw streaming data in parquet format](#4)
* [5. Transforming the data into the required format](#5)
* [6. Transformations to prepare the columns for model prediction](#6)
* [7. Model Building](#7)
* [8. Aggregating data](#8)
    * [a. Aggregating on days](#8a)
    * [b. Publishing the data to kafka topic](#8b)

## Importing Libraries <a class="anchor" name="lib"></a>

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'
# So that the Spark configurations can be set
from pyspark import SparkConf
# To be able to create a spark context object
from pyspark import SparkContext # Spark
from pyspark.sql import SparkSession # Spark SQL
# Import functions package
from pyspark.sql import functions as F
# Import datetime for using datetime functions
from datetime import datetime, timedelta
# Import different types of DataTypes 
from pyspark.sql.types import *
from time import sleep

# To load the pre-trained pipeline model
from pyspark.ml.pipeline import PipelineModel

## Introduction <a class="anchor" name="intro"></a>

In this task we use Spark to read the stream of data published on the Kafka topic **'pedestrianData'** and transform it into the shape required by the given model. The model is then used to predict the pedestrian counts for the sensors. Finally, the aggregated results are written to the Kafka topic **'final_joined_df'** so that they can be consumed for visualization.

## 1. Creation of SparkSession <a class="anchor" name="sp"></a>

In this section we create the SparkSession and obtain the SparkContext from it. First, the configuration for the SparkSession object is created. The configuration values set here are:
* The master, which controls how many cores Spark uses. It is stored in the **master** variable; **local[2]** runs Spark locally with two worker threads, so at most two cores of the machine are used.
* The application name.

Using these configuration values we build the SparkSession object. The session time zone also has to be UTC, which is done by setting the property **spark.sql.session.timeZone** to 'UTC', as shown in the code below.

Finally, the SparkContext is taken from the session and its log level is set to **ERROR** so that only errors are logged.

In [2]:
# For selecting the number of cores that we want spark to use
# in this case we are asking it to use 2 cores
master = "local[2]"

# Putting the Application name
app_name = "Streaming application using Spark Structured Streaming"

# Setting up the configuration parameters for Spark
spark_conf = SparkConf().setMaster(master).setAppName(app_name)

# Using SparkSession
# Setting the time-zone
spark = SparkSession.builder.config(conf=spark_conf).config('spark.sql.session.timeZone', 'UTC').getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('ERROR')
spark.catalog.clearCache()

## 2. Sensor Data Ingestion <a class="anchor" name="sdinges"></a>

In this part of the assignment we load the given CSV file into a Spark DataFrame with the appropriate schema, as given in the metadata file.

First we create the schema according to the metadata file. The **location** column of the sensor data file is the exception and is simply read as a string. The schema is built with **StructType()** and **StructField()**: StructType() holds the complete schema of the dataframe, and each StructField() adds one field with its name and datatype.

Below is the schema for the sensor data with the appropriate datatypes as mentioned in the metadata file.

In [3]:
# Schema for the sensor data
schema_sensor_data_df = StructType([
    StructField("sensor_id", IntegerType()),
    StructField("sensor_description", StringType()),
    StructField("sensor_name", StringType()),
    StructField("installation_date", DateType()),
    StructField("status", StringType()),
    StructField("note", StringType()),
    StructField("direction_1", StringType()),
    StructField("direction_2", StringType()),
    StructField("latitude", FloatType()),
    StructField("longitude", FloatType()),
    StructField("location", StringType())
])

Loading the Pedestrian_Counting_System_-_Sensor_Locations.csv to the dataframe: sensor_spdf

In [4]:
# Loading the sensor data
sensor_spdf = spark.read.format('csv').schema(schema_sensor_data_df)\
                         .option('header',True).option('escape','"').option("dateFormat",'yyyy/MM/dd')\
                         .load('Pedestrian_Counting_System_-_Sensor_Locations.csv')
# Print the schema of the sensor dataframe
sensor_spdf.printSchema()

root
 |-- sensor_id: integer (nullable = true)
 |-- sensor_description: string (nullable = true)
 |-- sensor_name: string (nullable = true)
 |-- installation_date: date (nullable = true)
 |-- status: string (nullable = true)
 |-- note: string (nullable = true)
 |-- direction_1: string (nullable = true)
 |-- direction_2: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- location: string (nullable = true)
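For intuition only, here is a pure-Python sketch of what the `escape='"'` and `dateFormat='yyyy/MM/dd'` options handle, written with the standard `csv` and `datetime` modules rather than Spark. The sample rows and the helper name `parse_sensor_rows` are hypothetical and are not taken from the actual CSV file.

```python
import csv
import io
from datetime import datetime

# Hypothetical CSV snippet: the description contains a comma and an
# embedded double quote, escaped by doubling it inside the quoted field
SAMPLE = (
    "sensor_id,sensor_description,installation_date\n"
    '1,"Example Street, ""North"" side",2009/03/24\n'
)

def parse_sensor_rows(text):
    """Parse CSV text, casting sensor_id to int and parsing the date column."""
    rows = []
    for record in csv.DictReader(io.StringIO(text)):
        rows.append({
            "sensor_id": int(record["sensor_id"]),
            "sensor_description": record["sensor_description"],
            # Spark's 'yyyy/MM/dd' pattern corresponds to Python's '%Y/%m/%d'
            "installation_date": datetime.strptime(
                record["installation_date"], "%Y/%m/%d").date(),
        })
    return rows

rows = parse_sensor_rows(SAMPLE)
print(rows[0]["sensor_description"])  # Example Street, "North" side
```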



## 3. Reading the Pedestrian Data <a class="anchor" name="rdped"></a>

In this part of the task we read the streaming data from the **'pedestrianData'** Kafka topic using:
* Spark's `readStream` API.
* The `format()` function with the **'kafka'** parameter, to read the data from Kafka.
* option("kafka.bootstrap.servers", "localhost:9092") - To set the Kafka server to connect to.
* option("subscribe", topic) - To set the Kafka topic from which the data is read.

In [5]:
topic = 'pedestrianData'

# Reading the data from the kafka topic
pdstrn_str_df = spark.readStream \
                 .format("kafka") \
                 .option("kafka.bootstrap.servers", "localhost:9092") \
                 .option("subscribe", topic) \
                 .load()

pdstrn_str_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



## 4. Persist the raw streaming data in parquet format <a class="anchor" name="4"></a>

In this part of the task we persist the raw data in **parquet** format. Parquet is a columnar file format: data is stored column by column, which compresses well and lets Spark read back only the columns a query needs. This makes it much more efficient than row-based CSV files for big data.
The following are used:
* Removal of the checkpoint: In theory there is no need to remove the checkpoint files, but keeping them was causing problems when writing the data to the sink, with errors shown on the Spark UI. Therefore, the code to remove the checkpoint directory has been added.
    * rm: To remove the files.
    * -rf: -r removes the directory and its contents recursively, and -f forces the removal without prompting or reporting an error if the path does not exist.
* Spark's `writeStream` API.
* The `format()` function with the **'parquet'** parameter, to write the data in parquet format.
* outputMode - 'append': In this mode only the new rows are added to the result table.
* option(path): The path where the result table is stored.
* option(checkpointLocation): The directory where Spark stores the query's progress (offsets and state), so the query can recover from where it left off.
* start(): To start writing the data.
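`!rm -rf` is an IPython shell escape and depends on a Unix-like shell. A portable alternative, which is an assumption and not what this notebook runs, is `shutil.rmtree` from the Python standard library; the helper name `reset_checkpoint` is hypothetical.

```python
import os
import shutil
import tempfile

def reset_checkpoint(path):
    """Delete a checkpoint directory and its contents, like `rm -rf path`."""
    # ignore_errors=True mirrors -f: a missing directory is not an error
    shutil.rmtree(path, ignore_errors=True)

# Usage with a throwaway directory standing in for the real checkpoint path
base = tempfile.mkdtemp()
checkpoint = os.path.join(base, "pdstrn_df_chkpnt")
os.makedirs(os.path.join(checkpoint, "offsets"))
reset_checkpoint(checkpoint)
reset_checkpoint(checkpoint)  # second call is a no-op and raises nothing
print(os.path.exists(checkpoint))  # False
```

Deleting a checkpoint discards the query's progress, so a restarted Kafka query starts again from its `startingOffsets` option (which defaults to `latest` for streaming queries).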

In [6]:
!rm -rf pdstrn_str_df_parquet/pdstrn_df_chkpnt

# Writing to the parquet file
pdstrn_parquet_sink = pdstrn_str_df.writeStream\
                           .format("parquet")\
                           .outputMode("append")\
                           .option("path", "pdstrn_str_df_parquet")\
                           .option("checkpointLocation", "pdstrn_str_df_parquet/pdstrn_df_chkpnt")\
                           .start()


In [7]:
# Stop the file sink query after some time
# pdstrn_parquet_sink.stop()

## 5. Transforming the data into the required format <a class="anchor" name="5"></a>

In this part of the task we create a process that transforms the data into the structure/schema described in the metadata.

First, we cast the binary `value` column received from the Kafka topic to a string.

In [8]:
# Converting the data into string
pdstrn_stream_df = pdstrn_str_df.selectExpr("CAST(value AS STRING)")

In [9]:
# Define schema for pedestrian count data
# Taking the datatypes for all the attributes
# as string as the data is given as array of jsons
pedestrian_schema =  ArrayType(StructType([  
    StructField("ID", StringType(), True),
    StructField("Date_Time", StringType(), True),
    StructField("Year", StringType(), True),
    StructField("Month", StringType(), True),
    StructField("Mdate", StringType(), True),
    StructField("Day", StringType(), True),
    StructField("Time", StringType(), True),
    StructField("Sensor_ID", StringType(), True),
    StructField("Sensor_Name", StringType(), True),
    StructField("Hourly_Counts", StringType(), True)        
]))

In [10]:
# Converting the json data into the schema created above and naming the column data 
pdstrn_df = pdstrn_stream_df.select(F.from_json(F.col("value"),
                                                pedestrian_schema)\
                                                .alias('data'))

pdstrn_df.printSchema()

root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ID: string (nullable = true)
 |    |    |-- Date_Time: string (nullable = true)
 |    |    |-- Year: string (nullable = true)
 |    |    |-- Month: string (nullable = true)
 |    |    |-- Mdate: string (nullable = true)
 |    |    |-- Day: string (nullable = true)
 |    |    |-- Time: string (nullable = true)
 |    |    |-- Sensor_ID: string (nullable = true)
 |    |    |-- Sensor_Name: string (nullable = true)
 |    |    |-- Hourly_Counts: string (nullable = true)



Next we apply the **explode()** function to the JSON array, which creates a separate row for each element of the array. Once each record is in its own row, its fields can be cast from strings to their actual datatypes.
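The steps from the Kafka value to typed rows can be mimicked in plain Python, which can help when checking the schema against sample messages. This is an illustration with hypothetical messages, not Spark code: `json.loads` stands in for `from_json`, and a nested list comprehension stands in for `explode()`.

```python
import json

# Two hypothetical Kafka message values; each is a JSON array of records
# whose fields are all strings (matching the all-StringType schema above)
messages = [
    json.dumps([{"ID": "1", "Time": "9", "Sensor_ID": "4", "Hourly_Counts": "120"},
                {"ID": "2", "Time": "10", "Sensor_ID": "4", "Hourly_Counts": "340"}]),
    json.dumps([{"ID": "3", "Time": "9", "Sensor_ID": "7", "Hourly_Counts": "55"}]),
]
values = [m.encode("utf-8") for m in messages]  # Kafka delivers the value as bytes

# CAST(value AS STRING) followed by from_json: one parsed array per message
arrays = [json.loads(v.decode("utf-8")) for v in values]

# explode: every element of every array becomes a separate row
rows = [element for array in arrays for element in array]

# Cast the string fields to integers, as done later with .cast(IntegerType())
typed = [{k: int(v) for k, v in row.items()} for row in rows]
```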

In [11]:
# Exploding the data column into elements
pdstrn_df = pdstrn_df.select(F.explode(F.col("data"))\
                     .alias('elements'))

pdstrn_df.printSchema()

root
 |-- elements: struct (nullable = true)
 |    |-- ID: string (nullable = true)
 |    |-- Date_Time: string (nullable = true)
 |    |-- Year: string (nullable = true)
 |    |-- Month: string (nullable = true)
 |    |-- Mdate: string (nullable = true)
 |    |-- Day: string (nullable = true)
 |    |-- Time: string (nullable = true)
 |    |-- Sensor_ID: string (nullable = true)
 |    |-- Sensor_Name: string (nullable = true)
 |    |-- Hourly_Counts: string (nullable = true)



Next we flatten the struct, selecting each field as a top-level column and naming it with the **alias()** function.

In [12]:
pdstrn_df = pdstrn_df.select(pdstrn_df.elements.ID.alias("ID"),
                             pdstrn_df.elements.Date_Time.alias("Date_Time"),
                             pdstrn_df.elements.Year.alias("Year"),
                             pdstrn_df.elements.Month.alias("Month"),
                             pdstrn_df.elements.Mdate.alias("Mdate"),
                             pdstrn_df.elements.Day.alias("Day"),
                             pdstrn_df.elements.Time.alias("Time"),
                             pdstrn_df.elements.Sensor_ID.alias("Sensor_ID"),
                             pdstrn_df.elements.Sensor_Name.alias("Sensor_Name"),
                             pdstrn_df.elements.Hourly_Counts.alias("Hourly_Counts"))

pdstrn_df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Date_Time: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- Mdate: string (nullable = true)
 |-- Day: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Sensor_ID: string (nullable = true)
 |-- Sensor_Name: string (nullable = true)
 |-- Hourly_Counts: string (nullable = true)



In [13]:
# Changing the datatypes of the attributes to the required datatypes as per the metadata

# ID: Integer Type
pdstrn_df = pdstrn_df.withColumn('ID', pdstrn_df['ID'].cast(IntegerType()))

# Date_Time: Timestamp
pdstrn_df = pdstrn_df.withColumn("Date_Time", F.to_timestamp(pdstrn_df.Date_Time, 'MM/dd/yyyy hh:mm:ss a'))

# Year: Integer Type
pdstrn_df = pdstrn_df.withColumn('Year', pdstrn_df['Year'].cast(IntegerType()))

# Mdate: Integer Type
pdstrn_df = pdstrn_df.withColumn('Mdate', pdstrn_df['Mdate'].cast(IntegerType()))

# Time: Integer Type
pdstrn_df = pdstrn_df.withColumn('Time', pdstrn_df['Time'].cast(IntegerType()))

# Sensor_ID: Integer Type
pdstrn_df = pdstrn_df.withColumn('Sensor_ID', pdstrn_df['Sensor_ID'].cast(IntegerType()))

# Hourly_Counts: Integer Type
pdstrn_df = pdstrn_df.withColumn('Hourly_Counts', pdstrn_df['Hourly_Counts'].cast(IntegerType()))

pdstrn_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Date_Time: timestamp (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: string (nullable = true)
 |-- Mdate: integer (nullable = true)
 |-- Day: string (nullable = true)
 |-- Time: integer (nullable = true)
 |-- Sensor_ID: integer (nullable = true)
 |-- Sensor_Name: string (nullable = true)
 |-- Hourly_Counts: integer (nullable = true)
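Spark's datetime pattern letters differ from Python's `strptime` codes, so one way to sanity-check the `'MM/dd/yyyy hh:mm:ss a'` pattern against sample values is the translation below. It is a sketch assuming timestamps shaped like the hypothetical example `11/01/2019 05:00:00 PM`; the helpers `parse_date_time` and `cast_int` are hypothetical and only roughly mirror Spark, which in the default non-ANSI mode generally yields null for values it cannot parse or cast instead of raising an error.

```python
from datetime import datetime

# Spark pattern 'MM/dd/yyyy hh:mm:ss a' written with Python strptime codes:
# hh = 12-hour clock (%I), a = AM/PM marker (%p)
DATE_TIME_FORMAT = "%m/%d/%Y %I:%M:%S %p"

def parse_date_time(text):
    """Return a datetime, or None if the text does not match the pattern."""
    try:
        return datetime.strptime(text, DATE_TIME_FORMAT)
    except ValueError:
        return None  # roughly mirrors to_timestamp giving null

def cast_int(text):
    """Mimic a non-ANSI cast to int: invalid or missing input becomes None."""
    try:
        return int(text)
    except (TypeError, ValueError):
        return None

print(parse_date_time("11/01/2019 05:00:00 PM"))  # 2019-11-01 17:00:00
```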



## 6. Transformations to prepare the columns for model prediction <a class="anchor" name="6"></a>

In [14]:
# pdstrn_df = pdstrn_df.withColumn('next_date', F.date_add(F.to_date(F.col("Date_Time")), 1))

# Adding 1 day to the Date_Time column and also converting it to 'date' datatype
pdstrn_df = pdstrn_df.withColumn('next_date', F.date_add('Date_Time', 1))

# Extracting the day of the month from the next_date attribute
pdstrn_df = pdstrn_df.withColumn('next_Mdate', F.dayofmonth('next_date'))

# Extracting the week of the year from the next_date attribute
pdstrn_df = pdstrn_df.withColumn('next_day_week', F.weekofyear('next_date'))

# Extracting the day of week from the next_date attribute.
# F.dayofweek returns 1 (Sunday) to 7 (Saturday), so Sunday is mapped
# to 7 and every other day is shifted down by one, making Monday = 1
pdstrn_df = pdstrn_df.withColumn('next_day_of_week', F.when(F.dayofweek('next_date') == 1, 7)\
                                                     .otherwise(F.dayofweek('next_date') - 1))

# Changing the name of the Hourly_Counts column to 'prev_count'
pdstrn_df = pdstrn_df.withColumnRenamed('Hourly_Counts', 'prev_count')
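The next-day columns can be checked in plain Python. Spark's `F.dayofweek` returns 1 for Sunday through 7 for Saturday and never returns 0, so a Monday-first numbering has to map 1 (Sunday) to 7 and subtract one from the rest; `F.weekofyear` uses ISO 8601 weeks, like Python's `isocalendar()`. The function names below are hypothetical, and the sketch is only meant to verify the mapping.

```python
from datetime import date, timedelta

def spark_dayofweek(d):
    """Spark's dayofweek convention: 1 = Sunday, 2 = Monday, ..., 7 = Saturday."""
    return d.isoweekday() % 7 + 1

def next_day_features(d):
    """Compute the next-day columns derived in the cell above for a date d."""
    nxt = d + timedelta(days=1)                   # F.date_add(..., 1)
    dow = spark_dayofweek(nxt)
    return {
        "next_date": nxt,
        "next_Mdate": nxt.day,                    # F.dayofmonth
        "next_day_week": nxt.isocalendar()[1],    # F.weekofyear (ISO 8601 weeks)
        # Monday = 1 ... Sunday = 7: Spark's Sunday (1) maps to 7
        "next_day_of_week": 7 if dow == 1 else dow - 1,
    }
```

With this mapping, `next_day_of_week` equals Python's `isoweekday()` for the next day.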

Checking if the schema of the dataframe is what is required.

In [15]:
pdstrn_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Date_Time: timestamp (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: string (nullable = true)
 |-- Mdate: integer (nullable = true)
 |-- Day: string (nullable = true)
 |-- Time: integer (nullable = true)
 |-- Sensor_ID: integer (nullable = true)
 |-- Sensor_Name: string (nullable = true)
 |-- prev_count: integer (nullable = true)
 |-- next_date: date (nullable = true)
 |-- next_Mdate: integer (nullable = true)
 |-- next_day_week: integer (nullable = true)
 |-- next_day_of_week: integer (nullable = true)



Next we filter the data on the Time attribute, keeping only the hours from 9 to 23 inclusive. This is done using the **filter()** function.

In [16]:
# Filtering the data based on the Time attribute
pdstrn_df = pdstrn_df.filter((pdstrn_df.Time >= 9) & \
                             (pdstrn_df.Time <= 23))

## 7. Model Building <a class="anchor" name="7"></a>

In this section we load the given pre-trained pipeline model and use it to predict the pedestrian counts for the streaming data.

In [17]:
# Specifying the file path of the pre-trained pipeline model
model_path = 'count_estimation_pipeline_model'

# Loading the given model as a PipelineModel
pipeline_model = PipelineModel.load(model_path)

# Transforming the data using the pipeline model to get the predictions
prediction_df = pipeline_model.transform(pdstrn_df)

Next we write the prediction data to the sink in parquet format.

The same steps as in Section 4 are used: the old checkpoint directory is first removed with `rm -rf` (keeping it was causing errors on the Spark UI), and the data is then written with Spark's `writeStream` API using the **'parquet'** format, the 'append' output mode and the path and checkpointLocation options, before start() is called.

In [18]:
!rm -rf parquet_prediction_df/checkpoint

pdstrn_prediction_sink = prediction_df.writeStream.format("parquet")\
                                      .outputMode("append")\
                                      .option("path", "parquet_prediction_df")\
                                      .option("checkpointLocation", "parquet_prediction_df/checkpoint")\
                                      .start()


In [19]:
# Stop the file sink query after some time
# pdstrn_prediction_sink.stop()

## 8. Aggregating data <a class="anchor" name="8"></a>

## Part (a) Aggregating on days <a class="anchor" name="8a"></a>

In this section we:
* Filter the data to keep only the rows where the prediction is greater than 2000.
* Convert the next_date attribute to the timestamp datatype so that watermarking can be applied to it.
* Apply a watermark of 1 day, so that records arriving up to a day late are still counted towards their day.
* Group the data by a 1-day window on the timestamp column and by the Sensor_ID column, and count the rows in each group.
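The filter and the per-day count can be sketched in plain Python to see what each output row represents. This assumes, as the notebook sets up, that timestamps are in UTC: Spark aligns tumbling windows to the Unix epoch, so with a UTC session time zone a 1-day window runs from midnight to midnight. The watermark is left out here, and `daily_window` and `count_hours_over` are hypothetical names.

```python
from collections import Counter
from datetime import datetime, timedelta

def daily_window(ts):
    """Return the [start, end) of the 1-day tumbling window containing ts (UTC)."""
    start = datetime(ts.year, ts.month, ts.day)
    return start, start + timedelta(days=1)

def count_hours_over(rows, threshold=2000):
    """Count, per (window, sensor), the rows whose prediction exceeds threshold."""
    counts = Counter()
    for row in rows:
        if row["prediction"] > threshold:   # filter(prediction > 2000)
            key = (daily_window(row["next_date"]), row["Sensor_ID"])
            counts[key] += 1                 # count('Sensor_ID') per group
    return counts
```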

In [20]:
# Filtering the data for predictions more than 2000
analyse_df = prediction_df.filter(prediction_df.prediction > 2000)

# Converting the next_date attribute to timestamp datatype so that the watermarking can be applied to it
analyse_df= analyse_df.withColumn('next_date', F.col('next_date').cast('timestamp'))

In [21]:
# Aggregating the data based on the count of hours which have
# predictions > 2000
# A watermark of 1 day has been used
# A window for each day has been created using the window() function
# The aggregation is done using the agg() function
df_hours_count = analyse_df.withWatermark('next_date', '1 day')\
                    .groupBy(F.window('next_date','1 day'),'Sensor_ID')\
                    .agg(F.count('Sensor_ID').alias('CountofHours'))\
                    .select("Sensor_ID", "window","CountofHours")

Before persisting the data, we check its schema.

In [22]:
df_hours_count.printSchema()

root
 |-- Sensor_ID: integer (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- CountofHours: long (nullable = false)



To see the data in the notebook, one way is to use the **foreachBatch()** function and the other is to use `memory` as the **format()**; in this case we have used the latter.

**Please note: one might have to wait for the entire data to be displayed.**

In [23]:
df_hours_aggregated = df_hours_count \
                        .writeStream \
                        .outputMode("complete") \
                        .format('memory')\
                        .queryName('display_in_noebook')\
                        .start()

Below are the things used:
* Spark's `writeStream` API.
* outputMode - 'complete': In this mode the entire updated result table is written to the sink every time it changes. Because all aggregates have to be kept, the watermark does not drop any state in this mode.
* format(): The sink format; the `memory` format has been chosen here, which keeps the result table in memory.
* queryName(): The name of the in-memory table, which can then be queried with Spark SQL.
* start(): To start writing the data.

The query below needs to be run multiple times, as it takes time for the data to be displayed.

In [41]:
spark.sql('SELECT * from display_in_noebook ORDER BY window, sensor_id').show(500, False)

+---------+------------------------------------------+------------+
|Sensor_ID|window                                    |CountofHours|
+---------+------------------------------------------+------------+
|1        |[2020-12-02 00:00:00, 2020-12-03 00:00:00]|4           |
|2        |[2020-12-02 00:00:00, 2020-12-03 00:00:00]|6           |
|4        |[2020-12-02 00:00:00, 2020-12-03 00:00:00]|4           |
|5        |[2020-12-02 00:00:00, 2020-12-03 00:00:00]|2           |
|6        |[2020-12-02 00:00:00, 2020-12-03 00:00:00]|3           |
|41       |[2020-12-02 00:00:00, 2020-12-03 00:00:00]|1           |
|58       |[2020-12-02 00:00:00, 2020-12-03 00:00:00]|2           |
|59       |[2020-12-02 00:00:00, 2020-12-03 00:00:00]|3           |
|63       |[2020-12-02 00:00:00, 2020-12-03 00:00:00]|4           |
|65       |[2020-12-02 00:00:00, 2020-12-03 00:00:00]|2           |
|67       |[2020-12-02 00:00:00, 2020-12-03 00:00:00]|3           |
|69       |[2020-12-02 00:00:00, 2020-12-03 00:0

In [25]:
# Stopping the query after the required number
# of records get displayed in the output
# df_hours_aggregated.stop()

The alternative is the **foreachBatch()** approach: the function below would be passed to **foreachBatch()**, which calls it for every micro-batch.

In [26]:
# This is the code for the function that can be used in
# the foreachBatch() function to display the results.
# def display_in_notebook(batch, epoch_id):
    
#     '''
#     This function takes care of the display of batch records.
#     Params:
#     1. batch - The batch data.
#     2. epoch_id - The ID of the micro-batch.
#     '''
    
#     batch.show(500, truncate = False)

## Part (b) Publishing the data to kafka topic <a class="anchor" name="8b"></a>

In this section we:
* Join the dataframe created in the previous part with the sensor locations dataframe, using the **join()** function.
* Select the columns window.start, Sensor_ID, latitude and longitude.
* Use window.start as the key and concatenate the other columns, separated by commas, as the value.
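The join and the key/value construction can be illustrated in plain Python. This is a sketch with hypothetical sensor coordinates: `sensors` plays the role of the static `sensor_spdf`, the membership check gives inner-join behavior, `build_kafka_records` is a hypothetical helper, and the exact text Spark produces when casting a float to a string may differ from Python's formatting.

```python
from datetime import datetime

def build_kafka_records(hour_counts, sensors):
    """Inner-join per-window counts with sensor locations and build key/value strings.

    hour_counts: list of (window_start, sensor_id, count_of_hours)
    sensors: dict mapping sensor_id -> (latitude, longitude)
    """
    records = []
    for window_start, sensor_id, _count in hour_counts:
        if sensor_id not in sensors:      # inner join drops unmatched sensors
            continue
        lat, lon = sensors[sensor_id]
        key = window_start.strftime("%Y-%m-%d %H:%M:%S")  # window.start cast to string
        value = f"{lat},{lon},{sensor_id}"                 # concat with ',' separators
        records.append((key, value))
    return records
```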

In [27]:
final_joined_df = df_hours_count.join(sensor_spdf,df_hours_count.Sensor_ID == sensor_spdf.sensor_id, how='inner')\
                                .select(df_hours_count.window.start.cast('string').alias('key')\
                                        ,F.concat(sensor_spdf.latitude.cast('string')\
                                        ,F.lit(','),sensor_spdf.longitude.cast('string')\
                                        ,F.lit(','),sensor_spdf.sensor_id).alias('value'))

Next we write the data to a Kafka topic using the following:
* Removal of the checkpoint: In theory there is no need to remove the checkpoint files, but keeping them was causing problems when publishing the data to the Kafka topic and consuming it in the next task, i.e. Task 3. Therefore, the code to remove the checkpoint directory has been added.
    * rm: To remove the files.
    * -rf: -r removes the directory and its contents recursively, and -f forces the removal without prompting or reporting an error if the path does not exist.
* Spark's `writeStream` API.
* The `format()` function with the **'kafka'** parameter, to write the data to Kafka.
* outputMode - 'append': In this mode only new rows are written. For a windowed aggregation, a window's row is written only once, after the watermark has passed the end of that window.
* option("kafka.bootstrap.servers"): To set the Kafka server to connect to.
* option(checkpointLocation): The directory where Spark stores the query's progress, so the query can recover from where it left off.
* option(topic): To set the Kafka topic that the data is written to.
* start(): To start writing the data.
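A simplified model of when append mode publishes a window: the watermark is the latest event time seen minus the 1-day delay, and a window's row is emitted once the watermark reaches its end. Spark actually advances the watermark between micro-batches, and `finalized_windows` is a hypothetical helper, so treat this as an illustration of the rule rather than Spark's exact boundary handling.

```python
from datetime import datetime, timedelta

def finalized_windows(event_times, window_ends, delay=timedelta(days=1)):
    """Return the window ends that append mode may emit (simplified).

    The watermark is the maximum event time seen so far minus the delay;
    a window's result is emitted once the watermark reaches the window's end.
    """
    watermark = max(event_times) - delay
    return [end for end in window_ends if end <= watermark]
```

Because `next_date` values are midnights, under this rule the window for a given day is only published once data with a `next_date` at least two days later has been seen, which can be one reason results take a while to appear on the 'final_joined_df' topic.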

In [28]:
!rm -rf final_joined_df/checkpoint

final_joined_stream = final_joined_df.writeStream \
                                     .format("kafka") \
                                     .outputMode("append")\
                                     .option("kafka.bootstrap.servers", "localhost:9092") \
                                     .option("checkpointLocation", "final_joined_df/checkpoint")\
                                     .option("topic", "final_joined_df") \
                                     .start()


In [29]:
# Stop the stream after the visualization has been done for Task3
# final_joined_stream.stop()