# Structured Streaming with Azure Event Hubs

## Datasets Used
This notebook consumes data published through an Event Hub with the following schema:

- `Index`
- `Arrival_Time`
- `Creation_Time`
- `x`
- `y`
- `z`
- `User`
- `Model`
- `Device`
- `gt`
- `id`
- `geolocation`
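
For orientation, here is a minimal sketch of one decoded event, using a record copied from the stream output shown later in this notebook. It uses only the standard-library `json` module; the variable names (`sample_body`, `expected_fields`) are illustrative and not part of the lab code.

```python
import json

# One event body, copied from the stream output later in this notebook.
sample_body = (
    '{"Arrival_Time":1424688258127,"Creation_Time":1424688256135782349,'
    '"Device":"nexus4_1","Index":249068,"Model":"nexus4","User":"g",'
    '"geolocation":{"city":"Manila","country":"Philippines"},'
    '"gt":"stairsup","id":162268,"x":0.57418823,"y":-0.8125305,"z":0.1304779}'
)

event = json.loads(sample_body)

# The fields listed in the schema above.
expected_fields = {
    "Index", "Arrival_Time", "Creation_Time", "x", "y", "z",
    "User", "Model", "Device", "gt", "id", "geolocation",
}

# Any listed field that the sample record does not carry.
missing = expected_fields - set(event)

print(sorted(event))
print("missing fields:", sorted(missing))
print("geolocation is nested:", event["geolocation"])
```

Note that `geolocation` is itself an object, which is why the notebook later flattens it into separate columns.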

## Library Requirements

1. The Maven library with coordinate `com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.18`
   - This allows the Databricks `spark` session to communicate with an Event Hub.
2. The Python library `azure-eventhub`
   - This allows the Python kernel to stream content to an Event Hub.

The next cell walks you through installing the Maven library. A couple of cells below that, we install the Python library using `%pip install`.

## Lab Setup

If you are running in an Azure Databricks environment that is already configured with the libraries you need, you can skip to the next cell. To use this notebook in your own Databricks environment, you will need to install the library yourself using the [Create Library](https://docs.azuredatabricks.net/user-guide/libraries.html) interface in Azure Databricks. Follow the steps below to attach the `azure-eventhubs-spark` library to your cluster:

1. In the left-hand navigation menu of your Databricks workspace, select **Compute**, then select your cluster in the list. If it's not running, start it now.

  ![Select cluster](https://databricksdemostore.blob.core.windows.net/images/10-de-learning-path/select-cluster.png)

2. Select the **Libraries** tab (1), then select **Install New** (2). In the Install Library dialog, select **Maven** under Library Source (3). Under Coordinates, paste **com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.18** (4), then select **Install**.
  
  ![Databricks new Maven library](https://databricksdemostore.blob.core.windows.net/images/10-de-learning-path/install-eventhubs-spark-library.png)

3. Wait until the library successfully installs before continuing.

  ![Library installed](https://databricksdemostore.blob.core.windows.net/images/10-de-learning-path/eventhubs-spark-library-installed.png)

Once complete, return to this notebook to continue with the lesson.

### Getting Started

Run the following two cells to install the `azure-eventhub` Python library and configure our "classroom."

In [0]:
# This library allows the Python kernel to stream content to an Event Hub:
%pip install azure-eventhub

Python interpreter will be restarted.
Collecting azure-eventhub
  Downloading azure_eventhub-5.10.1-py3-none-any.whl (150 kB)
Collecting uamqp<2.0.0,>=1.6.0
  Downloading uamqp-1.6.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
Collecting typing-extensions>=4.0.1
  Downloading typing_extensions-4.4.0-py3-none-any.whl (26 kB)
Installing collected packages: uamqp, typing-extensions, azure-eventhub
  Attempting uninstall: typing-extensions
    Found existing installation: typing-extensions 3.10.0.2
    Not uninstalling typing-extensions at /databricks/python3/lib/python3.9/site-packages, outside environment /local_disk0/.ephemeral_nfs/envs/pythonEnv-7404b9fb-b7a0-49b1-85b2-cea0aa638d79
    Can't uninstall 'typing-extensions'. No files were found to uninstall.
Successfully installed azure-eventhub-5.10.1 typing-extensions-4.4.0 uamqp-1.6.3
Python interpreter will be restarted.


In [0]:
%run "./Includes/Classroom-Setup"

The following cell sets up a streaming read from local files; we'll write that stream to Event Hubs.

In [0]:
%run ./Includes/Streaming-Demo-Setup

To reach Event Hubs, insert the connection string (primary key) that you acquired at the end of the Getting Started notebook in this module. You copied it from the Azure Portal into Notepad.exe or another text editor.

> Read this article to learn [how to acquire the connection string for an Event Hub](https://docs.microsoft.com/azure/event-hubs/event-hubs-create) in your own Azure Subscription.
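
An Event Hubs connection string is a semicolon-separated list of `Key=Value` pairs. The sketch below splits one into its parts so you can check what you pasted, for example whether it already carries an `EntityPath`. The helper name `parse_connection_string` is hypothetical, and the values are placeholders rather than real credentials.

```python
def parse_connection_string(conn_str):
    """Split 'Key=Value;Key=Value' into a dict (hypothetical helper)."""
    parts = {}
    for segment in conn_str.strip().split(";"):
        if not segment:
            continue  # tolerate a trailing ';'
        # Split on the first '=' only: key values may themselves end in '='.
        key, _, value = segment.partition("=")
        parts[key] = value
    return parts


# Placeholder values, not real credentials.
example = (
    "Endpoint=sb://my-namespace.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;"
    "SharedAccessKey=abc123+def/456=;"
    "EntityPath=my-event-hub"
)

parts = parse_connection_string(example)
print("Endpoint:", parts["Endpoint"])
print("Has EntityPath:", "EntityPath" in parts)
```

Splitting on the first `=` matters because base64-encoded keys commonly end with `=` padding.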

In [0]:
event_hub_connection_string = "Endpoint=sb://cg-adb-lab-eventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=2BVyKL6hL3dSZXWHlAvV5I+r1qAYyt2V1gG65Nbl/+I=;EntityPath=cg-adb-eh" # Paste your Event Hubs connection string in the quotes to the left

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Azure Event Hubs</h2>

Microsoft Azure Event Hubs is a fully managed, real-time data ingestion service.
You can stream millions of events per second from any source to build dynamic data pipelines and immediately respond to business challenges.
It integrates seamlessly with a host of other Azure services.

Event Hubs can be used in a variety of applications such as
* Anomaly detection (fraud/outliers)
* Application logging
* Analytics pipelines, such as clickstreams
* Archiving data
* Transaction processing
* User telemetry processing
* Device telemetry streaming
* <b>Live dashboarding</b>

In [0]:
%python

event_hub_name = "cg-adb-eh"

# Append EntityPath only if the pasted connection string does not already include it,
# so the entity is not specified twice.
connection_string = event_hub_connection_string
if "EntityPath=" not in connection_string:
  connection_string += ";EntityPath=" + event_hub_name

print("Consumer Connection String: {}".format(connection_string))

Consumer Connection String: Endpoint=sb://cg-adb-lab-eventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=2BVyKL6hL3dSZXWHlAvV5I+r1qAYyt2V1gG65Nbl/+I=;EntityPath=cg-adb-eh


### Write Stream to Event Hub to Produce Stream

In [0]:
%python

# For connector version 2.3.15 and above, the connection string in the configuration dictionary must be encrypted.
ehWriteConf = {
  'eventhubs.connectionString' : sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_string)
}

# Remove any checkpoint left over from a previous run so the write stream starts fresh.
checkpointPath = 'dbfs:/event-hub/write-checkpoint'
dbutils.fs.rm(checkpointPath, True)

(activityStreamDF
  .writeStream
  .format("eventhubs")
  .options(**ehWriteConf)
  .option("checkpointLocation", checkpointPath)
  .start())

Out[22]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fb0c90bda90>

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Event Hubs Configuration</h2>

Assemble the following:
* A `startingEventPosition` as a JSON string
* An `EventHubsConf`
  * to include a string with connection credentials
  * to set a starting position for the stream read
  * to throttle Event Hubs' processing of the streams

In [0]:
%python

import json

# Create the starting position dictionary
startingEventPosition = {
  "offset": "-1",         # "-1" means start of stream
  "seqNo": -1,            # not in use
  "enqueuedTime": None,   # not in use
  "isInclusive": True
}

eventHubsConf = {
  "eventhubs.connectionString" : sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_string),
  "eventhubs.startingPosition" : json.dumps(startingEventPosition),
  "maxEventsPerTrigger": 100   # option key read by the connector; "setMaxEventsPerTrigger" is the Scala setter name
}
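
The starting position can also be expressed by enqueue time instead of offset. The sketch below builds such a dictionary in plain Python and serializes it the same way as the cell above. The helper name `position_from_enqueued_time` is hypothetical, and the timestamp format is an assumption based on the pattern used in the connector's PySpark documentation, so verify it against the connector version you have installed.

```python
import json
from datetime import datetime, timezone


def position_from_enqueued_time(when):
    """Build an event-position dict keyed on enqueue time (hypothetical helper)."""
    when_utc = when.astimezone(timezone.utc)
    return {
        "offset": None,   # not in use when positioning by time
        "seqNo": -1,      # not in use
        # Assumed format, following the connector's PySpark docs.
        "enqueuedTime": when_utc.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
        "isInclusive": True,
    }


start = position_from_enqueued_time(
    datetime(2015, 2, 23, 10, 44, 0, tzinfo=timezone.utc)
)

# This JSON string is what would be passed as "eventhubs.startingPosition".
starting_position_json = json.dumps(start)
print(starting_position_json)
```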

### READ Stream using Event Hubs

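The `readStream` method is a <b>transformation</b> that returns a streaming DataFrame. For the `eventhubs` source, the schema is fixed by the connector (see the `printSchema()` output below), so no `.schema()` call is needed.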

In [0]:
%python

from pyspark.sql.functions import col

spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

eventStreamDF = (spark.readStream
  .format("eventhubs")
  .options(**eventHubsConf)
  .load()
)

eventStreamDF.printSchema()

root
 |-- body: binary (nullable = true)
 |-- partition: string (nullable = true)
 |-- offset: string (nullable = true)
 |-- sequenceNumber: long (nullable = true)
 |-- enqueuedTime: timestamp (nullable = true)
 |-- publisher: string (nullable = true)
 |-- partitionKey: string (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- systemProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



Most of the fields in this response are metadata describing the state of the Event Hubs stream. We are specifically interested in the `body` field, which contains our JSON payload.

Because it's encoded as binary, we'll cast it to a string as we select it.

In [0]:
%python
bodyDF = eventStreamDF.select(col("body").cast("STRING"))

Each line of the streaming data becomes a row in the DataFrame once an <b>action</b> such as `writeStream` is invoked.

Notice that nothing happens until you invoke an action, such as `display()` or `writeStream`.

In [0]:
%python
display(bodyDF, streamName="bodyDF")

body
"{""Arrival_Time"":1424688258127,""Creation_Time"":1424688256135782349,""Device"":""nexus4_1"",""Index"":249068,""Model"":""nexus4"",""User"":""g"",""geolocation"":{""city"":""Manila"",""country"":""Philippines""},""gt"":""stairsup"",""id"":162268,""x"":0.57418823,""y"":-0.8125305,""z"":0.1304779}"
"{""Arrival_Time"":1424688257925,""Creation_Time"":1424688255934549439,""Device"":""nexus4_1"",""Index"":249028,""Model"":""nexus4"",""User"":""g"",""geolocation"":{""city"":""Manila"",""country"":""Philippines""},""gt"":""stairsup"",""id"":162267,""x"":0.76538086,""y"":-0.07980347,""z"":0.34303284}"
"{""Arrival_Time"":1424688257525,""Creation_Time"":1424690103574161999,""Device"":""nexus4_2"",""Index"":250132,""Model"":""nexus4"",""User"":""g"",""geolocation"":{""city"":""Manila"",""country"":""Philippines""},""gt"":""stairsup"",""id"":162265,""x"":-0.18148804,""y"":-0.06665039,""z"":-0.1030426}"
"{""Arrival_Time"":1424688257119,""Creation_Time"":1424688255128439711,""Device"":""nexus4_1"",""Index"":248868,""Model"":""nexus4"",""User"":""g"",""geolocation"":{""city"":""Manila"",""country"":""Philippines""},""gt"":""stairsup"",""id"":162263,""x"":-0.94573975,""y"":0.2053833,""z"":0.13368225}"
"{""Arrival_Time"":1424688256717,""Creation_Time"":1424688254725760267,""Device"":""nexus4_1"",""Index"":248788,""Model"":""nexus4"",""User"":""g"",""geolocation"":{""city"":""Manila"",""country"":""Philippines""},""gt"":""stairsup"",""id"":162261,""x"":0.9309387,""y"":-0.705719,""z"":0.28642273}"
"{""Arrival_Time"":1424688256315,""Creation_Time"":1424688254322781746,""Device"":""nexus4_1"",""Index"":248708,""Model"":""nexus4"",""User"":""g"",""geolocation"":{""city"":""Manila"",""country"":""Philippines""},""gt"":""stairsup"",""id"":162259,""x"":0.048675537,""y"":0.27160645,""z"":-0.044692993}"
"{""Arrival_Time"":1424688255913,""Creation_Time"":1424688253919949715,""Device"":""nexus4_1"",""Index"":248628,""Model"":""nexus4"",""User"":""g"",""geolocation"":{""city"":""Manila"",""country"":""Philippines""},""gt"":""stairsup"",""id"":162257,""x"":-0.83251953,""y"":-0.04989624,""z"":-0.56059265}"
"{""Arrival_Time"":1424688251279,""Creation_Time"":1424688249286727455,""Device"":""nexus4_1"",""Index"":247708,""Model"":""nexus4"",""User"":""g"",""geolocation"":{""city"":""Manila"",""country"":""Philippines""},""gt"":""stairsup"",""id"":162234,""x"":0.06362915,""y"":0.15090942,""z"":-0.019058228}"
"{""Arrival_Time"":1424688250876,""Creation_Time"":1424688248884109047,""Device"":""nexus4_1"",""Index"":247628,""Model"":""nexus4"",""User"":""g"",""geolocation"":{""city"":""Manila"",""country"":""Philippines""},""gt"":""stairsup"",""id"":162232,""x"":0.8155823,""y"":0.40405273,""z"":0.026870728}"
"{""Arrival_Time"":1424688251683,""Creation_Time"":1424688249689803627,""Device"":""nexus4_1"",""Index"":247788,""Model"":""nexus4"",""User"":""g"",""geolocation"":{""city"":""Manila"",""country"":""Philippines""},""gt"":""stairsup"",""id"":162236,""x"":-0.874176,""y"":0.31539917,""z"":0.353714}"


While we can see our JSON data now that it's cast to string type, we can't yet work with its individual fields.

Before proceeding, stop this stream. We'll continue building up transformations against this streaming DataFrame, and a new action will trigger an additional stream.

In [0]:
%python
for s in spark.streams.active:
  if s.name == "bodyDF":
    s.stop()

## <img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Parse the JSON payload

The Event Hub acts as a sort of "firehose" (or asynchronous buffer) and delivers the raw data, here in JSON format.

If desired, we could save this as raw bytes or strings and parse these records further downstream in our processing.

Here, we'll directly parse our data so we can interact with the fields.

The first step is to define the schema for the JSON payload.

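> Both time fields are encoded as `LongType` here because they arrive as epoch numbers (`Arrival_Time` in milliseconds, `Creation_Time` in nanoseconds) rather than as formatted timestamps.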

In [0]:
%python

from pyspark.sql.types import StructField, StructType, StringType, LongType, DoubleType

schema = StructType([
  StructField("Arrival_Time", LongType(), True),
  StructField("Creation_Time", LongType(), True),
  StructField("Device", StringType(), True),
  StructField("Index", LongType(), True),
  StructField("Model", StringType(), True),
  StructField("User", StringType(), True),
  StructField("gt", StringType(), True),
  StructField("x", DoubleType(), True),
  StructField("y", DoubleType(), True),
  StructField("z", DoubleType(), True),
  StructField("geolocation", StructType([
    StructField("PostalCode", StringType(), True),
    StructField("StateProvince", StringType(), True),
    StructField("city", StringType(), True),
    StructField("country", StringType(), True)
  ]), True),
  StructField("id", StringType(), True)
])
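
To see why the two time fields need different handling, the sketch below converts the values from one sample record using only the Python standard library. It assumes `Arrival_Time` is epoch milliseconds and `Creation_Time` is epoch nanoseconds, which matches the divisors (`1000` and `1E9`) used when the notebook casts these fields later on. Integer arithmetic is used to avoid floating-point rounding.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Values taken from the first sample record in this notebook.
arrival_time_ms = 1424688258127           # epoch milliseconds
creation_time_ns = 1424688256135782349    # epoch nanoseconds

arrival = EPOCH + timedelta(milliseconds=arrival_time_ms)

# datetime resolves to microseconds, so drop the last three nanosecond digits.
creation = EPOCH + timedelta(microseconds=creation_time_ns // 1000)

print("Arrival_Time: ", arrival.isoformat())
print("Creation_Time:", creation.isoformat())
```

Both values land on 2015-02-23 (UTC), a couple of seconds apart, which agrees with the timestamps shown in the flattened output later in the notebook.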

### Parse the data

Next we can use the function `from_json` to parse out the full message with the schema specified above.

When parsing a value from JSON, we end up with a single column containing a complex object.

In [0]:
%python

from pyspark.sql.functions import col, from_json

parsedEventsDF = bodyDF.select(
  from_json(col("body"), schema).alias("json"))

parsedEventsDF.printSchema()

root
 |-- json: struct (nullable = true)
 |    |-- Arrival_Time: long (nullable = true)
 |    |-- Creation_Time: long (nullable = true)
 |    |-- Device: string (nullable = true)
 |    |-- Index: long (nullable = true)
 |    |-- Model: string (nullable = true)
 |    |-- User: string (nullable = true)
 |    |-- gt: string (nullable = true)
 |    |-- x: double (nullable = true)
 |    |-- y: double (nullable = true)
 |    |-- z: double (nullable = true)
 |    |-- geolocation: struct (nullable = true)
 |    |    |-- PostalCode: string (nullable = true)
 |    |    |-- StateProvince: string (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |-- id: string (nullable = true)



Note that we can further parse this to flatten the schema entirely and properly cast our time fields.

In [0]:
%python

from pyspark.sql.functions import from_unixtime

flatSchemaDF = (parsedEventsDF
  .select(from_unixtime(col("json.Arrival_Time")/1000).cast("timestamp").alias("Arrival_Time"),  # epoch ms; from_unixtime keeps whole seconds only
          (col("json.Creation_Time")/1E9).cast("timestamp").alias("Creation_Time"),              # epoch ns
          col("json.Device").alias("Device"),
          col("json.Index").alias("Index"),
          col("json.Model").alias("Model"),
          col("json.User").alias("User"),
          col("json.gt").alias("gt"),
          col("json.x").alias("x"),
          col("json.y").alias("y"),
          col("json.z").alias("z"),
          col("json.id").alias("id"),
          col("json.geolocation.country").alias("country"),
          col("json.geolocation.city").alias("city"),
          col("json.geolocation.PostalCode").alias("PostalCode"),
          col("json.geolocation.StateProvince").alias("StateProvince"))
)

This flat schema provides us the ability to view each nested field as a column.

In [0]:
%python
display(flatSchemaDF)

Arrival_Time,Creation_Time,Device,Index,Model,User,gt,x,y,z,id,country,city,PostalCode,StateProvince
2015-02-23T10:44:18.000+0000,2015-02-23T10:44:16.135+0000,nexus4_1,249068,nexus4,g,stairsup,0.57418823,-0.8125305,0.1304779,162268,Philippines,Manila,,
2015-02-23T10:44:17.000+0000,2015-02-23T10:44:15.934+0000,nexus4_1,249028,nexus4,g,stairsup,0.76538086,-0.07980347,0.34303284,162267,Philippines,Manila,,
2015-02-23T10:44:17.000+0000,2015-02-23T11:15:03.574+0000,nexus4_2,250132,nexus4,g,stairsup,-0.18148804,-0.06665039,-0.1030426,162265,Philippines,Manila,,
2015-02-23T10:44:17.000+0000,2015-02-23T10:44:15.128+0000,nexus4_1,248868,nexus4,g,stairsup,-0.94573975,0.2053833,0.13368225,162263,Philippines,Manila,,
2015-02-23T10:44:16.000+0000,2015-02-23T10:44:14.725+0000,nexus4_1,248788,nexus4,g,stairsup,0.9309387,-0.705719,0.28642273,162261,Philippines,Manila,,
2015-02-23T10:44:16.000+0000,2015-02-23T10:44:14.322+0000,nexus4_1,248708,nexus4,g,stairsup,0.048675537,0.27160645,-0.044692993,162259,Philippines,Manila,,
2015-02-23T10:44:15.000+0000,2015-02-23T10:44:13.919+0000,nexus4_1,248628,nexus4,g,stairsup,-0.83251953,-0.04989624,-0.56059265,162257,Philippines,Manila,,
2015-02-23T10:44:11.000+0000,2015-02-23T10:44:09.286+0000,nexus4_1,247708,nexus4,g,stairsup,0.06362915,0.15090942,-0.019058228,162234,Philippines,Manila,,
2015-02-23T10:44:10.000+0000,2015-02-23T10:44:08.884+0000,nexus4_1,247628,nexus4,g,stairsup,0.8155823,0.40405273,0.026870728,162232,Philippines,Manila,,
2015-02-23T10:44:11.000+0000,2015-02-23T10:44:09.689+0000,nexus4_1,247788,nexus4,g,stairsup,-0.874176,0.31539917,0.353714,162236,Philippines,Manila,,
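
The `PostalCode` and `StateProvince` columns are empty in the output above because the schema declares them but the sample payloads do not carry them; `from_json` fills such fields with null. The sketch below is a plain-Python analogue of the flattening step, not Spark code, and the variable names are illustrative.

```python
import json

# A trimmed sample body: geolocation has city and country only.
body = (
    '{"Device":"nexus4_1","gt":"stairsup","id":162268,'
    '"geolocation":{"city":"Manila","country":"Philippines"}}'
)

record = json.loads(body)
geo = record.get("geolocation") or {}

# Promote nested fields to top-level keys; absent fields become None,
# mirroring the nulls Spark produces for schema fields missing from the JSON.
flat = {
    "Device": record.get("Device"),
    "gt": record.get("gt"),
    "id": record.get("id"),
    "country": geo.get("country"),
    "city": geo.get("city"),
    "PostalCode": geo.get("PostalCode"),
    "StateProvince": geo.get("StateProvince"),
}

print(flat)
```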


### Stop all active streams

In [0]:
%python
for s in spark.streams.active:
  s.stop()

#### Event Hubs FAQ

This [FAQ](https://github.com/Azure/azure-event-hubs-spark/blob/master/FAQ.md) is a useful reference when debugging issues between Spark and Event Hubs.