In [1]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext
import json
from urllib.request import Request, urlopen
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.types import StructType, IntegerType, FloatType, StringType
from pyspark.sql.functions import *

import statsmodels.api as sm
import numpy as np

import os.path
import sys
import os
from azure.storage.blob import AppendBlobService
from azure.storage import CloudStorageAccount

import adal

# ---- Working parameters -------------------------------------------------
# Micro-batch interval (seconds) used when constructing the StreamingContext.
streamingFrequency = 10
# Must be a multiple of streamingFrequency.
windowFrequency = 10
# NOTE(review): not referenced by the Kafka reader below, which subscribes to
# a hard-coded topic name — confirm which topic is intended.
topic = 'timeseries'
# [index, column name] pairs; variables[0][1] is the variable being predicted.
variables = [[position, column]
             for position, column in enumerate(('Temperature', 'Load', 'Voltage'), start=1)]

# ---- Model storage --------------------------------------------------------
modelPath = "linear_regression_model.pickle"
modelsContainer = "models"

# ---- Output files / Power BI ---------------------------------------------
filePrefix = "timeseries"
fileExtension = "csv"
data_container_name = 'timeseries'
datasetName = 'NTEDemoStreaming'

# ---- Mutable model cache (populated by loadModel) -------------------------
modelDate = model = predictionModel = None

In [2]:
# Assign Parameters
# Secrets and hosts are read from environment variables so real credentials
# never need to be typed into (and committed with) the notebook. The
# '{...}' placeholders are kept as fallbacks so existing behavior is
# unchanged when the variables are not set.
account_name = os.environ.get('AZURE_STORAGE_ACCOUNT_NAME', '{AZURE_STORAGE_ACCOUNT_NAME}')
account_key = os.environ.get('AZURE_STORAGE_ACCOUNT_KEY', '{AZURE_STORAGE_ACCOUNT_PASSWORD}')
tenantId = os.environ.get('POWERBI_TENANT_ID', '{TENANT_ID}')
clientId = os.environ.get('POWERBI_CLIENT_ID', '{CLIENT_ID}')
username = os.environ.get('POWERBI_USERNAME', '{USERNAME}')
password = os.environ.get('POWERBI_PASSWORD', '{PASSWORD}')
# Set POWERBI_WORKSPACE_NAME to an empty string to use "My workspace".
workspaceName = os.environ.get('POWERBI_WORKSPACE_NAME', '{WORKSPACE_NAME}')
# Comma-separated host:port list for kafka.bootstrap.servers.
brokers = os.environ.get(
    'KAFKA_BROKERS',
    '{BROKER_1_IP_ADDRESS}:9092,{BROKER_2_IP_ADDRESS}:9092,'
    '{BROKER_3_IP_ADDRESS}:9092,{BROKER_4_IP_ADDRESS}:9092')

In [3]:
# Block-blob client (used for downloading the model) obtained from the
# storage account object.
blob_account = CloudStorageAccount(account_name, account_key)
blob_service = blob_account.create_block_blob_service()

# Separate append-blob client used by save() to append CSV text.
append_blob_service = AppendBlobService(account_name=account_name, account_key=account_key)

In [4]:
def convertToPandas(rdd):
    """Collect ``rdd`` to a pandas DataFrame and group it by ``ObjectKey``.

    Despite the parameter name, ``rdd`` must expose ``toPandas()`` (i.e. a
    Spark DataFrame, not a bare RDD).

    Returns:
        A pandas ``DataFrameGroupBy`` keyed on the ``ObjectKey`` column.
    """
    return rdd.toPandas().groupby(['ObjectKey'])

def loadModel():
    """Return the prediction model, re-downloading it when storage has a newer copy.

    The ``last_modified`` timestamp of blob ``modelPath`` in container
    ``modelsContainer`` is compared with ``modelDate``. ``modelDate`` is the
    timestamp of the copy cached in the module-level ``predictionModel``. When
    nothing is cached yet, or the stored blob is newer, the blob is downloaded
    to the local file ``modelPath`` and loaded with ``sm.load``. Both
    ``predictionModel`` and ``modelDate`` are updated.

    Returns:
        The cached model object produced by ``sm.load``.
    """
    import sys
    import pandas
    global predictionModel
    global modelDate

    # Unpickling compatibility shim: pandas < 0.20 exposed the index module as
    # ``pandas.indexes``, while newer pickles reference ``pandas.core.indexes``.
    # Newer pandas has no ``pandas.indexes`` attribute at all, so the alias is
    # only installed when it exists. An unconditional assignment raises
    # AttributeError on those versions.
    if hasattr(pandas, 'indexes'):
        sys.modules['pandas.core.indexes'] = pandas.indexes

    blob = blob_service.get_blob_properties(modelsContainer, modelPath)
    storageDate = blob.properties.last_modified
    if (modelDate is None) or (modelDate < storageDate):
        print('DOWNLOADING MODEL')
        # download from storage
        blob_service.get_blob_to_path(modelsContainer, modelPath, modelPath)
        # SECURITY NOTE: sm.load unpickles the file, which can execute
        # arbitrary code. Only point modelsContainer at trusted storage.
        predictionModel = sm.load(modelPath)
        modelDate = storageDate
    return predictionModel


