In [0]:
# Event Hubs source with default settings.
connectionString = "Endpoint=sb://xxx.servicebus.windows.net/;SharedAccessKeyName=xxx;SharedAccessKey=xxx;EntityPath=xx"

# The connector configuration holds the connection string in the form
# returned by EventHubsUtils.encrypt.
ehConf = {
  'eventhubs.connectionString': sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
}

# Open the stream through the Event Hubs source provider.
df = (
  spark.readStream
  .format("org.apache.spark.sql.eventhubs.EventHubsSourceProvider")
  .options(**ehConf)
  .load()
)

# Cast the message body to a string so it can be parsed as JSON later.
df = df.withColumn("body", df["body"].cast("string"))

In [0]:
# Print the schema of the streaming DataFrame loaded from the Event Hubs source.
df.printSchema()

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pyspark.sql.functions as F


# Layout of the JSON telemetry document carried in the message body.
schema = StructType([
    StructField('messageId', StringType()),
    StructField('temperature', DoubleType()),
    StructField('humidity', DoubleType()),
])

# Keep the enqueue time and the originating device id, parse the body with
# the schema above, and expand the parsed struct into one column per field.
df2 = (
    df.select(
        df["enqueuedTime"].alias("Enqueued_Time"),
        df["systemProperties"]["iothub-connection-device-id"].alias("Device_ID"),
        from_json(df["body"].cast("string"), schema).alias("telemetry_json"),
    )
    .select("Enqueued_Time", "Device_ID", "telemetry_json.*")
)

In [0]:
# Register the flattened telemetry as a temporary view so it can be queried by name through spark.sql.
df2.createOrReplaceTempView("device_telemetry_data")

## Azure Cognitive Services Anomaly Detection

In [0]:
# Build the series fed to the anomaly detector from the device_telemetry_data view:
#   - timestamp: the enqueue date ("yyyy-MM-dd") joined by "T" to the enqueue time
#     truncated to the minute and formatted as 'HH:mm:ssX'
#   - value: the temperature reading
# Earlier variant kept for reference: date_trunc('SECOND',Enqueued_Time) as timestamp
temperature = spark.sql( """ Select Enqueued_Time, 
concat(date_format(Enqueued_Time,"yyyy-MM-dd"),"T",date_format(date_trunc('MINUTE',Enqueued_Time), 'HH:mm:ssX')) as timestamp,
temperature as value  
from device_telemetry_data """)

In [0]:
# Show the Enqueued_Time / timestamp / value rows produced by the query.
display(temperature)

Enqueued_Time,timestamp,value
2021-11-18T06:28:01.075+0000,2021-11-18T06:28:00Z,29.414261227548376
2021-11-18T06:28:03.294+0000,2021-11-18T06:28:00Z,25.81875585262603
2021-11-18T06:28:05.279+0000,2021-11-18T06:28:00Z,29.82713513609807
2021-11-18T06:28:07.279+0000,2021-11-18T06:28:00Z,21.21449058876932


In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pyspark.sql.functions as F

# For every 15-minute window (sliding every 10 minutes) and minute-level
# timestamp, keep the maximum reading, ordered by timestamp ascending.
windowedCounts = (
    temperature
    .groupBy(
        F.window(temperature["Enqueued_Time"], "15 minutes", "10 minutes"),
        temperature["timestamp"],
    )
    .agg(F.max("value").alias("value"))
    .orderBy(F.col("timestamp").asc())
)

In [0]:
# Show the per-window, per-timestamp maximum values.
display(windowedCounts)

In [0]:
import requests
import json
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings('ignore')
from pyspark.sql.functions import lit,unix_timestamp
from pyspark.sql import functions as f 
from pyspark.sql.functions import to_json, spark_partition_id, collect_list, col, struct
# Import library to display results
import matplotlib.pyplot as plt
%matplotlib inline 
from pyspark.sql.functions import lit,unix_timestamp
import time
import datetime
from dateutil import parser

# Anomaly Detector: subscription key and the "last point" detection endpoint
# that detect_anomaly passes to detect().
# NOTE(review): the values here are placeholders; a real key should presumably
# come from a secret store rather than being written into the notebook -- confirm.
apikey = 'xxx' 
endpoint_latest = 'https://xxxx.cognitiveservices.azure.com/anomalydetector/v1.0/timeseries/last/detect'
# Power BI endpoint that sendToBi() posts the JSON rows to.
endpoint ='https://api.powerbi.com/beta/xxx...'

def detect(endpoint, apikey, request_data, timeout=30):
  """POST ``request_data`` as JSON to ``endpoint`` and return the decoded JSON reply.

  Args:
    endpoint: URL the request is sent to.
    apikey: value sent in the ``Ocp-Apim-Subscription-Key`` header.
    request_data: JSON-serialisable request payload.
    timeout: seconds to wait for the service before giving up. Without a
      timeout ``requests`` waits indefinitely, which would block the
      streaming micro-batch that calls this function.

  Returns:
    The response body parsed from JSON.

  Raises:
    Exception: if the service answers with a status other than 200; the
      message carries both the status code and the response text.
    requests.exceptions.Timeout: if the service does not answer in time.
  """
  headers = {'Content-Type': 'application/json', 'Ocp-Apim-Subscription-Key': apikey}
  response = requests.post(endpoint, data=json.dumps(request_data), headers=headers, timeout=timeout)
  if response.status_code != 200:
    # Put the status code in the error itself instead of printing it separately,
    # so it is not lost when only the exception is logged.
    raise Exception("HTTP {}: {}".format(response.status_code, response.text))
  return json.loads(response.content.decode("utf-8"))

        
