### ML Scoring on Streaming Data

#### Installation: install the following libraries on this cluster:
- Maven: org.apache.kafka:kafka-clients:3.3.1
- Pypi:
  - azure-ai-ml
  - mlflow
  - scikit-learn==1.0.2
  - azure-kusto-ingest==4.0.0
  - azure-kusto-data==4.0.0
  - torch==1.12.0

### Connect to Event Hubs

In [None]:
from pyspark.sql.functions import from_json, col,explode, split,get_json_object
from pyspark.sql.types import *
con_str = dbutils.secrets.get("scope01", "ehns001-con")
# Event Hubs is read through its Kafka-compatible endpoint (port 9093): SASL PLAIN with the
# literal user name "$ConnectionString" and the namespace connection string as password.
EH_SASL = f"org.apache.kafka.common.security.plain.PlainLoginModule required username='$ConnectionString' password='{con_str}';"
GROUP_ID = "$Default"

# JSON payload of one availability event; every field is nullable.
data_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("id", StringType()),
        ("starttime", TimestampType()),
        ("endtime", TimestampType()),
        ("car_type", StringType()),
        ("location", StringType()),
        ("car_id", StringType()),
    ]
])

# Kafka source settings for the "availability" event hub.
eh_kafka_options = {
    "kafka.bootstrap.servers": "ehns001.servicebus.windows.net:9093",
    "subscribe": "availability",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": EH_SASL,
    "kafka.request.timeout.ms": "60000",
    "kafka.session.timeout.ms": "60000",
    "kafka.group.id": GROUP_ID,
    "failOnDataLoss": "false",
    "minOffsetsPerTrigger": 200,
}

availability_raw = (
    spark.readStream
    .format("kafka")
    .options(**eh_kafka_options)
    .load()
)

# Parse the Kafka value as JSON and flatten it, keeping the source partition number.
availability = (
    availability_raw
    .select(from_json(col("value").cast("string"), data_schema).alias("value"), "partition")
    .select("value.id", "value.starttime", "value.endtime", "value.car_type",
            "value.location", "value.car_id", "partition")
)

# Expose the parsed stream to SQL as the "availability" view.
availability.createOrReplaceTempView("availability")



In [None]:
%sql select to_json(struct(location,starttime,car_type, partition)) value, car_id key from availability

### Log in to the Azure ML workspace to download the model

In [None]:
# Azure ML client plus the credential types used to sign in.
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential, DeviceCodeCredential

# Azure ML workspace that holds the registered model -- replace with your own values.
subscription_id = '840b5c5c-3f4a-459a-94fc-6bad2a969f9d'
resource_group = 'ml'
workspace = 'ws02ent'

# Interactive device-code sign-in with your own account in the given AAD tenant.
aml_credential = DeviceCodeCredential(tenant_id="0fbe7234-45ea-498b-b7e4-1a8b2d3be4d9")
ml_client = MLClient(aml_credential, subscription_id, resource_group, workspace)

# Fetch version 3 of the "torch_autoencoder" model under /dbfs/models; the scoring cell
# loads the weights from /dbfs/models/torch_autoencoder/.
ml_client.models.download("torch_autoencoder", version=3, download_path="/dbfs/models")

### Autoencoder scoring: write results to ADX and Delta, and send anomalies to Event Hubs

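##### The Azure ML login above uses your own account (device-code sign-in); you can use a service principal there instead. The ADX ingestion below already authenticates with a service principal whose ID and secret are read from the `scope01` secret scope.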

In [None]:
import mlflow
import torch as T
import pandas as pd
import numpy as np

from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import make_column_transformer
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.ingest import (
    QueuedIngestClient,
    IngestionProperties,
    KustoStreamingIngestClient,
    ManagedStreamingIngestClient

)
# Reconstruction-error cut-off: detect_anomaly_autoencoder counts a window as anomalous
# when its summed squared reconstruction error is greater than this value.
THRESHOLD = 20
# Number of consecutive rows per window (the sequence length used by create_sequences and
# the minimum group size detect_anomaly_autoencoder will score).
TIME_STEPS = 20


# Azure Data Explorer (ADX) database and table the scored rows are ingested into.
database_name ="db01"
table_name = "anomaly_output"
# Service-principal (AAD application key) credentials for ADX, read from secret scope "scope01".
client_id = dbutils.secrets.get("scope01", "app01-id")
client_secret = dbutils.secrets.get("scope01", "app01-sec")
# AAD tenant passed as the authority to the Kusto connection-string builder.
authority_id = "0fbe7234-45ea-498b-b7e4-1a8b2d3be4d9"

# Spark DDL output schema of the pandas scoring UDF.
# NOTE(review): the ADX table presumably has the same columns in the same order -- confirm.
schema = "anomaly integer, location string, car_type string, count integer,timestamp timestamp"

class Autoencoder(T.nn.Module):
    """Small convolutional autoencoder over single-channel 2-D inputs.

    The encoder maps 1 -> 32 -> 16 channels with 7x7 convolutions (each shrinks height
    and width by 6); the decoder mirrors it with 7x7 transposed convolutions back to
    1 channel at the original size. The attribute names fc1..fc4 are the keys of the
    saved state dict, so they must not be renamed.
    """

    def __init__(self):
        super().__init__()
        # Encoder layers.
        self.fc1 = T.nn.Conv2d(1, 32, 7)
        self.fc2 = T.nn.Conv2d(32, 16, 7)
        # Decoder layers (mirror of the encoder).
        self.fc3 = T.nn.ConvTranspose2d(16, 32, 7)
        self.fc4 = T.nn.ConvTranspose2d(32, 1, 7)

    def encode(self, x):
        """Return the latent feature map of *x*; tanh keeps it in [-1, 1]."""
        hidden = T.tanh(self.fc1(x))
        return T.tanh(self.fc2(hidden))

    def decode(self, x):
        """Reconstruct an input from latent map *x*; sigmoid keeps it in [0, 1]."""
        hidden = T.tanh(self.fc3(x))
        return T.sigmoid(self.fc4(hidden))

    def forward(self, x):
        """Encode then decode *x*; the output has x's shape with values in [0, 1]."""
        return self.decode(self.encode(x))
def create_sequences(values, time_steps=TIME_STEPS):
    """Return every run of ``time_steps`` consecutive rows of *values*, stacked.

    For ``n = len(values)`` the result has ``n - time_steps + 1`` windows along a new
    leading axis; window ``i`` is ``values[i:i + time_steps]``.

    Raises:
        ValueError: from ``np.stack`` when *values* has fewer than ``time_steps`` rows
            (there is no window to stack).
    """
    windows = [
        values[start:start + time_steps]
        for start in range(len(values) - time_steps + 1)
    ]
    return np.stack(windows)
def pre_process(df, transformer):
    """Build the autoencoder input tensor from a pandas frame of counts.

    Args:
        df: pandas DataFrame with at least 'location', 'car_type' and 'count' columns.
        transformer: fitted column transformer mapping those three columns to a 2-D
            feature array.

    Returns:
        float32 CPU tensor of shape (n_windows, 1, TIME_STEPS, n_features): the sliding
        windows from create_sequences with a channel axis inserted for Conv2d.

    Raises:
        Exception: when the windows cannot be built (e.g. fewer rows than TIME_STEPS);
            the underlying error is chained as ``__cause__``.
    """
    # Copy the selection so assigning the normalised column below does not write into a
    # slice of `df` (avoids pandas' SettingWithCopyWarning).
    input_df = df[['location', 'car_type', 'count']].copy()
    # Hard-coded normalisation constants; with mean ~0 and std 1 this is numerically
    # almost an identity that converts 'count' to float.
    mean = 2.2031823072902032e-17
    std = 1.0
    input_df['count'] = (input_df['count'] - mean) / std
    transformed_data = transformer.transform(input_df)
    try:
        transformed_input = create_sequences(transformed_data)
    except Exception as e:
        # Keep the original failure attached so the real cause is visible in logs.
        raise Exception("transformed_input shape", df.shape) from e
    # (n_windows, TIME_STEPS, n_features) -> (n_windows, 1, TIME_STEPS, n_features)
    transformed_input = np.expand_dims(transformed_input, 1)
    return T.tensor(np.float32(transformed_input), dtype=T.float32).to("cpu")



def _fit_autoencoder_transformer():
    """Return the fitted transformer that one-hot encodes location/car_type and passes count through."""
    locations = ['loc_0', 'loc_1', 'loc_10', 'loc_11', 'loc_12', 'loc_13', 'loc_14',
                 'loc_15', 'loc_16', 'loc_17', 'loc_18', 'loc_19', 'loc_2', 'loc_3',
                 'loc_4', 'loc_5', 'loc_6', 'loc_7', 'loc_8', 'loc_9']
    # Repeat the four car types so the fitting frame has as many rows as `locations`.
    car_types = ['comfort', 'green', 'x', 'xl'] * 5
    transformer = make_column_transformer(
        (OneHotEncoder(sparse=False), ['location', 'car_type']),
        remainder='passthrough')
    transformer.fit(pd.DataFrame({"location": locations, "car_type": car_types, "count": range(20)}))
    return transformer


def _flag_anomalous_rows(window_anomalies, n_rows, time_steps):
    """Map per-window anomaly flags back to per-row labels (1 anomalous, -1 otherwise).

    Window ``j`` covers rows ``j .. j + time_steps - 1``, so row ``i`` is covered by
    windows ``i - time_steps + 1 .. i``. A row is labelled 1 only when every one of
    those windows is anomalous. Rows not covered by a full set of ``time_steps``
    windows (the first and last ``time_steps - 1`` rows) are always -1.
    """
    labels = np.full(n_rows, -1)
    # The bound is the number of ROWS, not the number of windows.
    for row_idx in range(time_steps - 1, n_rows - time_steps + 1):
        if np.all(window_anomalies[row_idx - time_steps + 1 : row_idx + 1]):
            labels[row_idx] = 1
    return labels


def detect_anomaly_autoencoder(df):
    """Score one ``timestamp`` group with the autoencoder and ingest it into ADX.

    Used as ``groupby("timestamp").applyInPandas(detect_anomaly_autoencoder, schema)``.

    Args:
        df: pandas DataFrame for one group with 'location', 'car_type', 'count' and
            'timestamp' columns.

    Returns:
        The frame with an added 'anomaly' column. If the group has fewer than
        TIME_STEPS rows, the value is 99 (not scored, not ingested). Otherwise it is
        1 for anomalous rows and -1 for the rest; those rows are also queued for
        ingestion into ADX, ordered as (anomaly, location, car_type, count, timestamp).
    """
    if df.shape[0] < TIME_STEPS:
        # Too few rows to form even one window: skip loading the model entirely.
        df["anomaly"] = 99
        return df

    model = Autoencoder()
    # map_location lets weights saved from a GPU load on CPU-only workers.
    model.load_state_dict(
        T.load("/dbfs/models/torch_autoencoder/autoencoder.json", map_location="cpu"))
    model.eval()
    transformer = _fit_autoencoder_transformer()

    windows = pre_process(df, transformer)
    with T.no_grad():
        reconstruction = model(windows)
        diff = windows - reconstruction
        # Summed squared reconstruction error per window (channel, time and feature axes).
        errs = T.sum(diff * diff, dim=[1, 2, 3]).numpy()
    df['anomaly'] = _flag_anomalous_rows(errs > THRESHOLD, df.shape[0], TIME_STEPS)

    # Dataframe ingestion uploads a header-less CSV, so ADX maps columns by position:
    # emit them in `schema` order rather than the order the group arrived in.
    df = df[['anomaly', 'location', 'car_type', 'count', 'timestamp']]

    # Ingest data to ADX
    cluster_ingest_uri = "https://ingest-adxc01.westus.kusto.windows.net"
    kcsb_ingest = KustoConnectionStringBuilder.with_aad_application_key_authentication(
        cluster_ingest_uri, client_id, client_secret, authority_id)
    queue_client = QueuedIngestClient(kcsb_ingest)
    ingestion_props = IngestionProperties(database=database_name, table=table_name)
    queue_client.ingest_from_dataframe(df, ingestion_properties=ingestion_props)
    return df
    
  
def process_batch(batchdf, batchid):
    """Score one micro-batch with the autoencoder and fan the results out.

    Called by ``foreachBatch``. Each ``timestamp`` group is scored by
    ``detect_anomaly_autoencoder``, which also ingests scored rows into ADX. Rows
    flagged 1 are sent to the Event Hubs ``anomaly_queue`` topic, and the full result
    overwrites the ``Anomany_Result`` Delta table.

    Args:
        batchdf: micro-batch DataFrame with location, timestamp, car_type, count.
        batchid: micro-batch id supplied by foreachBatch; unused.
    """
    result_df = batchdf.groupby("timestamp").applyInPandas(detect_anomaly_autoencoder, schema)
    # result_df feeds two writes. Cache it so the pandas UDF (model scoring plus its ADX
    # ingestion side effect) normally runs once per micro-batch instead of once per write.
    # Evicted cache blocks can still be recomputed, so this is not a hard guarantee.
    result_df.persist()
    try:
        # Write detected anomalies to Event Hubs (Kafka endpoint) for notification.
        (
            result_df.filter("anomaly = 1")
            .selectExpr("to_json(struct(anomaly,location,car_type, count,timestamp)) value")
            .write
            .format("kafka")
            .option("kafka.bootstrap.servers", "ehns001.servicebus.windows.net:9093")
            .option("topic", "anomaly_queue")
            .option("kafka.sasl.mechanism", "PLAIN")
            .option("kafka.security.protocol", "SASL_SSL")
            .option("kafka.sasl.jaas.config", EH_SASL)
            .save()
        )
        # Write the full result to Delta for visualization.
        # NOTE: "Anomany_Result" (sic) is also the table the %sql cell queries; rename both
        # together if fixing the spelling.
        result_df.write.format("delta").mode("overwrite").saveAsTable("Anomany_Result")
    finally:
        result_df.unpersist()



  
# Per-minute counts for each (location, car_type) over 1-minute tumbling windows; the
# window start is exposed as `timestamp`, the column process_batch groups on.
query_df = spark.sql("select location, window(starttime, '1 minute').start timestamp, car_type, count(*) count from availability group by window(starttime, '1 minute'), car_type, location")
# In "complete" mode every trigger hands process_batch the whole aggregate so far.
writer = (
    query_df.writeStream
    .outputMode("complete")
    .foreachBatch(process_batch)
)
writer.start()

In [None]:
%sql select * from Anomany_Result where anomaly <> -1

### Scoring with the Isolation Forest Model

In [None]:
import mlflow
import mlflow.sklearn
import pandas as pd
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.ingest import (
    QueuedIngestClient,
    IngestionProperties,
    KustoStreamingIngestClient,
    ManagedStreamingIngestClient

)

# Azure Data Explorer (ADX) database and table the scored rows are ingested into.
database_name ="db01"
table_name = "anomaly_output"
# Service-principal (AAD application key) credentials for ADX, read from secret scope "scope01".
client_id = dbutils.secrets.get("scope01", "app01-id")
client_secret = dbutils.secrets.get("scope01", "app01-sec")
# AAD tenant passed as the authority to the Kusto connection-string builder.
authority_id = "0fbe7234-45ea-498b-b7e4-1a8b2d3be4d9"

# Spark DDL output schema of detect_anomaly_isolation (used with mapInPandas); its column
# order matches the order the rows are ingested into ADX.
schema = "anomaly integer, location string, car_type string, count integer,timestamp timestamp"

def detect_anomaly_isolation(iterator):
    """Score pandas batches with the isolation-forest model and ingest them into ADX.

    Used as ``mapInPandas(detect_anomaly_isolation, schema)``. The model and the ADX
    ingest client are created once per call and reused for every batch of *iterator*.

    Args:
        iterator: iterator of pandas DataFrames with at least 'location', 'car_type',
            'count' and 'starttime' columns.

    Yields:
        One DataFrame per input batch with columns anomaly, location, car_type, count,
        timestamp (``timestamp`` is the batch's ``starttime``), after queueing it for
        ingestion into ADX.
    """
    model = mlflow.sklearn.load_model("/dbfs/models/isolation_forest/isolation_forest")

    cluster_ingest_uri = "https://ingest-adxc01.westus.kusto.windows.net"
    kcsb_ingest = KustoConnectionStringBuilder.with_aad_application_key_authentication(
        cluster_ingest_uri, client_id, client_secret, authority_id)
    queue_client = QueuedIngestClient(kcsb_ingest)
    ingestion_props = IngestionProperties(database=database_name, table=table_name)

    for df in iterator:
        # Item assignment instead of `df.starttime = ...`: attribute assignment only
        # updates a column that already exists and is easily mistaken for an attribute set.
        df['starttime'] = pd.to_datetime(df['starttime'])
        df['hour'] = df['starttime'].dt.hour
        # Feature columns passed to the model. Copy the selection so the columns added
        # below don't trigger pandas' SettingWithCopyWarning.
        df_transformed = df[['location', 'car_type', 'hour', 'count']].copy()
        result = model.predict(df_transformed)
        df_transformed['anomaly'] = result
        df_transformed['timestamp'] = df['starttime']
        # Dataframe ingestion is positional, so fix the column order to match `schema`.
        df_transformed = df_transformed[['anomaly', 'location', 'car_type', 'count', 'timestamp']]
        queue_client.ingest_from_dataframe(df_transformed, ingestion_properties=ingestion_props)

        yield df_transformed
    
  
def process_batch(batchdf, batchid):
    """Score one micro-batch with the isolation forest and publish the outliers.

    Called by ``foreachBatch``. ``detect_anomaly_isolation`` (via ``mapInPandas``)
    scores every row and ingests it into ADX itself. The only write here is the
    Event Hubs notification for rows predicted -1 (the outlier label of scikit-learn's
    IsolationForest, presumably the model type loaded there).

    Args:
        batchdf: micro-batch DataFrame with location, starttime, car_type, count.
        batchid: micro-batch id supplied by foreachBatch; unused.
    """
    scored_df = batchdf.mapInPandas(detect_anomaly_isolation, schema)
    anomalies_json = (
        scored_df
        .filter("anomaly = -1")
        .selectExpr("to_json(struct(anomaly,location,car_type, count,timestamp)) value")
    )
    kafka_sink_options = {
        "kafka.bootstrap.servers": "ehns001.servicebus.windows.net:9093",
        "topic": "anomaly_queue",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.jaas.config": EH_SASL,
    }
    anomalies_json.write.format("kafka").options(**kafka_sink_options).save()



  
# Per-minute counts for each (location, car_type) over 1-minute tumbling windows; the
# window start is exposed as `starttime`, the column detect_anomaly_isolation reads.
query_df = spark.sql("select location, window(starttime, '1 minute').start starttime, car_type, count(*) count from availability group by window(starttime, '1 minute'), car_type, location")
# In "complete" mode every trigger hands process_batch the whole aggregate so far.
writer = (
    query_df.writeStream
    .outputMode("complete")
    .foreachBatch(process_batch)
)
writer.start()