In [0]:
%run ./Mount

In [0]:
%run ./Presidio

In [0]:
import os

import json

import datetime

import requests
from requests.auth import HTTPDigestAuth

from pandas.io.json import json_normalize

from pyspark.sql.types import *
import pyspark.sql.functions as F

In [0]:
# Configure ADX access: service-principal credentials used by the Kusto sink.
# Each value is read from a Databricks secret scope whose key has the same name as the scope.
# NOTE(review): the app-id secret is named "ADXappIP" (not "ADXappId") — confirm this is the intended secret name.
ADXappId, ADXappSecret, ADXtenantId = (
    dbutils.secrets.get(scope = secretName, key = secretName)
    for secretName in ("ADXappIP", "ADXappSecret", "tenantId")
)

In [0]:
# Mount the ADLS container used for checkpoints and other notebook data.
# exec_mount_container comes from the ./Mount notebook loaded by %run above.
mySourceContainerName, myStorageAccountName = "databricks", "adlstechoramathcosters"

mountPointBatch = exec_mount_container(mySourceContainerName, myStorageAccountName)

# Last expression: render the workspace's current mount table for inspection.
dbutils.fs.mounts()

Mounting: /mnt/databricks
Mount point /mnt/databricks already exists
Out[41]: [MountInfo(mountPoint='/databricks-datasets', source='databricks-datasets', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-tracking', source='databricks/mlflow-tracking', encryptionType=''),
 MountInfo(mountPoint='/databricks-results', source='databricks-results', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-registry', source='databricks/mlflow-registry', encryptionType=''),
 MountInfo(mountPoint='/mnt/databricks', source='abfss://databricks@adlstechoramathcosters.dfs.core.windows.net/', encryptionType=''),
 MountInfo(mountPoint='/', source='DatabricksRoot', encryptionType='')]

In [0]:
# Event Hubs SAS policy name and key, read from Databricks secret scopes.
ehsas = dbutils.secrets.get(scope = "ehsas", key = "ehsas") 
ehsaskey = dbutils.secrets.get(scope = "ehsaskey", key = "ehsaskey") 

connectionString = 'Endpoint=sb://techorama-apim.servicebus.windows.net/;SharedAccessKeyName='+ehsas+';SharedAccessKey='+ehsaskey+';EntityPath=testappeh'
consumerGroup = "databricks"

# ISO-8601 duration: datetime.time(0, 2, 0) formats as "PT00H02M00S" (120 seconds).
receiverTimeoutDuration = datetime.time(0,2,0).strftime("PT%HH%MM%SS")

# Where to start reading when there is no checkpoint to resume from.
# The connector only honours a JSON-serialised EventPosition under the
# "eventhubs.startingPosition" key; a plain "startingPosition" option is not
# a connector option and is ignored (the stream would then start at the
# connector default, the end of the stream). Offset "-1" = start of stream.
startingEventPosition = {
  "offset": "-1",
  "seqNo": -1,            # not in use when offset is set
  "enqueuedTime": None,   # not in use when offset is set
  "isInclusive": True
}

ehConf = {}
# The connection string is encrypted via the connector's JVM helper before being passed as an option.
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
ehConf['eventhubs.consumerGroup'] = consumerGroup
ehConf['eventhubs.receiverTimeout'] = receiverTimeoutDuration
ehConf['eventhubs.startingPosition'] = json.dumps(startingEventPosition)

# Expected structure of the JSON payload carried in each event body.
eventSchema = StructType([
  StructField("timeStamp", StringType(), True),
  StructField("name", StringType(), True),
  StructField("metric", LongType(), True),
  StructField("source", StringType(), True)]
)


ehEventsDF = (
  spark
  .readStream
  .format("eventhubs")
  .options(**ehConf)
  .load()
)

# Sanity check: readStream must yield a streaming DataFrame. A bare expression
# here would not be displayed because it is not the cell's last line.
assert ehEventsDF.isStreaming, "ehEventsDF is expected to be a streaming DataFrame"
display( ehEventsDF )

In [0]:
# The Event Hubs body column is binary: cast it to text as a single "Body" column,
# then parse that JSON text with eventSchema and flatten its fields to top-level columns.
decoded_df = ehEventsDF.select(F.col("body").cast("string").alias("Body"))

parsedEvents = F.from_json(F.col("Body"), eventSchema).alias("EventData")
df_events = decoded_df.select(parsedEvents).select("EventData.*")
del parsedEvents

display( df_events )

In [0]:
# Write out data to Azure Data Explorer
def exec_AllTaxiTripDataProcessing(
    checkpointLocation="/mnt/databricks/checkpoints/TechoramaTst1KustoEvents",
    numPartitions=16,
    triggerInterval="10 seconds",
):
    """Anonymize the parsed events and stream them into an Azure Data Explorer table.

    Reads the notebook-level names ``df_events`` (parsed event stream),
    ``ADXappId`` / ``ADXappSecret`` / ``ADXtenantId`` (ADX service-principal
    credentials) and ``anonymize_series`` (presumably defined by the
    ``./Presidio`` notebook — confirm). Blocks until the streaming query
    terminates.

    Args:
        checkpointLocation: Checkpoint directory for this query. It is set on
            the writer itself rather than through the session-wide
            ``spark.sql.streaming.checkpointLocation``. The session-wide
            setting makes Spark append a random per-query subdirectory when
            no query name is given, so a restarted query would never resume
            from its previous checkpoint.
        numPartitions: Number of partitions the events are repartitioned into
            before being handed to the Kusto sink.
        triggerInterval: Processing-time trigger interval for micro-batches.
    """
    kustoOptions = {
        "kustoCluster": "https://adxtechoramathcosters.westeurope.kusto.windows.net",
        "kustoDatabase": "techorama",
        "kustoTable": "TestTableSpark",
        "kustoAadAppId": ADXappId,
        "kustoAadAppSecret": ADXappSecret,
        "kustoAadAuthorityID": ADXtenantId,
    }

    # Swap the raw name for its anonymized form, keeping the original column order.
    df_kusto_events = (
        df_events
        .withColumn("anonymized_name", anonymize_series(df_events["name"]))
        .select("timeStamp", "anonymized_name", "metric", "source")
        .withColumnRenamed("anonymized_name", "name")
    )

    # Sink options: https://github.com/Azure/azure-kusto-spark/blob/master/docs/KustoSink.md
    kustoQ = (
        df_kusto_events
        .repartition(numPartitions)
        .writeStream
        .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
        .options(**kustoOptions)  # dict keys are exactly the sink's option names
        .option("tableCreateOptions", "CreateIfNotExist")
        .option("adjustSchema", "GenerateDynamicCsvMapping")
        .option("checkpointLocation", checkpointLocation)
        .outputMode("Append")
        .trigger(processingTime=triggerInterval)
        .start()
    )

    kustoQ.awaitTermination()
 

In [0]:
# Start the anonymized ADX streaming write; this call blocks until the query terminates.
exec_AllTaxiTripDataProcessing()