# Assignment 2b
## Student name: NAN XIAO, Student ID: 30280443

# Task 2 Streaming application using Spark Structured Streaming

<a class="anchor" id="lab-task-1"></a>
<div style="background:rgba(0,109,174,0.2);padding:10px;border-radius:4px"><strong style="color:#FF5555">DESCRIPTION</strong> 
<br>In this task, I will implement Spark Structured Streaming to consume the data from Kafka server and do the prediction.
</div>

## 2.1  Create SparkSession

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, udf
from pyspark.sql.functions import split
from pyspark.sql.functions import col, decode, expr
from pyspark.sql import functions as F
from pyspark.sql.types import *
import json
import datetime as dt

# local[*]: run Spark in local mode with as many working processors as logical cores on your machine
master = "local[*]"
# Config app name which will be shown on the Spark cluster UI page
app_name = "Melbourne pedestrian traffic"

# Setup configuration parameters for Spark
spark_conf = SparkConf().setMaster(master).setAppName(app_name)

# Import SparkContext and SparkSession classes
from pyspark import SparkContext # Spark
from pyspark.sql import SparkSession, Row # Spark SQL

# Configure timezone
# CODE REFERENCE -- https://stackoverflow.com/questions/49644232/apache-spark-how-to-set-timezone-to-utc-currently-defaulted-to-zulu
spark = SparkSession.builder.config(conf=spark_conf).config('spark.sql.session.timeZone', 'UTC').getOrCreate()

sc = spark.sparkContext
sc.setLogLevel('ERROR')

## 2.2 Load the sensor data

In [2]:
# Define the schema
schema_s = StructType([
   StructField("sensor_id", IntegerType(), True),
   StructField("sensor_description", StringType(), True),
   StructField("sensor_name", StringType(), True),
   StructField("installation_date",  DateType(), True),
   StructField("status", StringType(), True),
   StructField("note", StringType(), True),
   StructField("direction_1", StringType(), True),
   StructField("direction_2", StringType(), True),
   StructField("latitude", FloatType(), True),
   StructField("longitude", FloatType(), True),
   StructField("location", StringType(), True)])

In [3]:
# Load data
df_sensor = spark.read.csv("Pedestrian_Counting_System_-_Sensor_Locations.csv",\
                           header=True, sep=",", schema=schema_s, dateFormat = "yyyy/MM/dd")

In [4]:
df_sensor.printSchema()

root
 |-- sensor_id: integer (nullable = true)
 |-- sensor_description: string (nullable = true)
 |-- sensor_name: string (nullable = true)
 |-- installation_date: date (nullable = true)
 |-- status: string (nullable = true)
 |-- note: string (nullable = true)
 |-- direction_1: string (nullable = true)
 |-- direction_2: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- location: string (nullable = true)



## 2.3 Ingest the streaming data

In [5]:
# Read Stream from the Kafka Topic

# Define the topic that will retrive from kafka server
topic = "sensor_data"
counts_df_read = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
  .option("subscribe", topic) \
  .load()

## 2.4 Persist the raw streaming data in parquet format

In [6]:
# Write the streaming data to parquet file
write_parquet = counts_df_read.writeStream\
    .format("parquet")\
    .outputMode("append")\
    .option("path", "counts.parquet")\
    .option("checkpointLocation","parquet/counts_df/checkpoint")\
    .start()
# Need to run this command for a few seconds then stop, otherwise the parquet will be empty

In [7]:
write_parquet.stop()

In [8]:
# Test if the writing is successful
read_parquet_df = spark.read.parquet("counts.parquet")
read_parquet_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



## 2.5 Transform data formats

<a class="anchor" id="lab-task-1"></a>
<div style="background:rgba(0,109,174,0.2);padding:10px;border-radius:4px"><strong style="color:#FF5555">DESCRIPTION</strong> 
<br>In this task, First is to get the column we need from streaming data, which are value and timestamp.
<br>Value column consists the data and the timestamp is the event time that kafka received the data from producer. The timestamp will be used define the time-based window.
<br>After get the column I need, I defined the schema to match the data type of the streaming, so that I can use the from_json function to parse the value.
<br>Then is to transfer the data type according to the meta data file.
</div>

In [9]:
# Get the value column and timestamp column and transfer them into string format
counts_df_read = counts_df_read.selectExpr("CAST(value AS STRING)","CAST(timestamp AS STRING)")

In [10]:
counts_df_read.printSchema()

root
 |-- value: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [11]:
