## Demo - Using Databricks Structured Streaming to process streaming data
### Load data from Azure Event Hubs into Delta Lake

This notebook shows how to consume real-time event data from Azure Event Hubs with Databricks Structured Streaming.

An ADLS Gen2 storage account is mounted to DBFS so that the event data can be stored in the data lake.

In [0]:
#Input parameters
#mount point path
dbutils.widgets.text("mount_point_path", "/mnt/stream-data")
#file_system
dbutils.widgets.text("file_system", "stream-data")
#account_name
dbutils.widgets.text("account_name", "dbcoedl")

In [0]:
#Read Parameters

mount_point_path = dbutils.widgets.get("mount_point_path")
file_system = dbutils.widgets.get("file_system")
account_name = dbutils.widgets.get("account_name")

In [0]:
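#Mounting with an access key is not the recommended approach; the key is read from a secret scope here
#<secret-scope> and <storage-account-key> are placeholders for your own secret scope and key names
account_key = dbutils.secrets.get(scope="<secret-scope>", key="<storage-account-key>")
dbutils.fs.mount(
  source = "wasbs://{}@{}.blob.core.windows.net".format(file_system, account_name),
  mount_point = mount_point_path,
  extra_configs = {"fs.azure.account.key.{}.blob.core.windows.net".format(account_name): account_key})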
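
The mount call depends on two strings derived from the widget values: the `wasbs://` source URI and the Spark config key that carries the account key. The plain-Python sketch below only shows how both strings are assembled; the helper name `build_mount_args` is hypothetical and it does not call `dbutils`.

```python
from typing import Tuple


def build_mount_args(file_system: str, account_name: str) -> Tuple[str, str]:
    """Return the wasbs:// source URI and the account-key config name (hypothetical helper)."""
    source = f"wasbs://{file_system}@{account_name}.blob.core.windows.net"
    # The config key must name the same storage account as the source URI
    config_key = f"fs.azure.account.key.{account_name}.blob.core.windows.net"
    return source, config_key


source, config_key = build_mount_args("stream-data", "dbcoedl")
print(source)
print(config_key)
```

Deriving the config key from `account_name` rather than hardcoding the account keeps the mount consistent when the widget value changes.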

In [0]:
#Check mount points
dbutils.fs.mounts()

#### Preparation (Set up Event Hub and library installation)
Before starting:

- Create an Event Hubs namespace in the Azure portal.
- Create a new event hub in that namespace.
- Create a SAS policy on the event hub and copy its connection string.
- Install the Event Hubs connector on your cluster: go to Cluster -> Libraries -> Install New, select Maven, and install `com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22`.
- Install `org.mongodb.spark:mongo-spark-connector_2.12:3.0.2` the same way.
- Add the following Spark config to the cluster, replacing `<user>` and `<password>` with your MongoDB credentials:
  - `spark.mongodb.output.uri mongodb+srv://<user>:<password>@cluster0.s5tuva0.mongodb.net/events_db?retryWrites=true&w=majority`
  - `spark.mongodb.input.uri mongodb+srv://<user>:<password>@cluster0.s5tuva0.mongodb.net/events_db?retryWrites=true&w=majority`
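
In a MongoDB connection URI, characters such as `@`, `:` and `/` in the username or password must be percent-encoded. A minimal sketch of building such a URI with `urllib.parse.quote_plus`; the helper name and the example host `cluster0.example.mongodb.net` are hypothetical placeholders.

```python
from urllib.parse import quote_plus


def build_mongo_uri(user: str, password: str, host: str, database: str) -> str:
    """Build a mongodb+srv URI with percent-encoded credentials (hypothetical helper)."""
    return (
        f"mongodb+srv://{quote_plus(user)}:{quote_plus(password)}@{host}/{database}"
        "?retryWrites=true&w=majority"
    )


print(build_mongo_uri("admin", "p@ss:word", "cluster0.example.mongodb.net", "events_db"))
```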

Read the stream from Azure Event Hubs as a streaming DataFrame using `spark.readStream`.  
Set your Event Hubs namespace, entity, policy name, and key in the following command.
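
The four values map onto the standard Event Hubs connection string as sketched below. The helper name is hypothetical; the example values are the namespace, policy and entity used in this notebook, with the key left as a placeholder.

```python
def build_eventhubs_connection_string(namespace: str, policy_name: str,
                                      policy_key: str, entity: str) -> str:
    """Assemble an Event Hubs connection string from its parts (hypothetical helper)."""
    return (
        f"Endpoint=sb://{namespace}.servicebus.windows.net/;"
        f"SharedAccessKeyName={policy_name};"
        f"SharedAccessKey={policy_key};"
        f"EntityPath={entity}"
    )


conn_str = build_eventhubs_connection_string(
    "events-feed", "manage_user_access_policy", "<key>", "demo-topic")
print(conn_str)
```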

In [0]:
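# Read Event Hub's stream; the SAS key comes from a secret scope (<secret-scope> and <eventhub-sas-key> are placeholders)
eh_key = dbutils.secrets.get(scope="<secret-scope>", key="<eventhub-sas-key>")
conn_str = f"Endpoint=sb://events-feed.servicebus.windows.net/;SharedAccessKeyName=manage_user_access_policy;SharedAccessKey={eh_key};EntityPath=demo-topic"
conf = {}
# The connector expects the connection string to be encrypted with EventHubsUtils.encrypt
conf["eventhubs.connectionString"] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(conn_str)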
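
To control where the stream starts reading, the azure-eventhubs-spark connector accepts an `eventhubs.startingPosition` option holding a JSON-encoded event position, where an offset of `"-1"` means the start of the stream and `"@latest"` the end. The sketch follows the field names shown in the connector's PySpark guide; verify them against the documentation for the connector version you install. The helper name `starting_position` is hypothetical.

```python
import json


def starting_position(offset: str = "-1") -> str:
    """JSON-encode an Event Hubs starting position given by offset (hypothetical helper)."""
    position = {
        "offset": offset,      # "-1" = start of stream, "@latest" = end of stream
        "seqNo": -1,           # unused when starting from an offset
        "enqueuedTime": None,  # unused when starting from an offset
        "isInclusive": True,
    }
    return json.dumps(position)


# In the notebook this would be set on `conf` before calling readStream ... load():
# conf["eventhubs.startingPosition"] = starting_position("-1")
```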


In [0]:
read_df = (
  spark
    .readStream
    .format("eventhubs")
    .options(**conf)
    .load()
)

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

claims_schema = StructType([
    StructField("id", StringType(), True),
    StructField("customer_name", StringType(), True),
    StructField("phone_number", StringType(), True),
    StructField("country", StringType(), True),
    StructField("claim_amount", IntegerType(), True),
    StructField("type_id", StringType(), True),
    StructField("status", StringType(), True)
])


In [0]:
# Cast the binary event body to a string and parse it as JSON using claims_schema
decoded_df = read_df.select(from_json(col("body").cast("string"), claims_schema).alias("payload"))
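
The Event Hubs source exposes each event's payload as a binary `body` column, which is why it is cast to a string before `from_json`. The plain-Python sketch below approximates what this step does per record: decode the bytes, parse the JSON, and keep only the fields in `claims_schema`. It is an illustration only and does not exactly reproduce Spark's handling of malformed or mistyped input; `CLAIMS_FIELDS` and `decode_body` are hypothetical names.

```python
import json
from typing import Any, Dict, List, Optional, Tuple

# (field name, expected Python type) pairs mirroring claims_schema
CLAIMS_FIELDS: List[Tuple[str, type]] = [
    ("id", str),
    ("customer_name", str),
    ("phone_number", str),
    ("country", str),
    ("claim_amount", int),
    ("type_id", str),
    ("status", str),
]


def decode_body(body: bytes) -> Optional[Dict[str, Any]]:
    """Decode a binary event body and keep only the schema's fields.

    Missing fields or values of the wrong type become None; unparseable
    input returns None for the whole record.
    """
    try:
        raw = json.loads(body.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    if not isinstance(raw, dict):
        return None
    record = {}
    for name, expected in CLAIMS_FIELDS:
        value = raw.get(name)
        # bool is a subclass of int, so reject it explicitly for int fields
        ok = isinstance(value, expected) and not (expected is int and isinstance(value, bool))
        record[name] = value if ok else None
    return record


print(decode_body(b'{"id": "a1", "claim_amount": 7925, "type_id": "PL"}'))
```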

In [0]:
claims_df = decoded_df.withColumn("id", col("payload.id"))\
.withColumn("customer_name", col("payload.customer_name"))\
.withColumn("phone_number", col("payload.phone_number"))\
.withColumn("country", col("payload.country"))\
.withColumn("claim_amount", col("payload.claim_amount"))\
.withColumn("type_id", col("payload.type_id"))\
.withColumn("status", col("payload.status"))\
.drop("payload")

claims_df.printSchema()

In [0]:
display(claims_df, processingTime = "5 seconds")

id,customer_name,phone_number,country,claim_amount,type_id,status
8e9fc93b-39cc-4797-bfd7-8df18fa5d4f3,Yolanda Whitaker,4937251347,USA,7925,PL,Approved
aa619ba1-d39e-4fbb-a7d9-ea4fe6338f29,Erica Romero,001-911-084-6867,COG,3766,CH,Rejected
f95860c7-b7e0-4c62-b4e4-cee075152c07,Richard Ford,(183)191-0771,CAF,2622,PT,Hold
8ffb38f4-4562-46a8-bbd7-d3a39014eac5,Bradley Peterson,144.817.6362x279,IDN,9331,PT,Rejected
120b68c0-1811-45ac-9600-7d72f561b1a2,Steven Wyatt,(540)399-6770x5228,GEO,7798,CP,Rejected
e0a193ad-eeef-4a71-914e-82fcb9a6f125,Elizabeth Daniels,546-945-5946x710,USA,7030,CA,Hold
2f6fb888-4ba9-48a4-96eb-8a5284d63bab,Daniel Yang,587-983-4047x46744,NCL,9827,PM,Rejected
49ce4833-9b82-4925-80ea-0edcd98107f9,Charles Padilla,001-397-253-5249x03133,NPL,5698,CP,Rejected
f413b52b-345b-43d5-83a7-1fa721b75bc1,Brittany Esparza,874-281-5153x2907,KAZ,9490,PL,Approved
eee27b44-893a-4020-83b0-3368b158d470,Andre Townsend,751-559-6908x69318,USA,7063,PL,Hold


In [0]:
#Data enrichment: record when each event was processed
claims_df = claims_df.withColumn("processed", current_timestamp())

In [0]:
claims_types_df = spark.read.format("delta").table("events_db.insurance_types")
display(claims_types_df)

id,type
PL,Personal Life Insurance
PM,Personal Motor Insurance
PT,Personal Travel Insurance
PH,Personal Home Insurance
CA,Commercial Accident Insurance
CH,Commercial Health Insurance
CP,Commercial Property Insurance


In [0]:
claims_df=claims_df.join(claims_types_df, claims_df.type_id==claims_types_df.id, "inner").drop(claims_types_df.id)
display(claims_df)

id,customer_name,phone_number,country,claim_amount,type_id,status,processed,type
8e9fc93b-39cc-4797-bfd7-8df18fa5d4f3,Yolanda Whitaker,4937251347,USA,7925,PL,Approved,2022-08-17T11:46:26.005+0000,Personal Life Insurance
aa619ba1-d39e-4fbb-a7d9-ea4fe6338f29,Erica Romero,001-911-084-6867,COG,3766,CH,Rejected,2022-08-17T11:46:26.005+0000,Commercial Health Insurance
f95860c7-b7e0-4c62-b4e4-cee075152c07,Richard Ford,(183)191-0771,CAF,2622,PT,Hold,2022-08-17T11:46:26.005+0000,Personal Travel Insurance
8ffb38f4-4562-46a8-bbd7-d3a39014eac5,Bradley Peterson,144.817.6362x279,IDN,9331,PT,Rejected,2022-08-17T11:46:26.005+0000,Personal Travel Insurance
120b68c0-1811-45ac-9600-7d72f561b1a2,Steven Wyatt,(540)399-6770x5228,GEO,7798,CP,Rejected,2022-08-17T11:46:26.005+0000,Commercial Property Insurance
e0a193ad-eeef-4a71-914e-82fcb9a6f125,Elizabeth Daniels,546-945-5946x710,USA,7030,CA,Hold,2022-08-17T11:46:26.005+0000,Commercial Accident Insurance
2f6fb888-4ba9-48a4-96eb-8a5284d63bab,Daniel Yang,587-983-4047x46744,NCL,9827,PM,Rejected,2022-08-17T11:46:26.005+0000,Personal Motor Insurance
49ce4833-9b82-4925-80ea-0edcd98107f9,Charles Padilla,001-397-253-5249x03133,NPL,5698,CP,Rejected,2022-08-17T11:46:26.005+0000,Commercial Property Insurance
f413b52b-345b-43d5-83a7-1fa721b75bc1,Brittany Esparza,874-281-5153x2907,KAZ,9490,PL,Approved,2022-08-17T11:46:26.005+0000,Personal Life Insurance
eee27b44-893a-4020-83b0-3368b158d470,Andre Townsend,751-559-6908x69318,USA,7063,PL,Hold,2022-08-17T11:46:26.005+0000,Personal Life Insurance
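
This is a stream-static join: each micro-batch of claims is joined against the `insurance_types` Delta table. Because the join type is `inner`, a claim whose `type_id` has no match in the lookup table is silently dropped; a `left` join would keep it with a null `type`. A plain-Python sketch of the per-batch logic, with hypothetical names:

```python
from typing import Dict, List


def enrich_claims(claims: List[Dict], insurance_types: Dict[str, str]) -> List[Dict]:
    """Inner-join claims to a type lookup on type_id, adding a `type` field."""
    enriched = []
    for claim in claims:
        type_name = insurance_types.get(claim["type_id"])
        if type_name is None:
            continue  # no matching insurance type: an inner join drops the row
        enriched.append({**claim, "type": type_name})
    return enriched


insurance_types = {"PL": "Personal Life Insurance", "CH": "Commercial Health Insurance"}
claims = [
    {"id": "c1", "type_id": "PL", "claim_amount": 7925},
    {"id": "c2", "type_id": "XX", "claim_amount": 100},  # unknown type, dropped
]
print(enrich_claims(claims, insurance_types))
# [{'id': 'c1', 'type_id': 'PL', 'claim_amount': 7925, 'type': 'Personal Life Insurance'}]
```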


For a real IoT or sales data stream, you would typically drop duplicates and aggregate over time windows with the `window` function.  
For example:
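```python
from pyspark.sql.functions import col, window, sum as sum_

def aggregateSalesRevenue(df, watermarkLateness, timeWindowSize, aggregationKey):
  # Accept events up to `watermarkLateness` late, then total the sales per time window and key
  return (
    df.withWatermark("timestamp", watermarkLateness)
      .groupBy(
        window("timestamp", timeWindowSize),
        col(aggregationKey))
      .agg(sum_(col("sales")).alias("sales")))
```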
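
To see what `withWatermark` and a tumbling `window` do together, here is a simplified plain-Python model. Its assumptions: event times are integer seconds, the watermark (max event time seen minus the allowed lateness) is updated after every event instead of once per micro-batch, and a late event is dropped once its window's end is at or below the watermark. Spark itself only guarantees that data within the lateness threshold is not dropped; data beyond it may or may not be aggregated. The function name is hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

Event = Tuple[int, str, float]  # (event_time_seconds, key, sales)


def aggregate_sales(events: Iterable[Event], window_size: int,
                    lateness: int) -> Dict[Tuple[int, str], float]:
    """Sum sales per (tumbling-window start, key), dropping events that arrive too late."""
    totals: Dict[Tuple[int, str], float] = defaultdict(float)
    max_seen = None
    for event_time, key, sales in events:
        window_start = event_time - event_time % window_size
        watermark = None if max_seen is None else max_seen - lateness
        if watermark is not None and window_start + window_size <= watermark:
            continue  # window already finalized by the watermark: drop the late event
        totals[(window_start, key)] += sales
        max_seen = event_time if max_seen is None else max(max_seen, event_time)
    return dict(totals)


# 10-minute windows, 5 minutes of allowed lateness
events = [(0, "A", 10.0), (120, "A", 5.0), (650, "B", 7.0),
          (30, "A", 2.0),    # late, but its window is still open: kept
          (1300, "A", 1.0),
          (100, "A", 99.0)]  # too late, its window has closed: dropped
print(aggregate_sales(events, 600, 300))
# {(0, 'A'): 17.0, (600, 'B'): 7.0, (1200, 'A'): 1.0}
```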

#### Write the datastream to Delta Table for Data Analysis

We create a database and store the stream data as a Delta table.

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS events_db;
USE events_db;

In [0]:
check_point_path = "dbfs:/FileStore/events/_checkpoints/event_stream"

delta_write_query = claims_df.writeStream\
.format("delta")\
.outputMode("append")\
.option("checkpointLocation", check_point_path)\
.queryName("delta_write_query")\
.toTable("claims_data")

In [0]:
%sql
SELECT * FROM claims_data LIMIT 10;

id,customer_name,phone_number,country,claim_amount,type_id,status,processed,type
dc334b82-75a4-4728-959f-8f8518571b6e,Mary Ortiz,515.813.2245x8873,BFA,7617,PH,Approved,2022-08-16T12:22:58.504+0000,Personal Home Insurance
22af3d95-07d6-44ab-94f4-33b7aa50b2bd,Jennifer Huang,+1-161-076-3488,FIN,1109,CH,Hold,2022-08-16T12:22:58.504+0000,Commercial Health Insurance
6d3d3ca9-d1e8-4704-99b3-9b93c67eecbb,Caleb Ortiz,(903)183-9576x092,GRL,8096,PM,Approved,2022-08-16T12:22:58.504+0000,Personal Motor Insurance
97206895-26b5-40aa-8ad9-d357cab63571,Travis White,+1-450-032-5391x66451,MUS,9623,CH,Hold,2022-08-16T12:22:58.504+0000,Commercial Health Insurance
06b1dc35-0926-4c26-8da4-f93e97e5ce4c,Nicholas Hanna,(531)066-0387x006,IMN,2317,CH,Hold,2022-08-16T12:22:58.504+0000,Commercial Health Insurance
3f55e6a6-a937-4ee5-89d0-33d120534a02,Christina Sanders,197.110.8431,IND,2711,PH,Hold,2022-08-16T12:22:58.504+0000,Personal Home Insurance
7030b96d-1f55-4f40-bfbf-ac63c462cd99,Jonathan Juarez,+1-609-375-7527,BEL,9821,PT,Rejected,2022-08-16T12:22:58.504+0000,Personal Travel Insurance
52edaaf5-a752-44de-968a-07f7e92f3cab,Andrew Haynes,671-638-2120x04615,WLF,3645,CA,Rejected,2022-08-16T12:22:58.504+0000,Commercial Accident Insurance
1ff139db-5e8d-4781-9afc-15b2f570dcf3,Christopher Salazar,820-667-5948x6877,USA,4657,PH,Approved,2022-08-16T12:22:58.504+0000,Personal Home Insurance
9828a36d-b3e0-4593-9cee-0f1ad0259ea0,Heather Martinez,162.164.4873x46480,ARM,7986,CP,Rejected,2022-08-16T12:22:58.504+0000,Commercial Property Insurance


In [0]:
%sql
SELECT count(*) FROM claims_data;

count(1)
300


Next, we write the same stream to the data lake as CSV files by defining a sink with a streaming query named "data_lake_query".  
Calling `start()` launches the query, which keeps running as a background job until it is stopped.

In [0]:
save_loc = f"{mount_point_path}/claims"

datalake_write_query = claims_df.writeStream\
.format("csv")\
.outputMode("append")\
.queryName("data_lake_query")\
.trigger(processingTime='30 seconds')\
.option("checkpointLocation", f"{save_loc}/_checkpoint")\
.start(save_loc)
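
With `.trigger(processingTime='30 seconds')`, the query starts a micro-batch every 30 seconds, and each batch processes whatever arrived since the previous one; if a batch runs longer than the interval, the next one starts as soon as it finishes. The simplified plain-Python model below ignores batch duration and assumes triggers fire at multiples of the interval, so it only shows which trigger would pick up a record arriving at a given time. The function name is hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, List


def assign_to_triggers(arrivals: Iterable[int], interval: int) -> Dict[int, List[int]]:
    """Group arrival times (seconds) by the processing-time trigger that picks them up.

    Simplified model: triggers fire at multiples of `interval`, batches finish
    instantly, and a record arriving at time t is read by the next trigger after t.
    """
    batches: Dict[int, List[int]] = defaultdict(list)
    for t in arrivals:
        next_trigger = (t // interval + 1) * interval
        batches[next_trigger].append(t)
    return dict(batches)


print(assign_to_triggers([1, 12, 29, 31, 59, 61], 30))
# {30: [1, 12, 29], 60: [31, 59], 90: [61]}
```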

When you are done, stop all active streaming queries.

In [0]:
for s in spark.streams.active:
    s.stop()

In [0]:
#Unmount the storage account
dbutils.fs.unmount(mount_point_path)

##### ==== end of notebook ====

In [0]:
%sh
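# Optional cleanup: delete the Delta sink's checkpoint so the query no longer resumes from its saved offsets
rm -rf /dbfs/FileStore/events/_checkpoints/event_stream/*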