# Data streaming end-to-end med Azure Event Hub (Apache Kafka), Databricks, Delta Lake og Python
Denne test er et eksempel på, hvordan man kan:
1. Læse data fra Azure Event Hub
2. Tjekke indholdet af data ved at skrive til memory
3. Skrive data til Azure Data Lake Gen2 med Open Source-storageformatet Delta Lake
4. Manipulere data
5. Sende data til en ny Azure Event Hub til applikationsbrug

**Det interessante** er, at limen mellem indlæsningen af data (source) og push af data (sink) er den samme for alle scenarier. Samme teknologi (Apache Spark) bruges til alle scenarier (batch, streaming, data lake, Event Hub, Kafka m.fl.).
I dette eksempel vises kun, hvordan **Delta Lake** let kan bruges som storage format. Der er en lang række fordele, som ikke bliver dækket i dette eksempel.

![Structured Streaming](https://raw.githubusercontent.com/boje/intellishore/master/intellishore-vd.gif)

#### Læs data fra Azure Event Hub som en Spark DataFrame.

In [3]:
import os

# The Event Hub access key is read from the environment so it is never stored in the notebook.
event_hub_key = os.getenv("EVENT_HUB_KEY")
if not event_hub_key:
    # Without this check str.format would embed the text "None" as the key,
    # and the problem would only show up later when the stream tries to connect.
    raise ValueError("Environment variable EVENT_HUB_KEY is not set")

conf_iot = {}
# NOTE(review): newer azure-eventhubs-spark connector versions expect this value to be
# encrypted with EventHubsUtils.encrypt(...) -- confirm against the installed connector.
conf_iot["eventhubs.connectionString"] = "Endpoint=sb://vd-count.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={};EntityPath=vd".format(event_hub_key)

# Streaming DataFrame reading from the "vd" Event Hub
read_df = (
  spark
    .readStream
    .format("eventhubs")
    .options(**conf_iot)
    .load()
)

# Show the DataFrame schema
read_df.printSchema()

#### Vis antallet af læsninger fra Azure Event Hub
Aflæsning bliver ved med at opdatere og kører i baggrunden.

In [5]:
# Check how many requests arrive per second
from pyspark.sql.functions import col, date_format

agg = (
  read_df
    # date_format yields exactly "HH:mm:ss" (e.g. 20:21:41) and does not depend on
    # the string layout of the timestamp. The date part is dropped, so the same
    # time of day on different days lands in the same bucket.
    .select(date_format(col("enqueuedTime"), "HH:mm:ss").alias("Time"))
    .groupBy("Time")
    .count()
    # Keep only the five latest one-second buckets
    .orderBy("Time", ascending=False)
    .limit(5)
)
display(agg)

Time,count
20:21:41,1
20:21:40,2
20:21:39,1
20:21:38,2
20:21:37,1


Lad os se på indholdet af beskederne sendt til Azure Event Hub. Bemærk, at ``body`` bliver sendt som binære data og gøres læsbart ved at konvertere datatypen til ``String``.

In [7]:
# Write the raw stream to the in-memory table "read_hub" so it can be queried with SQL
from pyspark.sql.types import *
from pyspark.sql.functions import *

hub_memory_sink = (
    read_df.writeStream
    .format("memory")
    .queryName("read_hub")
)
query = hub_memory_sink.start()

In [8]:
%sql
-- View over the stream contents with the binary body decoded as a string
CREATE OR REPLACE TEMPORARY VIEW event_msg AS
SELECT
  CAST(body AS STRING) AS body_to_string,
  *
FROM read_hub
ORDER BY enqueuedTime DESC;

-- The ordering is repeated here on purpose: SQL does not guarantee that a view's
-- ORDER BY carries over to a query selecting from it, and LIMIT needs it to return the newest rows.
SELECT * FROM event_msg ORDER BY enqueuedTime DESC LIMIT 2

body_to_string,body,partition,offset,sequenceNumber,enqueuedTime,publisher,partitionKey,properties,systemProperties
"{""type"": ""Feature"", ""id"": ""OPEN_DATA_NOEGLETAL_VIEW.fid--7458e2e2_1714a9e4909_-6ab2"", ""geometry"": {""type"": ""Point"", ""coordinates"": [718114.00107018, 6207932.99975957]}, ""geometry_name"": ""KOOR_SDO"", ""properties"": {""DATAEJER"": ""0"", ""VEJBESTYRER"": 0, ""VEJNR"": 2083325, ""VEJDEL"": ""0"", ""VEJNAVN"": ""Humleb\u00e6kvej"", ""KILOMETER"": 5, ""METER"": 910, ""LOKATION"": ""vf H\u00f8rsholmvej"", ""KOMMUNE"": 210, ""KOERETOEJSART"": ""MOTORKTJ"", ""AAR"": 2017, ""AADT"": 9386, ""HDT"": 10898, ""JDT"": 7670, ""TRAFIKTYPE"": ""BO-ARB"", ""GNS_HASTIGHED"": 51.4, ""TALTE_DAGE"": 7, ""AE10HOEJ"": 453, ""AE10LAV"": null, ""LBIL_AADT"": 433, ""HAST_GRAENSE"": 80, ""FRAKTIL_PCT_15"": 40.4, ""FRAKTIL_PCT_85"": 61.6, ""PCT_OVER_HASTGR"": 1.1, ""PCT_OVER_HASTGR_P10"": 0.4, ""PCT_OVER_HASTGR_P20"": 0.1, ""LBIL_PCT"": 4.6, ""BILER_LIVE"": 45}, ""event_push_ts"": ""2020-04-14 22:22:17.725713""}",eyJ0eXBlIjogIkZlYXR1cmUiLCAiaWQiOiAiT1BFTl9EQVRBX05PRUdMRVRBTF9WSUVXLmZpZC0tNzQ1OGUyZTJfMTcxNGE5ZTQ5MDlfLTZhYjIiLCAiZ2VvbWV0cnkiOiB7InR5cGUiOiAiUG9pbnQiLCAiY29vcmRpbmF0ZXM= (truncated),1,4302105104,50484,2020-04-14T20:22:17.754+0000,,,Map(),Map()
"{""type"": ""Feature"", ""id"": ""OPEN_DATA_NOEGLETAL_VIEW.fid--7458e2e2_1714a9e4909_-6ab3"", ""geometry"": {""type"": ""Point"", ""coordinates"": [588511.00000273, 6139022.99974554]}, ""geometry_name"": ""KOOR_SDO"", ""properties"": {""DATAEJER"": ""461"", ""VEJBESTYRER"": 461, ""VEJNR"": 4610523, ""VEJDEL"": ""0"", ""VEJNAVN"": ""Bernstoffsvej"", ""KILOMETER"": 0, ""METER"": 71, ""LOKATION"": ""ved nr. 10"", ""KOMMUNE"": 461, ""KOERETOEJSART"": ""MOTORKTJ"", ""AAR"": 2015, ""AADT"": 395, ""HDT"": 427, ""JDT"": 332, ""TRAFIKTYPE"": ""BO-ARB"", ""GNS_HASTIGHED"": 30.9, ""TALTE_DAGE"": 7.7, ""AE10HOEJ"": 4, ""AE10LAV"": 2, ""LBIL_AADT"": 12, ""HAST_GRAENSE"": 50, ""FRAKTIL_PCT_15"": 21.8, ""FRAKTIL_PCT_85"": 39.3, ""PCT_OVER_HASTGR"": 0.9, ""PCT_OVER_HASTGR_P10"": 0, ""PCT_OVER_HASTGR_P20"": 0, ""LBIL_PCT"": 2.9, ""BILER_LIVE"": 6}, ""event_push_ts"": ""2020-04-14 22:22:16.686849""}",eyJ0eXBlIjogIkZlYXR1cmUiLCAiaWQiOiAiT1BFTl9EQVRBX05PRUdMRVRBTF9WSUVXLmZpZC0tNzQ1OGUyZTJfMTcxNGE5ZTQ5MDlfLTZhYjMiLCAiZ2VvbWV0cnkiOiB7InR5cGUiOiAiUG9pbnQiLCAiY29vcmRpbmF0ZXM= (truncated),0,4302281680,50310,2020-04-14T20:22:16.714+0000,,,Map(),Map()


Lad os folde ``body``-JSON-beskeden ud

In [10]:
# Extract selected fields from the JSON message carried in `body`
from pyspark.sql.functions import get_json_object

read_df_body = read_df.selectExpr("CAST(body AS STRING) AS body_to_string")


def _json_field(path, name, cast_to=None):
    """Return a column reading JSON `path` from `body_to_string`, named `name`.

    When `cast_to` is given the value is cast first and aliased afterwards, so the
    output column name is set explicitly on the final expression.
    """
    field = get_json_object(read_df_body.body_to_string, path)
    if cast_to is not None:
        field = field.cast(cast_to)
    return field.alias(name)


json_event_msg = read_df_body.select(
    _json_field("$.id", "id"),
    _json_field("$.properties.VEJNAVN", "vejnavn"),
    _json_field("$.properties.AADT", "aadt"),
    _json_field("$.properties.HDT", "hdt"),
    _json_field("$.properties.JDT", "jdt"),
    _json_field("$.properties.GNS_HASTIGHED", "gns_hastighed"),
    _json_field("$.properties.HAST_GRAENSE", "hastigheds_graense"),
    _json_field("$.properties.BILER_LIVE", "biler_live", "int"),
    _json_field("$.properties.LOKATION", "lokation"),
    # double rather than float: a northing such as 6207932.99 has only ~0.5 m
    # resolution in float32, which would be lost before the UTM conversion.
    _json_field("$.geometry.coordinates[0]", "UTM_koordinat_1", "double"),
    _json_field("$.geometry.coordinates[1]", "UTM_koordinat_2", "double"),
    _json_field("$.event_push_ts", "event_push_ts"),
)

### Konverter UTM-32-U til længde- og breddegrader

Vejdirektoratets Mastra-system sender data i UTM. De fleste systemer benytter sig af længde- og breddegrader.
Derfor benyttes Python-pakken _utm_ til at konvertere fra UTM-32-U til længde- og breddegrader.

**Bemærk:** alle Python-pakker kan bruges i Spark, men det kræver, at kaldet pakkes ind i en UDF (User Defined Function). Se UDF-definitionerne i cellen nedenfor.

In [12]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
import utm

json_event_msg_ext = json_event_msg


def _utm32u_to_latlon_component(easting, northing, index):
    """Convert a UTM zone 32U point and return one component as a float.

    index 0 selects latitude and 1 selects longitude (the order used by the
    tuple from ``utm.to_latlon``). Returns None when either coordinate is null in
    the stream, e.g. a message without geometry, rather than passing None into
    ``utm.to_latlon`` and failing the whole streaming query.
    """
    if easting is None or northing is None:
        return None
    return float(utm.to_latlon(easting, northing, 32, 'U')[index])


# UDFs converting UTM coordinates to latitude (x) and longitude (y)
utm_udf_x = F.udf(lambda x, y: _utm32u_to_latlon_component(x, y, 0), FloatType())
utm_udf_y = F.udf(lambda x, y: _utm32u_to_latlon_component(x, y, 1), FloatType())

json_event_msg_ext = json_event_msg_ext.withColumn('lat', utm_udf_x(F.col('UTM_koordinat_1'), F.col('UTM_koordinat_2')))
json_event_msg_ext = json_event_msg_ext.withColumn('lon', utm_udf_y(F.col('UTM_koordinat_1'), F.col('UTM_koordinat_2')))

#### Plot resultaterne på kort

In [14]:
# Keep the enriched stream in the in-memory table "data_map" for the map cell
map_memory_sink = (
    json_event_msg_ext.writeStream
    .format("memory")
    .queryName("data_map")
)
data = map_memory_sink.start()

In [15]:
import folium
import utm

# Distinct points collected so far by the "data_map" memory sink. A separate name
# is used so the StreamingQuery handle `data` from the previous cell stays reachable.
# Rows without coordinates are skipped because folium cannot place them.
map_points = spark.sql(
    "SELECT DISTINCT lat, lon, lokation FROM data_map "
    "WHERE lat IS NOT NULL AND lon IS NOT NULL"
)

# Empty map, roughly centred on Denmark
m = folium.Map(location=[56.3, 10.45], zoom_start=7)

# One marker per point, with the location description as popup
for row in map_points.collect():
    folium.Marker([row.lat, row.lon], popup=row.lokation).add_to(m)

html_string = m._repr_html_()
displayHTML(html_string)

## Gem alt data til data lake (Azure Data Lake Gen2)
Som best practice skrives alt indhold fra Azure Event Hub til Azure Data Lake Gen2.
Til dette bruges Open Source Storage Layer **Delta Lake**. Dette gøres bl.a. for at benytte fordele som:

- **ACID transactions**: Serializable isolation sikrer, at læserne aldrig læser inkonsistente data
- **Support af batch og streaming**
- **Time travel:** Data versioning, fuld historik og audit trails
- **Schemavalidering:** Kvalitetssikring af, at dataskemaet er som forventet, samtidig med at mindre skemaændringer kan håndteres.


 <!---
<img src="https://static.wixstatic.com/media/eae6a5_932a3f75c53a4251b953da4224b3792f~mv2.jpg/v1/fill/w_1200,h_496,al_c,q_90/eae6a5_932a3f75c53a4251b953da4224b3792f~mv2.webp" width="800px">
--->

In [17]:
# Persist the raw Event Hub stream as a Delta table in Azure Data Lake Gen2
events_writer = (
    read_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/vd/delta/events/_checkpoints/etl-from-json")
)
events_writer.start("/mnt/vd/delta/events")

Tjek data er blevet skrevet til Azure Data Lake Gen2 (``/mnt/vd/delta/events``)

In [19]:
# List the files of the Delta table (equivalent to `%fs ls /mnt/vd/delta/events`)
display(dbutils.fs.ls("/mnt/vd/delta/events"))

path,name,size
dbfs:/mnt/vd/delta/events/_checkpoints/,_checkpoints/,0
dbfs:/mnt/vd/delta/events/_delta_log/,_delta_log/,0
dbfs:/mnt/vd/delta/events/part-00000-0e09f471-ec88-4624-ac49-b57bbbcc861d-c000.snappy.parquet,part-00000-0e09f471-ec88-4624-ac49-b57bbbcc861d-c000.snappy.parquet,7470
dbfs:/mnt/vd/delta/events/part-00000-b856ace3-dd44-442f-8ff1-a93f041399fd-c000.snappy.parquet,part-00000-b856ace3-dd44-442f-8ff1-a93f041399fd-c000.snappy.parquet,1249
dbfs:/mnt/vd/delta/events/part-00000-d2c379c0-f14e-491f-9148-e1cd96ea7697-c000.snappy.parquet,part-00000-d2c379c0-f14e-491f-9148-e1cd96ea7697-c000.snappy.parquet,8007
dbfs:/mnt/vd/delta/events/part-00001-4c243228-e84f-43c7-8047-8637b2940d30-c000.snappy.parquet,part-00001-4c243228-e84f-43c7-8047-8637b2940d30-c000.snappy.parquet,7475
dbfs:/mnt/vd/delta/events/part-00001-de463c10-e8f9-48f5-99bb-ec0c1efbec0f-c000.snappy.parquet,part-00001-de463c10-e8f9-48f5-99bb-ec0c1efbec0f-c000.snappy.parquet,7529


### Skriv til Azure Event Hub
Ved tredjepartsløsninger kan Azure Event Hub bruges som bro mellem data og løsning.

In [21]:
import os
import pyspark.sql.functions as F
from pyspark.sql import Row

# Fail fast instead of formatting the text "None" into the connection string.
event_hub_key = os.getenv("EVENT_HUB_KEY")
if not event_hub_key:
    raise ValueError("Environment variable EVENT_HUB_KEY is not set")

conf_app = {}
# NOTE(review): newer azure-eventhubs-spark connector versions expect this value to be
# encrypted with EventHubsUtils.encrypt(...) -- confirm against the installed connector.
conf_app["eventhubs.connectionString"] = "Endpoint=sb://vd-count.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={};EntityPath=vd-app".format(event_hub_key)

# Build the outgoing JSON `body` with to_json so values are properly escaped:
# string concatenation produced invalid JSON when e.g. `lokation` contained a
# quote, and a single NULL field turned the whole body into NULL.
# The DB timestamp is cast to string to keep the "yyyy-MM-dd HH:mm:ss..." text form.
app_body = F.to_json(
    F.struct(
        "lokation",
        "lat",
        "lon",
        "biler_live",
        "event_push_ts",
        F.current_timestamp().cast("string").alias("event_push_ts_db"),
    )
).alias("body")

# The eventhubs sink sends to the EntityPath in the connection string, so no
# output path is passed to start(); the checkpoint is still kept on the mount.
(json_event_msg_ext
  .select(app_body)
  .writeStream
  .format("eventhubs")
  .options(**conf_app)
  .option("checkpointLocation", "/mnt/vd/delta/push/_checkpoints/app")
  .start())

Stop streaming-jobs og slet data

In [23]:
# Stop every streaming query that is still active in this Spark session
running_queries = spark.streams.active
for running_query in running_queries:
    running_query.stop()

In [24]:
# Recursively delete the data and checkpoints written by this demo
for cleanup_path in ("/mnt/vd/delta/events/", "/mnt/vd/delta/push/"):
    dbutils.fs.rm(cleanup_path, True)
# NOTE(review): nothing else in this notebook writes to /delta/ -- confirm that
# recursively deleting it is intended before running this cell.
dbutils.fs.rm("/delta/", True)