# Multivariate Anomaly Detection Demo Notebook

IoT Hub -> SQL -> Notebook -> Anomaly Detector API -> Chart

## Contents

1. [Introduction](#intro)
2. [Prerequisites](#pre)
5. [Inference](#inference)
6. [Visualization of detection results (for reference only)](#analysis)

## 2. Prerequisites <a class="anchor" id="pre"></a>


* [Create an Azure subscription](https://azure.microsoft.com/free/cognitive-services) if you don't have one.
* [Create an Anomaly Detector resource](https://ms.portal.azure.com/#create/Microsoft.CognitiveServicesAnomalyDetector) and get your `endpoint` and `key`; you'll use these later.
* (**optional**) [Install Azure CLI](https://docs.microsoft.com/en-us/cli/azure/install-azure-cli), a helpful tool for managing your Azure resources. You can use Azure CLI to retrieve credentials without pasting them as plain text.
* (**optional**) Log in with Azure CLI: `az login`

### Sample code to generate SAS (for reference only)


## 6. Visualization of detection results (for reference only) <a class="anchor" id="analysis"></a>

In [None]:
# Analysis target: the device to inspect and the number of days of telemetry to request

inferenceDays = 7
deviceId = "machine1"

In [None]:
# Install required packages. Uncomment the lines below to install.

# ! pip3 install notebook
# ! pip3 install azure-ai-anomalydetector
# ! pip3 install azure-core
# ! pip3 install azure-storage-blob
# ! pip3 install python-dotenv
# ! pip3 install pandas
# ! pip3 install plotly
# ! pip3 install bokeh

In [None]:
# Load libraries

from azure.ai.anomalydetector import AnomalyDetectorClient
from azure.ai.anomalydetector.models import DetectionRequest, DetectionStatus
from azure.core.credentials import AzureKeyCredential
from azure.core.exceptions import HttpResponseError
from azure.storage.blob import BlobClient, BlobServiceClient, generate_blob_sas, BlobSasPermissions
from datetime import datetime, timedelta
from dotenv import load_dotenv
from pathlib import Path
import pandas as pd
import time
import zipfile
import os
import tempfile


import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

In [None]:
# Read settings from the local .env file and expose them as notebook-level variables

env_path = Path('.env')
load_dotenv(dotenv_path=env_path)

# Telemetry REST endpoint and its access code
inference_telemetry_endpoint_url = os.getenv('inference_telemetry_endpoint_url')
inference_telemetry_endpoint_key = os.getenv('inference_telemetry_endpoint_key')

# Storage account used to stage the inference data
storage_connection_string = os.getenv('storage_connection_string')

# Anomaly Detector resource and the model to run inference with
anomaly_detector_endpoint = os.getenv('anomaly_detector_endpoint')
anomaly_detector_key = os.getenv('anomaly_detector_key')
model_id = os.getenv('anomaly_detector_model_id')

# Local scratch location for the zip archive that gets uploaded
temp_dir = tempfile.gettempdir()
zip_filename = f"{temp_dir}/telemetry_mvad.zip"


In [None]:
# Load Azure Anomaly Detector helper functions

class MultivariateSample:
    """Helper around the Anomaly Detector client and the blob that holds the inference data.

    The instance remembers which model to use and which container/blob the
    data lives in, so the notebook can upload a zip archive and then run
    detection against it.
    """

    def __init__(self, anomaly_detector_endpoint=None, anomaly_detector_key=None, model_id=None, connection_string=None, container=None, blob_name=None):
        """Store the configuration and create the Anomaly Detector client.

        Args:
            anomaly_detector_endpoint: Endpoint of the Anomaly Detector resource.
            anomaly_detector_key: Key used to build the client credential.
            model_id: Identifier of the model passed to ``detect_anomaly``.
            connection_string: Storage account connection string.
            container: Name of the blob container holding the data.
            blob_name: Name of the blob (zip archive) holding the data.
        """
        self.blob_name = blob_name
        self.container = container
        self.connection_string = connection_string
        self.model_id = model_id
        self.anomaly_detector_endpoint = anomaly_detector_endpoint
        self.anomaly_detector_key = anomaly_detector_key

        # Create an Anomaly Detector client

        # <client>
        self.ad_client = AnomalyDetectorClient(AzureKeyCredential(self.anomaly_detector_key), self.anomaly_detector_endpoint)
        # </client>

    @staticmethod
    def _format_http_error(exc):
        """Return a printable description of an HTTP error.

        The structured ``error`` detail is preferred; when it is missing
        (``None``) the exception's own message is used instead, so reporting
        the failure never raises by itself.
        """
        error = getattr(exc, "error", None)
        code = getattr(error, "code", None)
        message = getattr(error, "message", None)
        if message is None:
            message = getattr(exc, "message", None) or str(exc)
        return "Error code: {} Error message: {}".format(code, message)

    def upload_blob(self, filename):
        """Upload the local file ``filename`` to the configured blob, overwriting it."""
        blob_client = BlobClient.from_connection_string(self.connection_string, container_name=self.container, blob_name=self.blob_name)
        with open(filename, "rb") as f:
            blob_client.upload_blob(f, overwrite=True)

    def detect(self, start_time, end_time, poll_interval=1, timeout=None):
        """Run anomaly detection on the configured blob for a time interval.

        Args:
            start_time: Start of the interval passed to the detection request.
            end_time: End of the interval passed to the detection request.
            poll_interval: Seconds to sleep between status checks.
            timeout: Maximum number of seconds to wait for a final status;
                ``None`` (the default) waits indefinitely.

        Returns:
            The detection result once its status is READY, or ``None`` when
            the request fails, the detection reports FAILED, or the wait
            times out.
        """
        # Detect anomaly in the same data source (but a different interval)
        try:
            data_source = self.generate_data_source_sas(self.container, self.blob_name)
            detection_req = DetectionRequest(source=data_source, start_time=start_time, end_time=end_time)
            # The cls hook hands back the raw callback arguments; the last one
            # is used as the response headers (it carries the Location of the result).
            response_header = self.ad_client.detect_anomaly(self.model_id, detection_req,
                                                            cls=lambda *args: args)[-1]
            result_id = response_header['Location'].split("/")[-1]

            # Get results (may need a few seconds)
            print("Get detection result...(it may take a few seconds)")
            r = self.ad_client.get_detection_result(result_id)

            deadline = None if timeout is None else time.monotonic() + timeout
            while r.summary.status != DetectionStatus.READY and r.summary.status != DetectionStatus.FAILED:
                if deadline is not None and time.monotonic() >= deadline:
                    print("Timed out waiting for anomaly detection result.")
                    return None
                print("waiting for anomaly detection result...")
                # Wait before asking again so the final result is returned without an extra sleep
                time.sleep(poll_interval)
                r = self.ad_client.get_detection_result(result_id)

        except HttpResponseError as e:
            print(self._format_http_error(e))
            return None

        if r.summary.status == DetectionStatus.FAILED:
            print("Detection failed.")
            if r.summary.errors:
                for error in r.summary.errors:
                    print("Error code: {}. Message: {}".format(error.code, error.message))
            else:
                print("None")
            return None

        return r

    def generate_data_source_sas(self, container, blob_name):
        """Build a URL for ``blob_name`` in ``container`` carrying a read-only SAS token.

        The token is generated with the account key taken from the connection
        string and expires one day after creation (UTC).
        """
        BLOB_SAS_TEMPLATE = "{blob_endpoint}{container_name}/{blob_name}?{sas_token}"

        blob_service_client = BlobServiceClient.from_connection_string(conn_str=self.connection_string)
        sas_token = generate_blob_sas(account_name=blob_service_client.account_name,
                                      container_name=container, blob_name=blob_name,
                                      account_key=blob_service_client.credential.account_key,
                                      permission=BlobSasPermissions(read=True),
                                      expiry=datetime.utcnow() + timedelta(days=1))
        blob_sas = BLOB_SAS_TEMPLATE.format(blob_endpoint=blob_service_client.primary_endpoint,
                                            container_name=container, blob_name=blob_name, sas_token=sas_token)
        return blob_sas
    

# Helper instance shared by the upload and detection cells
sample = MultivariateSample(
    anomaly_detector_endpoint=anomaly_detector_endpoint,
    anomaly_detector_key=anomaly_detector_key,
    model_id=model_id,
    connection_string=storage_connection_string,
    container='data',
    blob_name="telemetry_mvad.zip",
)

In [None]:
# Get last X days of telemetry from Azure SQL via REST API

api_url = f"{inference_telemetry_endpoint_url}/{deviceId}/{inferenceDays}?code={inference_telemetry_endpoint_key}"

# Timestamps are kept as returned (no date conversion)
df = pd.read_json(api_url, convert_dates=False)

# An empty response yields a frame without a 'timestamp' column; skip indexing
# in that case so the `df.empty` checks in the following cells can handle it
# instead of this cell failing with a KeyError.
if 'timestamp' in df.columns:
    df.set_index('timestamp', inplace=True)

df


In [None]:
# Upload batch of data to Azure Storage Account for inference

if not df.empty:

    # The context manager finalizes and closes the archive even if writing
    # one of the CSV files fails part-way through.
    with zipfile.ZipFile(zip_filename, "w", zipfile.ZIP_DEFLATED) as zip_file:

        # One CSV per telemetry variable: the frame's index plus a single "value" column
        for variable in df.columns:
            csv_name = f"{variable}.csv"
            csv_path = os.path.join(temp_dir, csv_name)

            individual_df = pd.DataFrame(df[variable].values, index=df.index, columns=["value"])
            individual_df.to_csv(csv_path, index=True)
            zip_file.write(csv_path, arcname=csv_name)

    # Upload only after the archive has been closed and fully written
    sample.upload_blob(zip_filename)

In [None]:
# Call Azure Anomaly Detector API

# Always (re)define these so the following cells never hit a NameError, or
# reuse values from an earlier run, when there is no telemetry to analyze.
r = None
results = None

if not df.empty:

    # Detection interval spans the first to the last row of the telemetry frame
    start_time = df.index[0]
    end_time = df.index[-1]

    r = sample.detect(start_time=start_time, end_time=end_time)

    if r is not None:
        results = r.results
        print("Anomaly detection completed")
    else:
        print("Anomaly detection failed")

In [None]:
# Build chart data

if results is not None:

    is_anomalies = []
    sev = []
    scores = []
    sensitivity = 0.15

    # One entry per result. Results without a value get a None placeholder so
    # the series stay aligned with the result order (and with df.index, which
    # the chart uses as the x axis) instead of shifting left.
    for item in results:
        if item.value:
            is_anomalies.append(item.value.is_anomaly)
            sev.append(item.value.severity)
            scores.append(item.value.score)
        else:
            is_anomalies.append(None)
            sev.append(None)
            scores.append(None)

    anomalous_timestamps = []
    num_contributors = 3
    top_values = {f"top_{i}": [] for i in range(num_contributors)}
    severity_threshold = 1 - sensitivity

    # Pair each result with the telemetry row at the same position
    for ts, item in zip(df.index, results):
        value = item.value
        if not value:
            # Nothing was scored for this row
            continue
        if value.is_anomaly and value.severity > severity_threshold:
            anomalous_timestamps.append(ts)
            contributors = value.interpretation or []
            for i in range(num_contributors):
                if i < len(contributors):
                    top_values[f"top_{i}"].append(df[contributors[i].variable][ts])
                else:
                    # Fewer contributors than requested: keep the list aligned with anomalous_timestamps
                    top_values[f"top_{i}"].append(None)

In [None]:
# Chart results from Azure Anomaly Detector API

if results is not None:
    print("Display chart")

    # Three stacked panels sharing the x axis: raw values, score, severity
    fig = make_subplots(rows=3, cols=1, shared_xaxes=True)
    greys = px.colors.sequential.Greys
    colors = [greys[-1], greys[-3], greys[-6]]

    # Panel 1: one line per telemetry variable
    for column in df.columns:
        variable_trace = go.Scatter(x=df.index, y=df[column], mode='lines', name=column)
        fig.add_trace(variable_trace, row=1, col=1)

    # Panel 1: markers for the top contributors at the anomalous timestamps
    for rank in range(num_contributors):
        contributor_trace = go.Scatter(
            x=anomalous_timestamps,
            y=top_values[f"top_{rank}"],
            mode="markers",
            name=f"Top {rank+1} contributor",
            marker={"color": colors[rank], "size": 8},
        )
        fig.add_trace(contributor_trace, row=1, col=1)

    # Panels 2 and 3: detector score and severity
    for panel_row, label, series in ((2, 'score', scores), (3, 'severity', sev)):
        fig.add_trace(go.Scatter(x=df.index, y=series, mode='lines', name=label), row=panel_row, col=1)

    fig.update_layout(title_text="Visualization of detection results")
    for panel_row, axis_title in enumerate(("value", "score", "severity"), start=1):
        fig.update_yaxes(title_text=axis_title, row=panel_row, col=1)

    fig.show()