In [0]:
from pyspark.sql.types import StringType, StructType, StructField

# Path to our 20 JSON files
inputPath = "/FileStore/tables/ids721proj3/"

## Loading and Inspecting the Data
The goal of this project is to implement Structured Streaming with PySpark. Since the data is streamed from a source to a sink, the first step is to define the schema, i.e. the structure of each record.
The simulated dataset records status events (power on, power off, low battery) for different IoT devices.

In [0]:
# Define the schema explicitly: four nullable string columns
schema = StructType([
    StructField("time", StringType(), True),
    StructField("customer", StringType(), True),
    StructField("action", StringType(), True),
    StructField("device", StringType(), True),
])
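
To make the role of the schema concrete, here is a plain-Python sketch (not Spark code) of roughly what reading JSON with a fixed schema does: each record is projected onto the four declared fields, a missing field becomes `None` (the columns are declared nullable), and undeclared keys are ignored. The helper `apply_schema`, the constant `SCHEMA_FIELDS`, and the second sample record are hypothetical; the first record is taken from the data displayed further down. Because `time` is declared as `StringType`, values such as `10:59:46.000 AM` stay plain strings.

```python
import json

# Field names in the same order as the StructType; all are nullable strings.
SCHEMA_FIELDS = ["time", "customer", "action", "device"]


def apply_schema(json_line: str) -> dict:
    """Project one JSON record onto the fixed schema.

    Declared fields that are missing become None (nullable=True),
    and keys that are not declared in the schema are dropped.
    """
    record = json.loads(json_line)
    return {field: record.get(field) for field in SCHEMA_FIELDS}


raw_lines = [
    '{"time": "10:59:46.000 AM", "customer": "Stafford Blakebrough", '
    '"action": "power on", "device": "GreenIQ Controller"}',
    # Hypothetical record: "device" is missing and "battery" is not in the schema
    '{"time": "3:41:33.000 AM", "customer": "Example Customer", '
    '"action": "power on", "battery": 80}',
]

rows = [apply_schema(line) for line in raw_lines]
for row in rows:
    print(row)
```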

In [0]:
# The 20 JSON files (the original JSON payloads) uploaded to Databricks are read into one batch DataFrame
# (inputPath and schema are defined in the cells above)
df = spark.read.schema(schema).json(inputPath)
# Drop records with a null in any column (dropna's default, how="any")
df = df.dropna()
display(df)

| time | customer | action | device |
|---|---|---|---|
| 10:59:46.000 AM | Stafford Blakebrough | power on | GreenIQ Controller |
| 6:26:36.000 PM | Alex Woolcocks | power on | Nest T3021US Thermostat |
| 4:46:28.000 AM | Clarice Nayshe | power on | Footbot Air Quality Monitor |
| 8:58:43.000 AM | Killie Pirozzi | power off | Footbot Air Quality Monitor |
| 4:20:49.000 PM | Lynne Dymidowicz | power on | Footbot Air Quality Monitor |
| 3:41:33.000 AM | Shaina Dowyer | power on | ecobee4 |
| 10:40:24.000 PM | Barbee Melato | low battery | August Doorbell Cam |
| 11:13:38.000 PM | Clem Westcot | power off | Nest T3021US Thermostat |
| 10:12:15.000 PM | Kerri Galfour | power off | Amazon Echo |
| 11:04:41.000 AM | Trev Ashmore | low battery | GreenIQ Controller |
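
The `dropna()` call in the loading cell uses PySpark's default `how="any"`, so a row is dropped as soon as any of its columns is null (the `subset` parameter can restrict the check to chosen columns). A minimal plain-Python sketch of that filter, with a hypothetical helper `dropna_any` and a hypothetical incomplete record:

```python
from typing import Dict, List, Optional

Row = Dict[str, Optional[str]]  # one record, keyed by column name


def dropna_any(rows: List[Row]) -> List[Row]:
    """Keep only rows with no missing values, mirroring dropna(how="any")."""
    return [row for row in rows if all(value is not None for value in row.values())]


rows: List[Row] = [
    {"time": "10:59:46.000 AM", "customer": "Stafford Blakebrough",
     "action": "power on", "device": "GreenIQ Controller"},
    # Hypothetical incomplete record: "device" is null
    {"time": "3:41:33.000 AM", "customer": "Example Customer",
     "action": "power on", "device": None},
]

clean = dropna_any(rows)
print(len(clean))  # 1
```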


Next, run a few aggregations to get a brief summary of the dataset: count how many events of each action type occur across the IoT devices.

In [0]:
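# Count the events per action value
groupeddf = df.groupBy(df.action).count()
# Keep the small aggregate in memory, since it is queried again via SQL
groupeddf.cache()

# Register it as a temporary view for the SQL cell below
groupeddf.createOrReplaceTempView("action_counts")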
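
Conceptually, `df.groupBy(df.action).count()` yields one row per distinct `action` together with the number of matching records. The plain-Python sketch below reproduces that with `collections.Counter` on a hypothetical list of action values (not the real data) and sorts the result by `action` in descending order, as the SQL query does.

```python
from collections import Counter

# Hypothetical sample of action values; the real counts come from the 20 JSON files.
actions = ["power on", "power on", "power off", "low battery", "power off", "power on"]

# Plain-Python analogue of groupBy("action").count(): one entry per distinct action.
action_counts = Counter(actions)

# Analogue of ORDER BY action DESC.
for action, count in sorted(action_counts.items(), reverse=True):
    print(action, count)
```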

In [0]:
%sql
SELECT action, sum(count) as TotalCount
FROM action_counts 
GROUP BY action
ORDER BY action desc

| action | TotalCount |
|---|---|
| power on | 6693 |
| power off | 6611 |
| low battery | 6676 |


### Streaming the Data
The focus of this demo is the streaming itself, so I will not build an AWS Lambda function that periodically pulls new JSON payloads from APIs. Instead, I uploaded all 20 JSON files to Databricks at once and use PySpark to read one file per trigger, emulating a stream of newly arriving data.
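
A rough model of how `maxFilesPerTrigger` shapes the stream: the file source splits the pending files into micro-batches of at most that many files, so with a value of 1 each of the 20 files becomes its own trigger. This plain-Python sketch uses hypothetical file names and a hypothetical helper `plan_micro_batches`. It sorts by name for determinism, whereas Spark's file source tracks which files it has already processed and orders new files by modification time.

```python
from typing import List


def plan_micro_batches(files: List[str], max_files_per_trigger: int) -> List[List[str]]:
    """Split a file listing into micro-batches of at most max_files_per_trigger files.

    Simplified model: no tracking of already-processed files or late arrivals.
    """
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be at least 1")
    ordered = sorted(files)  # assumption: name order instead of modification time
    return [ordered[i:i + max_files_per_trigger]
            for i in range(0, len(ordered), max_files_per_trigger)]


# Hypothetical file names standing in for the 20 uploaded JSON files.
files = [f"file-{n:02d}.json" for n in range(1, 21)]
batches = plan_micro_batches(files, max_files_per_trigger=1)
print(len(batches))  # 20
print(batches[0])    # ['file-01.json']
```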

In [0]:
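# Streaming file sources require a schema by default; maxFilesPerTrigger=1 reads one JSON file per micro-batch
streamingdf = (spark.readStream.schema(schema).option("maxFilesPerTrigger", 1).json(inputPath))
# Same per-action count as the batch query, now maintained incrementally
streamingdfagg = (streamingdf.groupBy(streamingdf.action).count())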

In [0]:
streamingdfagg.isStreaming

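The streaming DataFrame is now defined. The next step is to choose a sink for the streamed results. Here the query writes to the in-memory sink under the name `counts` in `complete` output mode, so the full table of per-action counts is rewritten after every trigger and can be inspected in real time while the stream runs.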

In [0]:
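# Small dataset: use 2 shuffle partitions instead of the default 200
spark.conf.set("spark.sql.shuffle.partitions", "2")
# Write the running counts to an in-memory table "counts"; complete mode rewrites the whole table on each trigger
query = (streamingdfagg.writeStream.format("memory").queryName("counts").outputMode("complete").start())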
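
The memory sink stores the query output in an in-memory table named after `queryName` (here `counts`), which can be queried with SQL such as `SELECT * FROM counts` while the stream runs; it is meant for debugging on small data, since the output is kept in driver memory. In `complete` output mode the whole aggregated table is emitted after every trigger, unlike `update` mode, which emits only the rows that changed. The plain-Python sketch below emulates that behavior with running state; the micro-batches and the helper `run_complete_mode` are hypothetical.

```python
from collections import Counter
from typing import Dict, List


def run_complete_mode(batches: List[List[str]]) -> List[Dict[str, int]]:
    """Emulate a streaming groupBy(action).count() written in complete output mode.

    Running counts are kept as state across micro-batches, and after every
    trigger the whole result table is emitted, not only the changed rows.
    """
    state: Counter = Counter()
    snapshots = []
    for batch in batches:
        state.update(batch)            # fold this micro-batch into the running state
        snapshots.append(dict(state))  # complete mode: emit the full table each time
    return snapshots


# Hypothetical micro-batches of action values (one list per trigger / file).
batches = [
    ["power on", "power off"],
    ["power on", "low battery"],
    ["power off"],
]
for trigger, table in enumerate(run_complete_mode(batches)):
    print(trigger, table)
```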