def predict(data, dsmatrix, model=None, predictedVariable=None):
    """Return 1 if any row of ``data`` deviates from the model prediction, else 0.

    Args:
        data: pandas DataFrame of observed values. It must contain the
            ``predictedVariable`` column.
        dsmatrix: design matrix passed to ``model.predict(..., transform=False)``.
            ``process`` builds it with ``patsy.dmatrix``.
        model: optional fitted model exposing ``predict(X, transform=False)``.
            Defaults to ``loadModel()``, which downloads/caches the stored model.
        predictedVariable: optional name of the observed column to compare.
            Defaults to ``variables[0][1]``.

    Returns:
        ``1`` if at least one row's observed value lies outside
        ``prediction ± 5 * sigma``, otherwise ``0``.

    Raises:
        ValueError: if the model returns a different number of predictions
            than there are rows in ``data``.
    """
    import random

    if model is None:
        model = loadModel()
    if predictedVariable is None:
        predictedVariable = variables[0][1]

    predicted = np.asarray(model.predict(dsmatrix, transform=False)).ravel()
    observed = np.asarray(data[predictedVariable])
    if len(predicted) != len(observed):
        raise ValueError('Expected %d predictions, got %d'
                         % (len(observed), len(predicted)))

    isAnomaly = 0
    # Rows are compared positionally, so the DataFrame's index labels do not
    # matter. Label-based ``[0]`` lookups raised KeyError without a 0 label.
    for current_value, prediction in zip(observed, predicted):
        # NOTE(review): sigma is a random placeholder, not an estimate derived
        # from the model's residuals, so results are not reproducible.
        # Replace it with a real uncertainty measure before relying on the flag.
        sigma = random.uniform(0.2, 0.8)
        prediction_upper = prediction + 5*sigma
        prediction_lower = prediction - 5*sigma
        if (current_value > prediction_upper) or (current_value < prediction_lower):
            isAnomaly = 1
    return isAnomaly


def save(data, deviceId, filePrefix, extension):
    """Append ``data`` as headerless text to a per-device, per-day append blob.

    The blob is named ``<deviceId>/<filePrefix>-<date>.<extension>`` inside
    container ``data_container_name``. ``<date>`` is the first 10 characters
    of the first row's ``ValueTime``; this presumes an ISO-style timestamp
    string (TODO confirm the producer's format). The container and the blob
    are created on first use.

    Args:
        data: pandas DataFrame with a ``ValueTime`` string column.
        deviceId: device identifier used as the blob "directory".
        filePrefix: file name prefix.
        extension: file extension without the leading dot.
    """
    # Positional access: the frame's first row, whatever its index label.
    date = data['ValueTime'].iloc[0][:10]
    fileName = filePrefix + '-' + date + '.' + extension
    # Blob names always use '/' as the virtual-directory separator.
    # os.path.join would produce '\\' when run on Windows.
    path = str(deviceId) + '/' + fileName
    dataAsText = data.to_string(header=False, index=False)

    print('Saving ' + path + ' in container ' + data_container_name)
    append_blob_service.create_container(data_container_name)
    if not append_blob_service.exists(data_container_name, path):
        append_blob_service.create_blob(data_container_name, path)
    append_blob_service.append_blob_from_text(data_container_name,
                                              path, dataAsText + '\n')


def getPowerBIToken(username, password, tenantId, clientId):
    """Acquire a Power BI access token via ADAL's username/password flow.

    Args:
        username, password: Azure AD user credentials.
        tenantId: Azure AD tenant used to build the authority URL.
        clientId: registered application (client) id.

    Returns:
        The string ``'Bearer <accessToken>'``, ready for an Authorization header.
    """
    resource = 'https://analysis.windows.net/powerbi/api'
    authority = 'https://login.windows.net/' + tenantId
    authContext = adal.AuthenticationContext(authority)
    result = authContext.acquire_token_with_username_password(
        resource, username, password, clientId)
    return 'Bearer ' + result['accessToken']