#Define the schema for the structured datastream received
schema = StructType([
   StructField("ID", StringType(), True),
   StructField("Date_Time", StringType(), True),
   StructField('Year', StringType(), True),
   StructField("Month", StringType(), True),
   StructField("Mdate", StringType(), True),
   StructField("Day", StringType(), True),
   StructField("Time", StringType(), True),
   StructField("Sensor_ID", StringType(), True),
   StructField("Sensor_Name", StringType(), True),
   StructField("Hourly_Counts", StringType(), True)])

listSchema = ArrayType(schema)

In [12]:
counts_df = counts_df_read\
       .select(F.from_json(F.col("value").cast("string"),listSchema).alias('parsed_value')\
               ,F.col("timestamp").cast("string"))\
       .select(explode(F.col("parsed_value")).alias("unnested_values"),"timestamp")\
       .select(F.col("unnested_values.*"), "timestamp")\
       .withColumn("ID",F.col("ID").cast(IntegerType()))\
       .withColumn("Year",F.col("Year").cast(IntegerType()))\
       .withColumn("Mdate", F.col("Mdate").cast(IntegerType()))\
       .withColumn("Time", F.col("Time").cast(IntegerType()))\
       .withColumn("Sensor_ID", F.col("Sensor_ID").cast(IntegerType()))\
       .withColumn("Hourly_Counts", F.col("Hourly_Counts").cast(IntegerType()))\
       .withColumn("Date_Time",F.to_timestamp("Date_Time",'MM/dd/yyyy hh:mm:ss a'))\
       .withColumn("timestamp",F.to_timestamp("timestamp",'yyyy-MM-dd hh:mm:ss.SSS'))

