### Captured EventHub Example Databricks Notebook
##### by Robert Alexander, roalexan@microsoft.com

##### Copyright (c) Microsoft Corporation. All rights reserved.

##### Licensed under the MIT License.

##### Prerequisites
1. An **Azure subscription**. You will be asked for the *subscription id*.
1. A **Service Principal** with read/write access to this subscription. You will be asked for the *app id*, *app key*, and *tenant id*. Click [here](https://docs.microsoft.com/en-us/azure/active-directory/develop/howto-create-service-principal-portal) for help on adding a Service Principal.
1. An **Azure Databricks Service** and **cluster**. Use Python version 2 for the cluster. Click [here](https://docs.microsoft.com/en-us/azure/azure-databricks/quickstart-create-databricks-workspace-portal) for help on adding a Databricks Service and cluster.
1. Add the following libraries to your cluster via PyPI. Click [here](https://docs.databricks.com/user-guide/libraries.html) for help on adding a library.
   - **azure-cli**
   - **azure-eventhub** 

##### Usage
Enter the required input parameters, then click **Run All** (or run each step individually, if you prefer). This creates a resource group containing a [Captured EventHub](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-capture-overview) configured to automatically write messages sent to the EventHub to an Azure Storage Account container.

##### Cleanup

When you are finished, you can undeploy all the resources created by this notebook by uncommenting and running the last step.

##### The following Azure services will be deployed into a new resource group:
1. Azure Storage Account
1. Event Hubs Namespace

In [2]:
dbutils.widgets.text("subscription_id", "", "")
dbutils.widgets.text("location", "", "")
dbutils.widgets.text("prefix", "", "")
dbutils.widgets.text("tenant_id", "","")
dbutils.widgets.text("app_id", "","")
dbutils.widgets.text("app_key", "","")

# After running this cell, fill in all of the above input parameters before proceeding.

In [3]:
SUBSCRIPTION_ID = dbutils.widgets.get("subscription_id")
PREFIX = dbutils.widgets.get("prefix")
RESOURCE_GROUP_NAME = PREFIX + "EventHub-rg"
LOCATION = dbutils.widgets.get("location")
TENANT_ID = dbutils.widgets.get("tenant_id")
APP_ID = dbutils.widgets.get("app_id")
APP_KEY = dbutils.widgets.get("app_key")

STORAGE_ACCOUNT_NAME = PREFIX + "storageaccount"
STORAGE_CONTAINER_NAME = "container2"
NAMESPACE_NAME = PREFIX + "EventHubNamespace"
EVENT_HUB_NAME = PREFIX + "EventHub"
MOUNT_POINT = "/mnt/" + STORAGE_CONTAINER_NAME

print('SUBSCRIPTION_ID: ', SUBSCRIPTION_ID)
print('PREFIX: ', PREFIX)
print('RESOURCE_GROUP_NAME: ', RESOURCE_GROUP_NAME)
print('LOCATION: ', LOCATION)
print('TENANT_ID: ', TENANT_ID)
print('APP_ID: ', APP_ID)
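# Do not echo the service principal secret into the notebook output.
print('APP_KEY: ', '<redacted>' if APP_KEY else '<empty>')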

print('STORAGE_CONTAINER_NAME: ', STORAGE_CONTAINER_NAME)
print('STORAGE_ACCOUNT_NAME: ', STORAGE_ACCOUNT_NAME)
print('NAMESPACE_NAME: ', NAMESPACE_NAME)
print('EVENT_HUB_NAME: ', EVENT_HUB_NAME)
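
The storage account name is built from the `prefix` widget, and Azure storage account names must be 3 to 24 characters long and contain only lowercase letters and digits. The following is a minimal sketch of a check that could be run before deployment; `check_prefix` and `STORAGE_SUFFIX` are hypothetical names introduced here for illustration, and the check does not cover global uniqueness of the name.

```python
import re

# Suffix appended to the prefix when building STORAGE_ACCOUNT_NAME in the cell above.
STORAGE_SUFFIX = "storageaccount"


def check_prefix(prefix, suffix=STORAGE_SUFFIX):
    """Return a list of problems with the resulting storage account name (empty if none)."""
    name = prefix + suffix
    problems = []
    # Azure storage account names must be 3-24 characters long.
    if not 3 <= len(name) <= 24:
        problems.append("name must be 3-24 characters, got {}".format(len(name)))
    # Only lowercase letters and digits are allowed.
    if not re.match(r"^[a-z0-9]+$", name):
        problems.append("name may contain only lowercase letters and digits")
    return problems


print(check_prefix("demo01"))
print(check_prefix("MyLongPrefix"))
```

With the suffix used in this notebook, the prefix can be at most 10 characters long.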

In [4]:
# https://docs.microsoft.com/en-us/python/azure/python-sdk-azure-authenticate?view=azure-python

from azure.common.credentials import ServicePrincipalCredentials

credentials = ServicePrincipalCredentials(
    client_id = APP_ID,
    secret = APP_KEY,
    tenant = TENANT_ID
)
print('credentials: ', credentials)

In [5]:
# https://github.com/Azure/azure-sdk-for-python/tree/master/azure-mgmt-resource

from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.resource.resources.models import ResourceGroup
from azure.common.client_factory import get_client_from_cli_profile

resourceManagementClient = get_client_from_cli_profile(ResourceManagementClient, credentials=credentials, subscription_id=SUBSCRIPTION_ID)
resourceManagementClient.resource_groups.create_or_update(
    resource_group_name = RESOURCE_GROUP_NAME,
    parameters = ResourceGroup(location=LOCATION)
)
print('ResourceGroup created: ', RESOURCE_GROUP_NAME)

In [6]:
# https://github.com/Azure-Samples/storage-python-manage
# https://github.com/Azure/azure-sdk-for-python/tree/master/azure-mgmt-storage
# https://docs.microsoft.com/en-us/python/api/overview/azure/storage/management?view=azure-python
# https://blogs.msdn.microsoft.com/jmstall/2014/06/12/azure-storage-naming-rules/

from azure.common.client_factory import get_client_from_cli_profile
from azure.mgmt.storage.storage_management_client import StorageManagementClient
from azure.mgmt.storage.models import StorageAccountCreateParameters
from azure.mgmt.storage.models import Sku
from azure.mgmt.storage.models import SkuName
from azure.mgmt.storage.models import Kind

# Create StorageManagementClient
storageManagementClient = get_client_from_cli_profile(StorageManagementClient, credentials=credentials, subscription_id=SUBSCRIPTION_ID)
print('storageManagementClient: ', storageManagementClient)

# Create StorageAccount
async_create = storageManagementClient.storage_accounts.create(
    resource_group_name = RESOURCE_GROUP_NAME,
    account_name = STORAGE_ACCOUNT_NAME,
    parameters = StorageAccountCreateParameters(
        sku = Sku(name=SkuName.standard_lrs),
        kind = Kind.storage_v2,
        location = LOCATION
    )
)
async_create.wait()
print('StorageAccount created: ', STORAGE_ACCOUNT_NAME)

In [7]:
# https://github.com/Azure/azure-sdk-for-python/blob/master/azure-mgmt-eventhub/azure/mgmt/eventhub/operations/event_hubs_operations.py
# https://github.com/Azure/azure-sdk-for-python/tree/master/azure-mgmt-eventhub/tests
# https://docs.microsoft.com/en-us/python/api/azure-mgmt-eventhub/?view=azure-python
# https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-capture-overview

from azure.common.client_factory import get_client_from_cli_profile
from azure.mgmt.eventhub import EventHubManagementClient
from azure.mgmt.eventhub.models import EHNamespace
from azure.mgmt.eventhub.models import Eventhub
from azure.mgmt.eventhub.models import CaptureDescription
from azure.mgmt.eventhub.models import Destination
from azure.mgmt.eventhub.models import EncodingCaptureDescription

# Create EventHubManagementClient
eventHubManagementClient = get_client_from_cli_profile(EventHubManagementClient, credentials=credentials, subscription_id=SUBSCRIPTION_ID)
print('eventHubManagementClient: ', eventHubManagementClient)

# Create EventHub NameSpace
async_create = eventHubManagementClient.namespaces.create_or_update(
    resource_group_name = RESOURCE_GROUP_NAME,
    namespace_name = NAMESPACE_NAME,
    parameters = EHNamespace(location=LOCATION)
)
async_create.wait()
print('NameSpace created: ', NAMESPACE_NAME)

# Create (Captured) EventHub
storage_account = storageManagementClient.storage_accounts.get_properties(RESOURCE_GROUP_NAME, STORAGE_ACCOUNT_NAME)
eventHubManagementClient.event_hubs.create_or_update(
    resource_group_name = RESOURCE_GROUP_NAME,
    namespace_name = NAMESPACE_NAME,
    event_hub_name = EVENT_HUB_NAME,
    parameters = Eventhub(
        message_retention_in_days = 2,
        partition_count = 2,
        capture_description = CaptureDescription(
            enabled=True,
            encoding=EncodingCaptureDescription.avro,
            interval_in_seconds = 60,
            size_limit_in_bytes = 1024*1024*10, # must be >= 10 MB
            destination = Destination(
                name="EventHubArchive.AzureBlockBlob",
                storage_account_resource_id = storage_account.id,
                blob_container = STORAGE_CONTAINER_NAME,
                archive_name_format="{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"
            )
        )
    )
)
print('EventHub created: ', EVENT_HUB_NAME)
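
The `archive_name_format` above determines where each capture file lands in the container. As a rough sketch of how the placeholders expand, the helper below builds the expected blob path for one capture window. `capture_blob_path` is a hypothetical helper, and the zero-padded date parts, the lowercased names (matching what the read step later in this notebook expects), and the `.avro` extension are assumptions based on the Event Hubs Capture documentation rather than something this notebook verifies.

```python
from datetime import datetime

ARCHIVE_NAME_FORMAT = "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"


def capture_blob_path(namespace, event_hub, partition_id, window_start):
    """Expand the archive name format for one capture window (assumed conventions)."""
    return ARCHIVE_NAME_FORMAT.format(
        Namespace=namespace.lower(),   # assumption: names appear lowercased in the path
        EventHub=event_hub.lower(),
        PartitionId=partition_id,
        Year="{:04d}".format(window_start.year),
        Month="{:02d}".format(window_start.month),   # assumption: zero-padded date parts
        Day="{:02d}".format(window_start.day),
        Hour="{:02d}".format(window_start.hour),
        Minute="{:02d}".format(window_start.minute),
        Second="{:02d}".format(window_start.second),
    ) + ".avro"


print(capture_blob_path("DemoEventHubNamespace", "DemoEventHub", 0,
                        datetime(2019, 1, 5, 9, 3, 7)))
```

Because every date component is its own path segment, a glob with one `*` per segment can select all files for a given partition.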

In [8]:
# https://github.com/Azure/azure-event-hubs-python
# https://github.com/Azure/azure-event-hubs-python/blob/master/tests/test_send.py - see json.dumps

from azure.eventhub import EventHubClient, Sender, EventData
import time
import random
import json

authorization_rules = list(eventHubManagementClient.namespaces.list_authorization_rules(RESOURCE_GROUP_NAME, NAMESPACE_NAME))
default_authorization_rule_name = authorization_rules[0].name

accessKeys = eventHubManagementClient.namespaces.list_keys(RESOURCE_GROUP_NAME, NAMESPACE_NAME, default_authorization_rule_name)

ADDRESS = "amqps://{0}.servicebus.windows.net/{1}".format(NAMESPACE_NAME, EVENT_HUB_NAME)
USER = accessKeys.key_name
KEY = accessKeys.primary_key

# Create Event Hubs client
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
sender = client.add_sender(partition="0")
client.run()
start_time = time.time()
try:
    for i in range(10):
        print("Sending message: {}".format(i))
        # All values are sent as strings inside a JSON document.
        userId = str(random.randint(1,1000))
        movieId = str(random.randint(1,100))
        rating = str(random.randint(1,5))
        message = json.dumps({'userId': userId, 'movieId': movieId, 'rating': rating})
        sender.send(EventData(message))
finally:
    # Always stop the client, even if a send fails; any exception still propagates.
    end_time = time.time()
    client.stop()
    run_time = end_time - start_time
    print("Runtime: {} seconds".format(run_time))

In [9]:
accountKey = "fs.azure.account.key.{}.blob.core.windows.net".format(STORAGE_ACCOUNT_NAME)
accessKey = storageManagementClient.storage_accounts.list_keys(resource_group_name=RESOURCE_GROUP_NAME, account_name=STORAGE_ACCOUNT_NAME).keys[0].value

# Mount the drive for native python
inputSource = "wasbs://{}@{}.blob.core.windows.net".format(STORAGE_CONTAINER_NAME, STORAGE_ACCOUNT_NAME)
extraConfig = {accountKey: accessKey}
print("Mounting: {}".format(MOUNT_POINT))
try:
  dbutils.fs.mount(
    source = inputSource,
    mount_point = MOUNT_POINT,
    extra_configs = extraConfig
  )
  print("Succeeded")
except Exception as e:
  if "Directory already mounted" in str(e):
    print("Directory {} already mounted".format(MOUNT_POINT))
  else:
    raise  # re-raise unexpected mount errors with the original traceback
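
The cell above detects an existing mount by inspecting the error message. An alternative, sketched here under the assumption that `dbutils.fs.mounts()` returns entries exposing a `mountPoint` attribute, is to check the mount list before calling `dbutils.fs.mount`. The `MountInfo` namedtuple below is only a stand-in so the example runs outside Databricks, and `is_mounted` is a hypothetical helper.

```python
from collections import namedtuple

# Stand-in for the entries returned by dbutils.fs.mounts(); only mountPoint is used here.
MountInfo = namedtuple("MountInfo", ["mountPoint", "source"])


def is_mounted(mount_point, mounts):
    """Return True if mount_point appears in the given list of mount entries."""
    return any(m.mountPoint == mount_point for m in mounts)


existing = [
    MountInfo("/mnt/container2",
              "wasbs://container2@demostorageaccount.blob.core.windows.net"),
]
print(is_mounted("/mnt/container2", existing))
print(is_mounted("/mnt/other", existing))
```

Inside the notebook, the call would look like `is_mounted(MOUNT_POINT, dbutils.fs.mounts())`, with the mount performed only when it returns `False`.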

In [10]:
# https://docs.azuredatabricks.net/spark/latest/data-sources/read-avro.html
# https://blog.itaysk.com/2017/01/14/processing-event-hub-capture-files-using-spark

# Read all files from PartitionId 0.
file_location = "dbfs:{0}/{1}/{2}/0/*/*/*/*/*/*".format(MOUNT_POINT, NAMESPACE_NAME.lower(), EVENT_HUB_NAME.lower())
file_type = "com.databricks.spark.avro"
reader = spark.read.format(file_type).option("inferSchema", "true")
raw = reader.load(file_location)

# One option for decoding body
jsonRdd = raw.select(raw.Body.cast("string")).rdd.map(lambda x: x[0])

# Another option for decoding body
# https://stackoverflow.com/questions/53197825/pyspark-deserializing-an-avro-serialized-message-contained-in-an-eventhub-captu
#from pyspark.sql import functions as f
#decodeElements = f.udf(lambda a: a.decode('utf-8'))
#jsonRdd = raw.select(decodeElements(raw['Body'])).rdd.map(lambda x: x[0])

jsonData = spark.read.json(jsonRdd)
# 'rating' was sent as a string, so cast it explicitly before the numeric comparison.
jsonFilteredData = jsonData[jsonData['rating'].cast('int') > 3]
display(jsonFilteredData)

movieId,rating,userId
100,5,39
56,4,112
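
The decode-and-filter step can also be illustrated without Spark. In the sketch below, each captured `Body` is treated as UTF-8 encoded JSON bytes, which matches how the messages were sent earlier in this notebook; `decode_and_filter` and the sample records are hypothetical and stand in for the `Body` column of the Avro files.

```python
import json


def decode_and_filter(bodies, min_rating=3):
    """Decode UTF-8 JSON bodies and keep rows whose rating exceeds min_rating."""
    rows = [json.loads(body.decode("utf-8")) for body in bodies]
    # Ratings were sent as strings, so convert before comparing numerically.
    return [row for row in rows if int(row["rating"]) > min_rating]


# Hypothetical sample bodies, shaped like the messages sent by the sender cell.
sample_bodies = [
    json.dumps({"userId": "39", "movieId": "100", "rating": "5"}).encode("utf-8"),
    json.dumps({"userId": "7", "movieId": "12", "rating": "2"}).encode("utf-8"),
    json.dumps({"userId": "112", "movieId": "56", "rating": "4"}).encode("utf-8"),
]

for row in decode_and_filter(sample_bodies):
    print(row["movieId"], row["rating"], row["userId"])
```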


In [11]:
# When you are finished, you can undeploy all the resources created by this notebook by uncommenting and running this step. This will delete the resource group and all of its resources - namely the Azure Storage Account and Event Hubs Namespace.

#dbutils.fs.unmount(MOUNT_POINT)
#async_delete = resourceManagementClient.resource_groups.delete(resource_group_name = RESOURCE_GROUP_NAME)
#async_delete.wait()