def callPowerBIApi(access_token, endpoint):
    """Issue a GET to a Power BI REST ``endpoint`` and return the body as text.

    Args:
        access_token: full Authorization header value (e.g. ``'Bearer ...'``).
        endpoint: absolute URL of the REST resource.

    Returns:
        The response body decoded as UTF-8.

    Raises:
        urllib.error.HTTPError / URLError on HTTP or network failure.
    """
    addRowsRequest = Request(endpoint)
    addRowsRequest.add_header('Authorization', access_token)
    addRowsRequest.add_header('Content-Type', 'application/json')
    # The context manager closes the response (and its socket) even if
    # read() or decode() fails.
    with urlopen(addRowsRequest) as responsePowerBI:
        return responsePowerBI.read().decode('utf-8')


def getDataSets(access_token, workspaceId=None):
    """List Power BI datasets, optionally scoped to a workspace (group).

    This single definition replaces two same-named ``def``s. The second one
    (without ``workspaceId``) silently shadowed the first.

    Args:
        access_token: full Authorization header value.
        workspaceId: workspace/group id. When falsy (``None`` or ``''``), the
            caller's own "My workspace" datasets are listed.

    Returns:
        The raw JSON response body as a string.
    """
    if workspaceId:
        endpoint = ('https://api.powerbi.com/v1.0/myorg/groups/'
                    + workspaceId
                    + '/datasets')
    else:
        endpoint = 'https://api.powerbi.com/v1.0/myorg/datasets'
    return callPowerBIApi(access_token, endpoint)
  
def getDataSetsByWorkspace(access_token, workspaceId):
    """Return the raw JSON listing of datasets in the Power BI workspace ``workspaceId``."""
    groupsBase = 'https://api.powerbi.com/v1.0/myorg/groups/'
    url = groupsBase + workspaceId + '/datasets'
    return callPowerBIApi(access_token, url)

def getWorkspaces(access_token):
    """Return the raw JSON listing of Power BI workspaces (groups) visible to the caller."""
    return callPowerBIApi(access_token,
                          'https://api.powerbi.com/v1.0/myorg/groups')


def findByName(itemlist, name):
    """Return every dict in ``itemlist`` whose ``'name'`` equals ``name``, in original order."""
    return list(filter(lambda entry: entry['name'] == name, itemlist))


def sendToPowerBIByWorkspace(data, access_token, workspaceId, datasetId):
    """POST the rows of ``data`` to the ``RealTimeData`` table of a workspace dataset.

    Args:
        data: pandas DataFrame; each row becomes one JSON record.
        access_token: full Authorization header value.
        workspaceId: Power BI workspace (group) id.
        datasetId: target push-dataset id.

    Raises:
        urllib.error.HTTPError / URLError on HTTP or network failure.
    """
    rows = data.to_json(orient='records')
    powerBiDatasetEndpoint = ('https://api.powerbi.com/v1.0/myorg/groups/'
                              + workspaceId + '/datasets/'
                              + datasetId + '/tables/RealTimeData/rows')
    # Double-quoted key: "{ 'rows' : ... }" is not valid JSON (RFC 8259
    # requires double-quoted member names).
    addRowsBody = '{"rows": ' + rows + '}'
    addRowsRequest = Request(powerBiDatasetEndpoint,
                             addRowsBody.encode("utf-8"))
    addRowsRequest.add_header('Authorization', access_token)
    addRowsRequest.add_header('Content-Type', 'application/json')
    # Close the response/socket deterministically.
    with urlopen(addRowsRequest) as response:
        response.read()


def sendToPowerBI(data, access_token, datasetId):
    """POST the rows of ``data`` to the ``RealTimeData`` table of a "My workspace" dataset.

    Args:
        data: pandas DataFrame; each row becomes one JSON record.
        access_token: full Authorization header value.
        datasetId: target push-dataset id.

    Raises:
        urllib.error.HTTPError / URLError on HTTP or network failure.
    """
    rows = data.to_json(orient='records')
    powerBiDatasetEndpoint = ('https://api.powerbi.com/v1.0/myorg/datasets/'
                              + datasetId + '/tables/RealTimeData/rows')
    # Double-quoted key: "{ 'rows' : ... }" is not valid JSON (RFC 8259
    # requires double-quoted member names).
    addRowsBody = '{"rows": ' + rows + '}'
    addRowsRequest = Request(powerBiDatasetEndpoint,
                             addRowsBody.encode("utf-8"))
    addRowsRequest.add_header('Authorization', access_token)
    addRowsRequest.add_header('Content-Type', 'application/json')
    # Close the response/socket deterministically.
    with urlopen(addRowsRequest) as response:
        response.read()


