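# Read Data as a Stream from Azure Event Hubs

Streams simulated IoT telemetry from Azure Event Hubs, parses each event's JSON body, appends the parsed readings to a Delta table under `/mnt/stdhamacheradl001/iot/simulator`, and registers that location as the `iot.iot_stream` table.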

In [0]:
# Start from a clean metastore entry; iot.iot_stream is re-created over the Delta
# location later in this notebook.
# NOTE(review): that table is declared with an explicit LOCATION, so dropping it
# presumably leaves the Delta files (and streaming checkpoint) in place -- confirm.
spark.sql("DROP TABLE IF EXISTS iot.iot_stream")

In [0]:
import os
import pyspark.sql.functions as F
from pyspark.sql.types import *

# The Event Hubs connection string comes from the environment so no secret is stored
# in the notebook. Fail fast with a readable message when it is missing: os.getenv()
# would otherwise hand None (Java null) to EventHubsUtils.encrypt below and the
# problem would only surface as a less obvious JVM-side error.
connection_string = os.getenv('AZURE_EH_CON_STR')
if not connection_string:
    raise RuntimeError(
        "Environment variable AZURE_EH_CON_STR is not set; it must hold the "
        "Azure Event Hubs connection string."
    )

# The connector takes its options as a dict. The connection string is encrypted with
# the connector's own EventHubsUtils helper, reached through the py4j JVM gateway
# (newer azure-event-hubs-spark versions require the encrypted form).
conf = {}
conf["eventhubs.connectionString"] = str(
    spark._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_string)
)

# Raw Event Hubs stream; the next cell reads its `body`, `offset` and `enqueuedTime`.
incomingStream = (
  spark
    .readStream
    .format("eventhubs")
    .options(**conf)
    .load()
)

# Debug sink: mirror every raw event into the in-memory table `read_hub`.
# Spark's memory sink keeps the entire output in driver memory, so it grows for as
# long as the stream runs. Keep the handle so it can be stopped on its own
# (read_hub_query.stop()) once the raw view is no longer needed.
# NOTE(review): this is a separate streaming query on the same Event Hub as the Delta
# sink and display() below; if they share a consumer group the connector may
# disconnect one of them -- consider a dedicated consumer group per query.
read_hub_query = (
  incomingStream
    .writeStream
    .outputMode("append")
    .format("memory")
    .queryName("read_hub")
    .start()
)

In [0]:
# JSON layout of each event body sent by the device simulator.
# NOTE(review): `timestamp` is sent without a UTC offset (see the sample output
# below), so from_json presumably interprets it in the Spark session time zone --
# confirm that matches the devices. `device_location` arrives as a JSON-encoded list
# string and is kept as a plain string here.
message_schema = StructType([
  StructField('timestamp', TimestampType()),
  StructField('humidity', FloatType()),
  StructField('temperature', FloatType()),
  StructField('device_id', StringType()),
  StructField('device_ip', StringType()),
  StructField('device_location', StringType())
])

# Event Hubs metadata plus the parsed payload. The event body is decoded to a string
# once (`Body`) and that same column is parsed with `message_schema`.
messages = (
  incomingStream
    .withColumn("Offset", F.col("offset").cast(LongType()))
    .withColumn("Time", F.col("enqueuedTime").cast(TimestampType()))
    # Casting the enqueue timestamp to long yields whole seconds since the epoch.
    .withColumn("Timestamp", F.col("enqueuedTime").cast(LongType()))
    .withColumn("Body", F.col("body").cast(StringType()))
    .select(
        "Offset", "Time", "Timestamp", "Body",
        F.from_json(F.col("Body"), message_schema).alias("payload"),
    )
)

# Persist the flattened payload fields as a Delta table. The path is defined once and
# the checkpoint is derived from it so the two cannot drift apart; the checkpoint sits
# in a `_`-prefixed folder inside the table directory, which Delta ignores.
IOT_DELTA_PATH = '/mnt/stdhamacheradl001/iot/simulator'

iot_delta_query = (
  messages
    .select(F.col('payload.*'))
    .writeStream
    .outputMode("append")
    .format("delta")
    .option("path", IOT_DELTA_PATH)
    .option("checkpointLocation", f"{IOT_DELTA_PATH}/_checkpoint")
    .start()
)

In [0]:
# Live preview of the parsed stream: Event Hubs metadata columns plus the `payload` struct.
# NOTE(review): in Databricks, display() on a streaming DataFrame starts its own
# streaming query, i.e. yet another reader of the same Event Hub -- confirm the
# consumer-group setup tolerates this.
display(messages)

Offset,Time,Timestamp,Body,payload
16338208,2020-11-27T00:09:24.817+0000,1606435764,"{""timestamp"": ""2020-11-26 17:09:22.331337"", ""humidity"": 0.46, ""temperature"": 22.3, ""device_id"": ""e3e70682-c209-4cac-a29f-6fbed82c07cd"", ""device_ip"": ""172.31.140.184"", ""device_location"": ""[\""46.1351\"", \""-60.1831\"", \""Sydney\"", \""CA\"", \""America/Glace_Bay\""]""}","List(2020-11-26T17:09:22.331+0000, 0.46, 22.3, e3e70682-c209-4cac-a29f-6fbed82c07cd, 172.31.140.184, [""46.1351"", ""-60.1831"", ""Sydney"", ""CA"", ""America/Glace_Bay""])"
16338520,2020-11-27T00:09:24.817+0000,1606435764,"{""timestamp"": ""2020-11-26 17:09:22.332352"", ""humidity"": 0.66, ""temperature"": 22.87, ""device_id"": ""7a024204-f7c1-4d87-8da5-e709d4713d60"", ""device_ip"": ""172.22.253.123"", ""device_location"": ""[\""49.88307\"", \""-119.48568\"", \""Kelowna\"", \""CA\"", \""America/Vancouver\""]""}","List(2020-11-26T17:09:22.332+0000, 0.66, 22.87, 7a024204-f7c1-4d87-8da5-e709d4713d60, 172.22.253.123, [""49.88307"", ""-119.48568"", ""Kelowna"", ""CA"", ""America/Vancouver""])"
16338840,2020-11-27T00:09:24.817+0000,1606435764,"{""timestamp"": ""2020-11-26 17:09:22.332352"", ""humidity"": 0.52, ""temperature"": 23.85, ""device_id"": ""cca5a5a1-9e4d-4e3c-9846-d424c17c6279"", ""device_ip"": ""10.75.62.134"", ""device_location"": ""[\""46.23899\"", \""-63.13414\"", \""Charlottetown\"", \""CA\"", \""America/Halifax\""]""}","List(2020-11-26T17:09:22.332+0000, 0.52, 23.85, cca5a5a1-9e4d-4e3c-9846-d424c17c6279, 10.75.62.134, [""46.23899"", ""-63.13414"", ""Charlottetown"", ""CA"", ""America/Halifax""])"
16339160,2020-11-27T00:09:24.817+0000,1606435764,"{""timestamp"": ""2020-11-26 17:09:22.332352"", ""humidity"": 0.45, ""temperature"": 25.3, ""device_id"": ""af19922a-d9b8-4714-a61a-441c12e0c8b2"", ""device_ip"": ""10.181.36.35"", ""device_location"": ""[\""49.88307\"", \""-119.48568\"", \""Kelowna\"", \""CA\"", \""America/Vancouver\""]""}","List(2020-11-26T17:09:22.332+0000, 0.45, 25.3, af19922a-d9b8-4714-a61a-441c12e0c8b2, 10.181.36.35, [""49.88307"", ""-119.48568"", ""Kelowna"", ""CA"", ""America/Vancouver""])"
16339480,2020-11-27T00:09:24.817+0000,1606435764,"{""timestamp"": ""2020-11-26 17:09:22.333344"", ""humidity"": 0.69, ""temperature"": 19.68, ""device_id"": ""f77383c1-3458-4748-a9bb-17bca3f2c9bf"", ""device_ip"": ""172.24.85.243"", ""device_location"": ""[\""53.51684\"", \""-113.3187\"", \""Sherwood Park\"", \""CA\"", \""America/Edmonton\""]""}","List(2020-11-26T17:09:22.333+0000, 0.69, 19.68, f77383c1-3458-4748-a9bb-17bca3f2c9bf, 172.24.85.243, [""53.51684"", ""-113.3187"", ""Sherwood Park"", ""CA"", ""America/Edmonton""])"
16339800,2020-11-27T00:09:24.817+0000,1606435764,"{""timestamp"": ""2020-11-26 17:09:22.333344"", ""humidity"": 0.54, ""temperature"": 20.32, ""device_id"": ""17e0aa3c-0398-4ca8-aa7e-9d498c778ea6"", ""device_ip"": ""192.168.0.149"", ""device_location"": ""[\""50.26729\"", \""-119.27337\"", \""Vernon\"", \""CA\"", \""America/Vancouver\""]""}","List(2020-11-26T17:09:22.333+0000, 0.54, 20.32, 17e0aa3c-0398-4ca8-aa7e-9d498c778ea6, 192.168.0.149, [""50.26729"", ""-119.27337"", ""Vernon"", ""CA"", ""America/Vancouver""])"
16340120,2020-11-27T00:09:24.817+0000,1606435764,"{""timestamp"": ""2020-11-26 17:09:22.333344"", ""humidity"": 0.66, ""temperature"": 22.65, ""device_id"": ""534097ca-baf3-497a-be70-f16a55485822"", ""device_ip"": ""192.168.113.131"", ""device_location"": ""[\""53.51684\"", \""-113.3187\"", \""Sherwood Park\"", \""CA\"", \""America/Edmonton\""]""}","List(2020-11-26T17:09:22.333+0000, 0.66, 22.65, 534097ca-baf3-497a-be70-f16a55485822, 192.168.113.131, [""53.51684"", ""-113.3187"", ""Sherwood Park"", ""CA"", ""America/Edmonton""])"
16340448,2020-11-27T00:09:24.817+0000,1606435764,"{""timestamp"": ""2020-11-26 17:09:22.334340"", ""humidity"": 0.5, ""temperature"": 20.13, ""device_id"": ""72ae2244-8b01-43c1-8d9d-2b7d247a8333"", ""device_ip"": ""10.250.131.204"", ""device_location"": ""[\""49.88307\"", \""-119.48568\"", \""Kelowna\"", \""CA\"", \""America/Vancouver\""]""}","List(2020-11-26T17:09:22.334+0000, 0.5, 20.13, 72ae2244-8b01-43c1-8d9d-2b7d247a8333, 10.250.131.204, [""49.88307"", ""-119.48568"", ""Kelowna"", ""CA"", ""America/Vancouver""])"
16340768,2020-11-27T00:09:24.817+0000,1606435764,"{""timestamp"": ""2020-11-26 17:09:22.334340"", ""humidity"": 0.57, ""temperature"": 24.22, ""device_id"": ""8c25166a-1ff3-4849-b4e1-357d4a84eb03"", ""device_ip"": ""10.104.10.18"", ""device_location"": ""[\""43.86682\"", \""-79.2663\"", \""Markham\"", \""CA\"", \""America/Toronto\""]""}","List(2020-11-26T17:09:22.334+0000, 0.57, 24.22, 8c25166a-1ff3-4849-b4e1-357d4a84eb03, 10.104.10.18, [""43.86682"", ""-79.2663"", ""Markham"", ""CA"", ""America/Toronto""])"
16341080,2020-11-27T00:09:24.817+0000,1606435764,"{""timestamp"": ""2020-11-26 17:09:22.334340"", ""humidity"": 0.44, ""temperature"": 23.93, ""device_id"": ""5129fb7c-6288-41a5-8c45-782198a6416d"", ""device_ip"": ""172.22.15.126"", ""device_location"": ""[\""45.43341\"", \""-73.86586\"", \""Beaconsfield\"", \""CA\"", \""America/Toronto\""]""}","List(2020-11-26T17:09:22.334+0000, 0.44, 23.93, 5129fb7c-6288-41a5-8c45-782198a6416d, 172.22.15.126, [""45.43341"", ""-73.86586"", ""Beaconsfield"", ""CA"", ""America/Toronto""])"


In [0]:
# Register the Delta files written by the streaming query as the metastore table
# iot.iot_stream so they can be queried with SQL.
# NOTE(review): on a fresh Run All this may execute before the stream has committed
# its first batch; if no Delta log exists at the location yet, the statement may fail.
iot_table_location = '/mnt/stdhamacheradl001/iot/simulator'
create_table_stmt = f"""
   CREATE TABLE IF NOT EXISTS iot.iot_stream
   USING DELTA
   LOCATION '{iot_table_location}'
  """
spark.sql(create_table_stmt)

In [0]:
# Teardown: stop every streaming query still running on this Spark session,
# including the ones started earlier in this notebook.
running_queries = list(spark.streams.active)
for running_query in running_queries:
    running_query.stop()