def detect_anomaly(df):
  """Run last-point anomaly detection on the ``timestamp``/``value`` series in ``df``.

  The distinct (timestamp, value) rows are sorted by timestamp, collected to
  the driver and sent to ``endpoint_latest`` through ``detect``. Only the
  final point of the series is scored, so every returned list has the length
  of the series with just its last slot filled from the service reply.

  Returns:
    A ``(result, single_sample_data)`` tuple: ``result`` maps each output
    column name to its list, ``single_sample_data`` is the request payload
    that was sent.
  """
  ordered_points = df[["timestamp", "value"]].dropDuplicates().sort(df.timestamp.asc())
  series = ordered_points.toJSON().map(lambda row_json: json.loads(row_json)).collect()

  single_sample_data = {
      'series': series,
      'granularity': 'minutely',
      'maxAnomalyRatio': 0.25,
      'sensitivity': 95,
  }
  single_point = detect(endpoint_latest, apikey, single_sample_data)

  # (result column, key in the service reply, filler for the unscored points)
  field_map = (
      ('expectedValues', 'expectedValue', None),
      ('upperMargins', 'upperMargin', None),
      ('lowerMargins', 'lowerMargin', None),
      ('isNegativeAnomaly', 'isNegativeAnomaly', False),
      ('isPositiveAnomaly', 'isPositiveAnomaly', False),
      ('isAnomaly', 'isAnomaly', False),
  )
  n_points = len(series)
  result = {column: [filler] * n_points for column, _, filler in field_map}
  for column, reply_key, _ in field_map:
    result[column][n_points - 1] = single_point[reply_key]
  return result, single_sample_data

def sendToBi (data, timeout=30):
  """POST an already-serialised JSON payload to the Power BI ``endpoint``.

  The payload and the resulting HTTP status code are printed; nothing is
  returned.

  Args:
    data: JSON text to send as the request body.
    timeout: seconds to wait for Power BI before giving up. Without a
      timeout ``requests`` waits indefinitely, which would block the
      streaming micro-batch that calls this function.
  """
  print(data)
  newHeaders = {'Content-type': 'application/json'}
  response = requests.post(endpoint,
                           data=data,
                           headers=newHeaders,
                           timeout=timeout)
  print("Status code: ", response.status_code)



def convertdf (df):
  """Serialise a pandas DataFrame to a JSON array of row records.

  Dates are written in ISO format with second precision. The text produced
  by ``to_json`` is parsed and dumped again, so the returned string uses the
  formatting of ``json.dumps``.
  """
  records_json = df.to_json(orient="records", date_format='iso', date_unit = 's')
  return json.dumps(json.loads(records_json))


def callBI(result,single_sample_data):
  """Combine detector output with the submitted series and push it to Power BI.

  Args:
    result: mapping of output column name to list, as returned by
      ``detect_anomaly``.
    single_sample_data: the request payload; its ``'series'`` entries supply
      the ``value`` and (parsed) ``timestamp`` columns.

  The combined table is serialised with ``convertdf`` and sent with
  ``sendToBi``, whose return value is printed.
  """
  series = single_sample_data['series']
  result_keys = ('expectedValues', 'isAnomaly', 'isNegativeAnomaly',
                 'isPositiveAnomaly', 'upperMargins', 'lowerMargins')

  # Column order here is the field order of the records that get sent.
  columns = {key: result[key] for key in result_keys}
  columns['value'] = [point['value'] for point in series]
  columns['timestamp'] = [parser.parse(point['timestamp']) for point in series]

  frame = pd.DataFrame(data=columns)
  payload = convertdf(frame)
  print(sendToBi(payload))
  
def calldetector(df,epoch_id):
    """Per-micro-batch handler: score the batch once it holds enough points.

    Rows are de-duplicated on ``timestamp``. With fewer than 12 distinct
    timestamps the collected series is only printed; otherwise it is passed
    to ``detect_anomaly`` and the outcome is forwarded to ``callBI``.

    Args:
      df: the batch DataFrame with ``timestamp`` and ``value`` columns.
      epoch_id: batch identifier supplied by the caller; not used here.
    """
    deduped = df.dropDuplicates(['timestamp'])

    if deduped.count() < 12:
      # Not enough points yet: just show what has been gathered so far.
      points = deduped[["timestamp", "value"]].toJSON().map(lambda row_json: json.loads(row_json)).collect()
      print({'series': points})
      return

    result, single_sample_data = detect_anomaly(deduped)
    callBI(result, single_sample_data)

In [0]:
# Start the streaming query: the aggregated result is written in 'complete'
# output mode and each micro-batch is handed to calldetector(df, epoch_id).
# awaitTermination() blocks this cell until the query stops.
(windowedCounts
  .writeStream
  .outputMode('complete')
  .foreachBatch(calldetector)
  .start().awaitTermination())