In [13]:
# Print schema to test if the data is transfered successful
counts_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Date_Time: timestamp (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: string (nullable = true)
 |-- Mdate: integer (nullable = true)
 |-- Day: string (nullable = true)
 |-- Time: integer (nullable = true)
 |-- Sensor_ID: integer (nullable = true)
 |-- Sensor_Name: string (nullable = true)
 |-- Hourly_Counts: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [14]:
# Query to test if the data is transfered successful
query = counts_df\
    .writeStream\
    .outputMode("append")\
    .format("memory")\
    .queryName("counts_df")\
    .trigger(processingTime="2 seconds")\
    .start()

In [17]:
counts_df_final = spark.sql("select * from counts_df").toPandas()

In [18]:
counts_df_final.head(10)

Unnamed: 0,ID,Date_Time,Year,Month,Mdate,Day,Time,Sensor_ID,Sensor_Name,Hourly_Counts,timestamp
0,3444255,2020-12-07 01:00:00,2020,December,7,Monday,1,35,Southbank,47,2021-02-10 10:45:32.889
1,3444537,2020-12-07 05:00:00,2020,December,7,Monday,5,71,Westwood Place,11,2021-02-10 10:45:32.889
2,3444155,2020-12-07 00:00:00,2020,December,7,Monday,0,4,Town Hall (West),98,2021-02-10 10:45:32.889
3,3444156,2020-12-07 00:00:00,2020,December,7,Monday,0,17,Collins Place (South),14,2021-02-10 10:45:32.889
4,3444157,2020-12-07 00:00:00,2020,December,7,Monday,0,18,Collins Place (North),8,2021-02-10 10:45:32.889
5,3444158,2020-12-07 00:00:00,2020,December,7,Monday,0,53,Collins St (North),16,2021-02-10 10:45:32.889
6,3444159,2020-12-07 00:00:00,2020,December,7,Monday,0,2,Bourke Street Mall (South),25,2021-02-10 10:45:32.889
7,3444160,2020-12-07 00:00:00,2020,December,7,Monday,0,1,Bourke Street Mall (North),65,2021-02-10 10:45:32.889
8,3444162,2020-12-07 00:00:00,2020,December,7,Monday,0,9,Southern Cross Station,10,2021-02-10 10:45:32.889
9,3444163,2020-12-07 00:00:00,2020,December,7,Monday,0,10,Victoria Point,3,2021-02-10 10:45:32.889


In [19]:
query.stop()

## 2.6 Prepare the columns for model prediction

In [20]:
# By default day of week function, sunday is the first day of the week
# By using this udf, I will change to sunday is last day and monday is the first day of the week
getDayOfWeek = udf(lambda x: getWeek(x), IntegerType())
spark.udf.register("getDayOfWeek", getDayOfWeek)

def getWeek(week):
    if week == 1:
        week = 7
    else:
        week = week - 1
    return week

# Prepare the columns
counts_df_next = counts_df.withColumn("next_date", F.date_add(F.col("date_time").cast(DateType()),1))\
                          .withColumn("next_Mdate",F.dayofmonth(F.col("next_date")))\
                          .withColumn("next_day_week", F.weekofyear(F.col("next_date")))\
                          .withColumn("next_day_of_week", getDayOfWeek(F.dayofweek(F.col("next_date")).cast(IntegerType())))\
                          .withColumnRenamed("Hourly_Counts","prev_count")



In [21]:
# Print schema to test if the column is adding successful
counts_df_next.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Date_Time: timestamp (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: string (nullable = true)
 |-- Mdate: integer (nullable = true)
 |-- Day: string (nullable = true)
 |-- Time: integer (nullable = true)
 |-- Sensor_ID: integer (nullable = true)
 |-- Sensor_Name: string (nullable = true)
 |-- prev_count: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- next_date: date (nullable = true)
 |-- next_Mdate: integer (nullable = true)
 |-- next_day_week: integer (nullable = true)
 |-- next_day_of_week: integer (nullable = true)



In [22]:
# Query to test if the column is adding successful
query = counts_df_next\
    .writeStream\
    .outputMode("append")\
    .format("memory")\
    .queryName("counts_df")\
    .trigger(processingTime="2 seconds")\
    .start()

In [25]:
counts_df_next_query = spark.sql("select * from counts_df").toPandas()

In [26]:
counts_df_next_query.head(10)

Unnamed: 0,ID,Date_Time,Year,Month,Mdate,Day,Time,Sensor_ID,Sensor_Name,prev_count,timestamp,next_date,next_Mdate,next_day_week,next_day_of_week
0,3448759,2020-12-09 23:00:00,2020,December,9,Wednesday,23,35,Southbank,294,2021-02-10 10:45:42.967,2020-12-10,10,50,4
1,3447227,2020-12-09 00:00:00,2020,December,9,Wednesday,0,4,Town Hall (West),84,2021-02-10 10:45:42.967,2020-12-10,10,50,4
2,3447228,2020-12-09 00:00:00,2020,December,9,Wednesday,0,17,Collins Place (South),17,2021-02-10 10:45:42.967,2020-12-10,10,50,4
3,3447229,2020-12-09 00:00:00,2020,December,9,Wednesday,0,18,Collins Place (North),11,2021-02-10 10:45:42.967,2020-12-10,10,50,4
4,3447230,2020-12-09 00:00:00,2020,December,9,Wednesday,0,53,Collins St (North),21,2021-02-10 10:45:42.967,2020-12-10,10,50,4
5,3447231,2020-12-09 00:00:00,2020,December,9,Wednesday,0,2,Bourke Street Mall (South),15,2021-02-10 10:45:42.967,2020-12-10,10,50,4
6,3447232,2020-12-09 00:00:00,2020,December,9,Wednesday,0,1,Bourke Street Mall (North),49,2021-02-10 10:45:42.967,2020-12-10,10,50,4
7,3447234,2020-12-09 00:00:00,2020,December,9,Wednesday,0,9,Southern Cross Station,10,2021-02-10 10:45:42.967,2020-12-10,10,50,4
8,3447235,2020-12-09 00:00:00,2020,December,9,Wednesday,0,10,Victoria Point,15,2021-02-10 10:45:42.967,2020-12-10,10,50,4
9,3447236,2020-12-09 00:00:00,2020,December,9,Wednesday,0,12,New Quay,12,2021-02-10 10:45:42.967,2020-12-10,10,50,4


In [27]:
query.stop()

## 2.7 Predict the next day’s pedestrian count

In [28]:
#Loading the Pipeline Model From the filesystem
from pyspark.ml import PipelineModel
pipelineModel = PipelineModel.load('count_estimation_pipeline_model')
print(pipelineModel.stages[-1]._java_obj.paramMap())

{
	GBTRegressor_f98f56e12d10-labelCol: Hourly_Counts,
	GBTRegressor_f98f56e12d10-maxBins: 100,
	GBTRegressor_f98f56e12d10-maxDepth: 10
}


In [29]:
# Use the provided model to predict between 9am-11:59pm
test_data = counts_df_next.filter((F.col('Time') >= 9) & (F.col('Time') <= 23))
prediction = pipelineModel.transform(test_data)

In [30]:
# Print schema to test if the prediction is successful
query = prediction\
    .writeStream\
    .outputMode("append")\
    .format("memory")\
    .queryName("prediction")\
    .trigger(processingTime="2 seconds")\
    .start()

In [34]:
prediction_show = spark.sql("select * from prediction").toPandas()

In [35]:
prediction_show.head(10)

Unnamed: 0,ID,Date_Time,Year,Month,Mdate,Day,Time,Sensor_ID,Sensor_Name,prev_count,timestamp,next_date,next_Mdate,next_day_week,next_day_of_week,features,prediction
0,3452979,2020-12-12 17:00:00,2020,December,12,Saturday,17,12,New Quay,382,2021-02-10 10:45:58.089,2020-12-13,13,50,7,"[17.0, 12.0, 382.0, 13.0, 50.0, 7.0]",505.330188
1,3452459,2020-12-12 09:00:00,2020,December,12,Saturday,9,4,Town Hall (West),761,2021-02-10 10:45:58.089,2020-12-13,13,50,7,"[9.0, 4.0, 761.0, 13.0, 50.0, 7.0]",634.486625
2,3452460,2020-12-12 09:00:00,2020,December,12,Saturday,9,17,Collins Place (South),118,2021-02-10 10:45:58.089,2020-12-13,13,50,7,"[9.0, 17.0, 118.0, 13.0, 50.0, 7.0]",127.054757
3,3452461,2020-12-12 09:00:00,2020,December,12,Saturday,9,18,Collins Place (North),53,2021-02-10 10:45:58.089,2020-12-13,13,50,7,"[9.0, 18.0, 53.0, 13.0, 50.0, 7.0]",126.313274
4,3452462,2020-12-12 09:00:00,2020,December,12,Saturday,9,53,Collins St (North),364,2021-02-10 10:45:58.089,2020-12-13,13,50,7,"[9.0, 53.0, 364.0, 13.0, 50.0, 7.0]",377.545406
5,3452463,2020-12-12 09:00:00,2020,December,12,Saturday,9,2,Bourke Street Mall (South),524,2021-02-10 10:45:58.089,2020-12-13,13,50,7,"[9.0, 2.0, 524.0, 13.0, 50.0, 7.0]",444.175719
6,3452464,2020-12-12 09:00:00,2020,December,12,Saturday,9,1,Bourke Street Mall (North),673,2021-02-10 10:45:58.089,2020-12-13,13,50,7,"[9.0, 1.0, 673.0, 13.0, 50.0, 7.0]",548.925167
7,3452465,2020-12-12 09:00:00,2020,December,12,Saturday,9,9,Southern Cross Station,48,2021-02-10 10:45:58.089,2020-12-13,13,50,7,"[9.0, 9.0, 48.0, 13.0, 50.0, 7.0]",70.964373
8,3452466,2020-12-12 09:00:00,2020,December,12,Saturday,9,10,Victoria Point,60,2021-02-10 10:45:58.089,2020-12-13,13,50,7,"[9.0, 10.0, 60.0, 13.0, 50.0, 7.0]",67.719894
9,3452467,2020-12-12 09:00:00,2020,December,12,Saturday,9,12,New Quay,195,2021-02-10 10:45:58.089,2020-12-13,13,50,7,"[9.0, 12.0, 195.0, 13.0, 50.0, 7.0]",268.702729


In [36]:
query.stop()

In [37]:
# Write to the parquet
prediciton_write_parquet = counts_df_read.writeStream\
.format("parquet")\
.outputMode("append")\
.option("path", "prediction.parquet")\
.option("checkpointLocation","parquet/prediction/checkpoint")\
.start()

In [38]:
prediciton_write_parquet.stop()

In [39]:
# Check if the write process is successful
prediction_write_parquet = spark.read.parquet("prediction.parquet")
prediction_write_parquet.printSchema()

root
 |-- value: string (nullable = true)
 |-- timestamp: string (nullable = true)



## 2.8 Process the data analytics

## 2.8 a

<a class="anchor" id="lab-task-1"></a>
<div style="background:rgba(0,109,174,0.2);padding:10px;border-radius:4px"><strong style="color:#FF5555">DESCRIPTION</strong> 
<br>In this question, to get the number of hours that the predicted pedestrian counts exceed 2000 on each day in December, first is to filter the prediction, then I use the window function to make it transfer one day data in each time according to the timestamp.
</div>

In [40]:
# Get the number of hours that the predicted pedestrian counts exceed 2000 on each day in December
prediction_processed = prediction\
    .filter(F.col("prediction") >= 2000)\
    .groupBy(F.window(prediction.timestamp, "4.9 seconds")\
             ,F.to_date("Date_Time").alias("Date"),"Sensor_ID")\
    .count()\
    .orderBy("Date","Sensor_ID")

In [41]:
# Use the query to check if the task is finished successful
query = prediction_processed\
    .writeStream\
    .outputMode("complete")\
    .format("memory")\
    .queryName("prediction_processed")\
    .trigger(processingTime="2 seconds")\
    .start()

In [54]:
prediction_processed_show = spark.sql("select * from prediction_processed")

In [55]:
prediction_processed_show.head(10000)

[Row(window=Row(start=datetime.datetime(2021, 2, 10, 21, 47, 35, 300000), end=datetime.datetime(2021, 2, 10, 21, 47, 40, 200000)), Date=datetime.date(2020, 12, 1), Sensor_ID=1, count=4),
 Row(window=Row(start=datetime.datetime(2021, 2, 10, 21, 47, 35, 300000), end=datetime.datetime(2021, 2, 10, 21, 47, 40, 200000)), Date=datetime.date(2020, 12, 1), Sensor_ID=2, count=6),
 Row(window=Row(start=datetime.datetime(2021, 2, 10, 21, 47, 35, 300000), end=datetime.datetime(2021, 2, 10, 21, 47, 40, 200000)), Date=datetime.date(2020, 12, 1), Sensor_ID=4, count=4),
 Row(window=Row(start=datetime.datetime(2021, 2, 10, 21, 47, 35, 300000), end=datetime.datetime(2021, 2, 10, 21, 47, 40, 200000)), Date=datetime.date(2020, 12, 1), Sensor_ID=5, count=2),
 Row(window=Row(start=datetime.datetime(2021, 2, 10, 21, 47, 35, 300000), end=datetime.datetime(2021, 2, 10, 21, 47, 40, 200000)), Date=datetime.date(2020, 12, 1), Sensor_ID=6, count=3),
 Row(window=Row(start=datetime.datetime(2021, 2, 10, 21, 47, 35, 

In [56]:
query.stop()

## 2.8 b

<a class="anchor" id="lab-task-1"></a>
<div style="background:rgba(0,109,174,0.2);padding:10px;border-radius:4px"><strong style="color:#FF5555">DESCRIPTION</strong> 
<br>This question asks if any sensor’s hourly count between 9am-11:59pm on the next day is predicted to exceed 2000, combine the result with sensor longitude and latitude information. First is to filter the prediction, then is to join the streaming data with the static sencor location information dataset.
</div>

In [57]:
# Combine the sensors whcih has more than 2000 hourly count with sensor location information
prediction_join = prediction \
    .filter(F.col("prediction") >= 2000) \
    .join(df_sensor, prediction.Sensor_ID == df_sensor.sensor_id)\
    .select(prediction["*"],df_sensor["latitude"],df_sensor["longitude"])

In [58]:
# Test if the join is successful
query = prediction_join\
    .writeStream\
    .outputMode("append")\
    .format("memory")\
    .queryName("prediction_join")\
    .trigger(processingTime="2 seconds")\
    .start()

In [59]:
prediction_join_show = spark.sql("select * from prediction_join").toPandas()

In [60]:
prediction_join_show.head(10)

Unnamed: 0,ID,Date_Time,Year,Month,Mdate,Day,Time,Sensor_ID,Sensor_Name,prev_count,timestamp,next_date,next_Mdate,next_day_week,next_day_of_week,features,prediction,latitude,longitude
0,3441825,2020-12-05 11:00:00,2020,December,5,Saturday,11,41,Flinders La-Swanston St (West),2292,2021-02-10 10:50:35.303,2020-12-06,6,49,7,"[11.0, 41.0, 2292.0, 6.0, 49.0, 7.0]",2445.197393,-37.816685,144.966904
1,3441851,2020-12-05 12:00:00,2020,December,5,Saturday,12,4,Town Hall (West),2397,2021-02-10 10:50:35.303,2020-12-06,6,49,7,"[12.0, 4.0, 2397.0, 6.0, 49.0, 7.0]",2126.28633,-37.81488,144.966095
2,3441855,2020-12-05 12:00:00,2020,December,5,Saturday,12,2,Bourke Street Mall (South),1900,2021-02-10 10:50:35.303,2020-12-06,6,49,7,"[12.0, 2.0, 1900.0, 6.0, 49.0, 7.0]",2066.017341,-37.813808,144.965164
3,3441889,2020-12-05 12:00:00,2020,December,5,Saturday,12,41,Flinders La-Swanston St (West),2672,2021-02-10 10:50:35.303,2020-12-06,6,49,7,"[12.0, 41.0, 2672.0, 6.0, 49.0, 7.0]",3172.878265,-37.816685,144.966904
4,3441915,2020-12-05 13:00:00,2020,December,5,Saturday,13,4,Town Hall (West),2597,2021-02-10 10:50:35.303,2020-12-06,6,49,7,"[13.0, 4.0, 2597.0, 6.0, 49.0, 7.0]",2808.595969,-37.81488,144.966095
5,3441919,2020-12-05 13:00:00,2020,December,5,Saturday,13,2,Bourke Street Mall (South),2247,2021-02-10 10:50:35.303,2020-12-06,6,49,7,"[13.0, 2.0, 2247.0, 6.0, 49.0, 7.0]",2350.521619,-37.813808,144.965164
6,3441920,2020-12-05 13:00:00,2020,December,5,Saturday,13,1,Bourke Street Mall (North),2728,2021-02-10 10:50:35.303,2020-12-06,6,49,7,"[13.0, 1.0, 2728.0, 6.0, 49.0, 7.0]",2625.709001,-37.813496,144.965149
7,3441937,2020-12-05 13:00:00,2020,December,5,Saturday,13,22,Flinders St-Elizabeth St (East),2063,2021-02-10 10:50:35.303,2020-12-06,6,49,7,"[13.0, 22.0, 2063.0, 6.0, 49.0, 7.0]",2035.271425,-37.817863,144.965073
8,3441953,2020-12-05 13:00:00,2020,December,5,Saturday,13,41,Flinders La-Swanston St (West),2908,2021-02-10 10:50:35.303,2020-12-06,6,49,7,"[13.0, 41.0, 2908.0, 6.0, 49.0, 7.0]",3212.116862,-37.816685,144.966904
9,3441979,2020-12-05 14:00:00,2020,December,5,Saturday,14,4,Town Hall (West),2517,2021-02-10 10:50:35.303,2020-12-06,6,49,7,"[14.0, 4.0, 2517.0, 6.0, 49.0, 7.0]",2878.11054,-37.81488,144.966095


In [61]:
query.stop()

In [62]:
# Select the column information that will be used in task 3.
df_map = prediction_join\
               .withColumn("value", F.to_json(F.struct("Date_Time", "latitude", "longitude","Sensor_ID")))\
               .withColumn("key", F.col("Date_Time"))\
               .select("key","value")


In [63]:
query = df_map\
    .writeStream\
    .outputMode("append")\
    .format("memory")\
    .queryName("df_map")\
    .trigger(processingTime="2 seconds")\
    .start()

In [64]:
df_map_show = spark.sql("select * from df_map").toPandas()

In [65]:
df_map_show.head(10)

Unnamed: 0,key,value
0,2020-12-07 12:00:00,"{""Date_Time"":""2020-12-07T12:00:00.000Z"",""latit..."
1,2020-12-07 12:00:00,"{""Date_Time"":""2020-12-07T12:00:00.000Z"",""latit..."
2,2020-12-07 12:00:00,"{""Date_Time"":""2020-12-07T12:00:00.000Z"",""latit..."
3,2020-12-07 12:00:00,"{""Date_Time"":""2020-12-07T12:00:00.000Z"",""latit..."
4,2020-12-07 12:00:00,"{""Date_Time"":""2020-12-07T12:00:00.000Z"",""latit..."
5,2020-12-07 12:00:00,"{""Date_Time"":""2020-12-07T12:00:00.000Z"",""latit..."
6,2020-12-07 12:00:00,"{""Date_Time"":""2020-12-07T12:00:00.000Z"",""latit..."
7,2020-12-07 13:00:00,"{""Date_Time"":""2020-12-07T13:00:00.000Z"",""latit..."
8,2020-12-07 13:00:00,"{""Date_Time"":""2020-12-07T13:00:00.000Z"",""latit..."
9,2020-12-07 13:00:00,"{""Date_Time"":""2020-12-07T13:00:00.000Z"",""latit..."


In [66]:
query.stop()

In [67]:
ds = df_map\
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
  .writeStream\
  .outputMode("append")\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")\
  .option("topic", "map")\
  .option("checkpointLocation","df_map/checkpoint")\
  .start()

In [68]:
ds.stop()