def process(ValueTime, Temperature, Load, Voltage, DeviceId):
    """Score one streamed reading and push it, with its anomaly flag, to Power BI.

    Used as a Spark UDF (see ``procAnomaly``). The reading becomes a
    one-row pandas DataFrame, and a ``Load * Voltage`` design matrix is built
    with patsy and scored by ``predict``. The row plus an ``anomaly`` column is
    then sent to the workspace dataset when ``workspaceId`` is set, otherwise
    to the "My workspace" dataset.

    Returns:
        The anomaly flag returned by ``predict`` (0 or 1).
    """
    import patsy

    reading = {
        "ValueTime": ValueTime,
        "Temperature": Temperature,
        "Load": Load,
        "Voltage": Voltage,
        "DeviceId": DeviceId,
    }
    predictionData = pd.DataFrame(reading, index=[0])
    designMatrix = patsy.dmatrix("Load * Voltage", data=predictionData)

    anomalyFlag = predict(predictionData, designMatrix)
    predictedData = predictionData.assign(anomaly=anomalyFlag)

    # NOTE(review): powerBIToken is acquired once in the notebook; a
    # long-running stream will likely need to refresh it — confirm its lifetime.
    if not workspaceId:
        sendToPowerBI(predictedData, powerBIToken, datasetId)
    else:
        sendToPowerBIByWorkspace(predictedData, powerBIToken, workspaceId, datasetId)

    return anomalyFlag

In [5]:
powerBIToken = getPowerBIToken(username, password, tenantId, clientId)

# Resolve the (optional) workspace and the target dataset by display name.
# Failures raise instead of calling sys.exit(). sys.exit() with no argument
# exits with status 0 in a batch run (looks like success) and only prints a
# warning inside Jupyter.
workspaceId = ''
if workspaceName:
    workspaces = json.loads(getWorkspaces(powerBIToken))['value']
    workspace = findByName(workspaces, workspaceName)
    if not workspace:
        raise LookupError('Cannot find workspace ' + workspaceName)

    workspaceId = workspace[0]['id']
    print("Using PowerBI workspace " + workspaceId)
    datasetsJson = getDataSetsByWorkspace(powerBIToken, workspaceId)
else:
    datasetsJson = getDataSets(powerBIToken)

datasets = json.loads(datasetsJson)['value']
dataset = findByName(datasets, datasetName)
if not dataset:
    raise LookupError('Cannot find dataset ' + datasetName)

datasetId = dataset[0]['id']

In [6]:
# Reuse the session the PySpark kernel already started, if any, instead of
# relying on a kernel-injected ``sc`` variable. This lets the cell also run on
# a fresh interpreter.
spark = SparkSession.builder.getOrCreate()
# NOTE(review): ssc (DStream API) is not used by the Structured Streaming
# query below. It is kept only for any code that still references it.
ssc = StreamingContext(spark.sparkContext, streamingFrequency)

# NOTE(review): `process` has side effects (Power BI POSTs) and a random
# threshold, but Spark assumes UDFs are deterministic and may re-evaluate
# them. On Spark >= 2.3 consider `.asNondeterministic()`.
procAnomaly = udf(process, IntegerType())
schema = (StructType()
          .add("ValueTime", StringType())
          .add("Temperature", FloatType())
          .add("Load", FloatType())
          .add("Voltage", FloatType())
          .add("DeviceId", StringType()))

df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      # NOTE(review): hard-coded topic; it differs from the `topic` setting
      # in the configuration cell — confirm which one is intended.
      .option("subscribe", "timeseriesspearfish")
      .option("startingOffsets", "latest")
      .load())

# Kafka `value` bytes -> JSON string -> typed columns, plus the anomaly flag.
selectDF = (df
            .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
            .select("parsed_value.*")
            .select(procAnomaly("ValueTime", "Temperature", "Load", "Voltage", "DeviceId").alias("IsAnomaly"), "*"))

# NOTE(review): the memory sink keeps every output row in driver memory for
# the lifetime of the query. It is suitable for demos/debugging only.
query = (selectDF
         .writeStream
         .format("memory")
         .queryName("streamingOutput")
         .start())

query.awaitTermination()