# Multivariate Anomaly Detector Demo

## Sample Description
### SKAB Anomaly Detection Benchmark
- source: [Benchmark - Anomaly in Timeseries SKAB](https://www.kaggle.com/datasets/caesarlupum/benckmark-anomaly-timeseries-skab)
- datasets
    - datetime - Represents dates and times of the moment when the value is written to the database (YYYY-MM-DD hh:mm:ss)
    - Accelerometer1RMS - Shows a vibration acceleration (in g units)
    - Accelerometer2RMS - Shows a vibration acceleration (in g units)
    - Current - Shows the amperage on the electric motor (amperes)
    - Pressure - Represents the pressure in the loop after the water pump (bar)
    - Temperature - Shows the temperature of the engine body (degrees Celsius)
    - Thermocouple - Represents the temperature of the fluid in the circulation loop (degrees Celsius)
    - Voltage - Shows the voltage on the electric motor (volts)
    - RateRMS - Represents the circulation flow rate of the fluid inside the loop (liters per minute)
    - anomaly - Shows if the point is anomalous (0 or 1)
    - changepoint - Shows if the point is a changepoint for collective anomalies (0 or 1)

### Install the required packages

In [None]:
!pip install --upgrade pandas azure-ai-anomalydetector

### 1. Data Preparation
Load the sample data

In [None]:
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime as dt

# Credentials come from a local `keys` module. Import only the names this notebook
# uses (instead of `import *`) so it is obvious where they come from.
from keys import mvad_endpoint, mvad_key, storage_account

# This is to build interactive plot:
pd.options.plotting.backend = "plotly"

# Parse the "datetime" column and use it as the row index.
df = pd.read_csv("./alldata_skab.csv", parse_dates=['datetime'], index_col="datetime")
df.head()

In [None]:
# Number of rows recorded on each calendar day of the datetime index.
df.index.to_series().dt.date.value_counts()

Data transformation and train/test split

In [None]:
# Turn the index into ISO-8601 strings with a trailing "Z"; these strings are later used
# as the timestamps in the packed CSVs and as request start/end times.
df.index = df.index.to_series().apply(lambda ts: f"{ts.isoformat()}Z")
# Keep only rows whose timestamp string ends in "0Z" or "5Z" (seconds digit 0 or 5,
# assuming the timestamps carry no fractional seconds).
keep_rows = df.index.str.endswith("0Z") | df.index.str.endswith("5Z")
df = df[keep_rows]
# One calendar day for training and another for testing, matched by date substring.
train_df = df.loc[df.index.str.contains("2020-02-08")].copy()
test_df = df.loc[df.index.str.contains("2020-03-01")].copy()
train_df.head()

Drop the label columns (`anomaly`, `changepoint`); they are not model inputs

In [None]:
# The label columns must not be sent to the model as input variables.
train_df = train_df.drop(columns=['anomaly', 'changepoint'])

Initialize the service client

In [None]:
import os

from azure.ai.anomalydetector import AnomalyDetectorClient
from azure.ai.anomalydetector.models import (
    DetectionRequest,
    LastDetectionRequest,
    ModelInfo,
    VariableValues,
)
from azure.core.credentials import AzureKeyCredential

# Service endpoint and key, taken from the `keys` module imported in the data-loading cell.
anomaly_detector_endpoint = mvad_endpoint
subscription_key = mvad_key

# Authenticate with the resource key and build the client used by the later cells.
credential = AzureKeyCredential(subscription_key)
ad_client = AnomalyDetectorClient(credential, anomaly_detector_endpoint)

Define the function for preparing the data source:
- pack_data - Pack each column into its own CSV file inside a local folder.
- zip_file - A helper function to compress the local CSV files into one zip archive.
- upload_to_blob - A helper function to upload a file to Azure Blob Storage.
- generate_data_source_sas - A helper function to generate a read-only SAS URL for the uploaded blob.

In [None]:
import os
import zipfile
from azure.storage.blob import BlobClient, BlobServiceClient, generate_blob_sas, BlobSasPermissions
from datetime import datetime, timedelta
# URL of one blob with a SAS token appended as the query string. generate_data_source_sas
# fills it in; the result is passed to the service as the data `source`.
BLOB_SAS_TEMPLATE = "https://{account_name}.blob.core.windows.net/{container_name}/{blob_name}?{sas_token}"

def pack_data(df, folder_name):
    """
    Write each column of ``df`` to its own CSV file, using this structure:
    - <folder_name>
        - <column_1>.csv
        - <column_2>.csv
        ...
    Each file has a ``value`` column (the column's data) followed by a ``timestamp``
    column (the dataframe's index). No index column is written.

    The folder is created if it is missing and reused if it exists, so the cell can be
    re-run. Files with the same names are overwritten.
    NOTE(review): CSVs left over from an earlier run with different columns are not
    removed, and zip_file would pick them up.

    :param df: the cleaned dataframe containing only the required variables with timestamp indices
    :param folder_name: path (relative or absolute) of the folder for the packed files
    """
    # exist_ok: a plain os.mkdir raised FileExistsError when the cell was re-run.
    os.makedirs(folder_name, exist_ok=True)
    for col in df.columns:
        series = df[[col]].copy()
        series['timestamp'] = series.index
        series.columns = ['value', 'timestamp']
        # os.path.join (unlike "./{folder}/...") also handles absolute folder paths.
        series.to_csv(os.path.join(folder_name, f"{col}.csv"), index=False)

def zip_file(root, name):
    """
    Compress every ``.csv`` file directly inside ``root`` into the zip archive ``name``.

    Files are stored at the top level of the archive (no folder prefix), in sorted
    order so that the archive contents are deterministic.
    :param root: root directory of csv files
    :param name: name of the compressed file (with suffix)
    """
    # The context manager closes the archive even if a write fails part-way.
    with zipfile.ZipFile(name, "w", zipfile.ZIP_DEFLATED) as archive:
        for entry in sorted(os.listdir(root)):
            path = os.path.join(root, entry)
            # Match the real ".csv" extension (not any name ending in "csv") and skip directories.
            if entry.endswith(".csv") and os.path.isfile(path):
                archive.write(path, entry)
    print("Compress files success!")

def upload_to_blob(file, conn_str, container, blob_name):
    """
    Upload a local file to Azure Blob Storage, overwriting any blob with the same name.
    :param file: the path to the file to be uploaded
    :param conn_str: the connection string of the target storage account
    :param container: the container name in the storage account
    :param blob_name: the blob name in the container
    """
    target = BlobClient.from_connection_string(
        conn_str,
        container_name=container,
        blob_name=blob_name,
    )
    # Stream the file in binary mode; the handle is closed once the upload returns.
    with open(file, "rb") as payload:
        target.upload_blob(payload, overwrite=True)
    print("Upload Success!")

def generate_data_source_sas(conn_str, container, blob_name):
    """
    Build a read-only SAS URL for a blob, valid for one day from now.
    :param conn_str: the connection string of the target storage account
    :param container: the container name in the storage account
    :param blob_name: the blob name in the container
    :return: the blob URL with the generated SAS token as its query string
    """
    # Local import: this cell's top-level datetime import does not bring in timezone.
    from datetime import timezone

    blob_service_client = BlobServiceClient.from_connection_string(conn_str=conn_str)
    account_name = blob_service_client.account_name
    # The token is signed with the account key taken from the client's credential.
    # NOTE(review): this presumably requires an AccountKey in conn_str; verify for SAS-only strings.
    sas_token = generate_blob_sas(account_name=account_name,
                                  container_name=container,
                                  blob_name=blob_name,
                                  account_key=blob_service_client.credential.account_key,
                                  permission=BlobSasPermissions(read=True),
                                  # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12.
                                  expiry=datetime.now(timezone.utc) + timedelta(days=1))
    return BLOB_SAS_TEMPLATE.format(account_name=account_name,
                                    container_name=container,
                                    blob_name=blob_name,
                                    sas_token=sas_token)

Pack the data and zip them together

In [None]:
# Write one CSV per training variable into ./series, then zip them for upload.
folder_name, zipfile_name = "series", "series.zip"
pack_data(train_df, folder_name)
zip_file(folder_name, zipfile_name)

Upload the zip file to Blob Storage and get the SAS URL

In [None]:
connection_string, container_name = storage_account, "skab"

# Upload the training archive, then build a read-only SAS URL that the service can read it from.
upload_to_blob(zipfile_name, connection_string, container_name, zipfile_name)
data_source = generate_data_source_sas(connection_string, container_name, zipfile_name)
print(f"Blob SAS url: {data_source}")

### 2. Model Training
Generate data feed and start training the model

In [None]:
# Train on the full time span covered by the training set.
start_time = train_df.index.min()
end_time = train_df.index.max()
sliding_window = 100
data_feed = ModelInfo(
    start_time=start_time,
    end_time=end_time,
    source=data_source,
    sliding_window=sliding_window,
)
# `cls` collects every positional argument of the raw-response callback. The last one is
# treated as the response headers, whose "Location" URL ends with the new model's id.
train_response = ad_client.train_multivariate_model(data_feed, cls=lambda *args: list(args))
response_header = train_response[-1]
trained_model_id = response_header['Location'].rsplit("/", 1)[-1]

print(f"model id: {trained_model_id}")

With the returned model ID, we can examine its status

In [None]:
# Fetch the model's current metadata; re-run this cell to poll the training status.
model_info = ad_client.get_multivariate_model(trained_model_id).model_info
model_status = model_info.status
print(f"model status: {model_status}")

Get model information and track training progress.

In [None]:
import numpy as np
import pandas as pd
from plotly.subplots import make_subplots

model = ad_client.get_multivariate_model(trained_model_id)
# NOTE(review): diagnostics_info / model_state may be unset before any epoch is reported;
# treat that as "no epochs yet" instead of raising AttributeError on None.
diagnostics_info = model.model_info.diagnostics_info
model_state = diagnostics_info.model_state if diagnostics_info is not None else None
epoch_ids = (model_state.epoch_ids or []) if model_state is not None else []
current_epoch = epoch_ids[-1] if epoch_ids else 0
# NOTE(review): 100 is the assumed total number of training epochs; confirm against the service.
print(f"training progress: {current_epoch}/100.")
if model.model_info.status == "READY" and model_state is not None:
    train_losses = model_state.train_losses
    validation_losses = model_state.validation_losses
    latency = model_state.latencies_in_seconds
    # One row per epoch: training loss, validation loss and latency.
    loss_summary = pd.DataFrame({
        "epoch_id": epoch_ids,
        "train_loss": train_losses,
        "validation_loss": validation_losses,
        "latency": latency
    })
    display(loss_summary)
    # Losses share the primary y-axis; latency goes on the secondary one.
    fig = make_subplots(specs=[[{"secondary_y": True}]])
    fig.add_trace(go.Scatter(x=epoch_ids, y=train_losses,
                             mode='lines',
                             name='train losses'))
    fig.add_trace(go.Scatter(x=epoch_ids, y=validation_losses,
                             mode='lines',
                             name='validation losses'))
    fig.add_trace(go.Scatter(x=epoch_ids, y=latency,
                             mode='markers', name='latency'),
                  secondary_y=True)
    fig.update_layout(
        title_text="Visualization of training progress"
    )
    fig.update_xaxes(title_text="Epoch IDs")

    # Set y-axes titles
    fig.update_yaxes(title_text="Loss value", secondary_y=False)
    fig.update_yaxes(title_text="Latency (s)", secondary_y=True)

    fig.show()

### 3. Inference
There are two ways of detecting anomalies: asynchronously and synchronously.
- Asynchronously  
  
  Repeat the data preparation stage with the test set.

In [None]:
# Package the test variables the same way as the training ones; the label columns are left out.
folder_name = "test_series"
zipfile_name = "test_series.zip"
test_features = test_df.drop(columns=['anomaly', 'changepoint'])
pack_data(test_features, folder_name)
zip_file(folder_name, zipfile_name)

In [None]:
connection_string, container_name = storage_account, "skab"

# Upload the test archive and point `data_source` at its read-only SAS URL.
upload_to_blob(zipfile_name, connection_string, container_name, zipfile_name)
data_source = generate_data_source_sas(connection_string, container_name, zipfile_name)
print(f"Blob SAS url: {data_source}")

Create the request object with `DetectionRequest`, send it with `ad_client.detect_anomaly`, and get the result ID

In [None]:
# Run detection over the whole time span of the test set.
start_inference_time, end_inference_time = test_df.index.min(), test_df.index.max()
detection_req = DetectionRequest(
    source=data_source,
    start_time=start_inference_time,
    end_time=end_inference_time,
)
# Keep every argument handed to `cls`; the last one is indexed as the response headers.
detect_response = ad_client.detect_anomaly(trained_model_id, detection_req, cls=lambda *args: list(args))
response_header = detect_response[-1]
result_id = response_header['Location'].rsplit("/", 1)[-1]
print(f"result id: {result_id}")

Fetch the detection result with `ad_client.get_detection_result` and check its status

In [None]:
# Detection runs asynchronously; presumably this cell needs re-running until the status is final.
r = ad_client.get_detection_result(result_id)
detection_status = r.summary.status
print(f"result status: {detection_status}")

### Visualization and Evaluation
Process the results

In [None]:
import numpy as np
import requests

# Per-timestamp entries of the batch detection result.
results = r.results

In [None]:
sensitivity = 0.7
# Severity must exceed this for an anomaly to count (higher sensitivity lowers the bar).
severity_threshold = 1 - sensitivity
num_contributors = 3

# Exactly one entry per result, so these lists stay aligned with the rows of test_df
# (and with y_test below). Results with an empty `value` get placeholders: not an anomaly,
# and None for severity/score, which plotly draws as a gap.
is_anomalies = []
sev = []
scores = []
anomolous_timestamps = []
top_values = {f"top_{i}": [] for i in range(num_contributors)}
for ts, item in zip(test_df.index, results):
    value = item.value
    if not value:
        is_anomalies.append(False)
        sev.append(None)
        scores.append(None)
        continue
    is_anomalies.append(value.is_anomaly)
    sev.append(value.severity)
    scores.append(value.score)
    if value.is_anomaly and value.severity > severity_threshold:
        anomolous_timestamps.append(ts)
        interpretation = value.interpretation or []
        for i in range(num_contributors):
            # If fewer contributors are reported than requested, pad with None so every
            # top_i list stays as long as anomolous_timestamps.
            if i < len(interpretation):
                top_values[f"top_{i}"].append(test_df.loc[ts, interpretation[i].variable])
            else:
                top_values[f"top_{i}"].append(None)

Let's evaluate its performance
- Extract the labels and the predictions

In [None]:
# Ground truth: missing labels count as normal.
y_test = test_df["anomaly"].fillna(0).astype(bool).tolist()
# Prediction: flagged by the service AND severity above the sensitivity cut-off.
y_pred = [bool(flag and severity > 1 - sensitivity) for flag, severity in zip(is_anomalies, sev)]

- Calculate the precision, recall and f1 score

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from pprint import pprint

# Anomaly is the positive class. "precision" must use precision_score, not accuracy_score.
# zero_division=0 returns 0 without a warning when a denominator is empty
# (e.g. nothing was predicted as anomalous).
performance = {
    "precision": precision_score(y_test, y_pred, zero_division=0),
    "recall": recall_score(y_test, y_pred, zero_division=0),
    "f1 score": f1_score(y_test, y_pred, zero_division=0)}
performance

Plot all the series and mark out the anomalies

In [None]:
import numpy as np
from plotly.subplots import make_subplots

# Row 1: the test series plus the top contributors at each detected anomaly.
# Row 2: anomaly score with the ground-truth labels. Row 3: severity.
fig = make_subplots(rows=3, cols=1, shared_xaxes=True)
greys = px.colors.sequential.Greys
colors = [greys[-1], greys[-3], greys[-6]]

plotted_columns = [name for name in test_df.columns if name != "datetime"]
for name in plotted_columns:
    fig.add_trace(go.Scatter(x=test_df.index, y=test_df[name], mode='lines', name=name),
                  row=1, col=1)

for rank in range(num_contributors):
    contributor_marker = dict(color=colors[rank], size=8)
    fig.add_trace(
        go.Scatter(
            x=anomolous_timestamps,
            y=top_values[f"top_{rank}"],
            mode="markers",
            name=f"Top {rank + 1} contributor",
            marker=contributor_marker,
        ),
        row=1, col=1,
    )

# Positions of the labelled anomalies, used to place the red label markers on the score curve.
labelled_positions = [pos for pos, flag in enumerate(y_test) if flag]
y_test_timestamp = [test_df.index[pos] for pos in labelled_positions]
y_test_score = [scores[pos] for pos in labelled_positions]

fig.add_trace(go.Scatter(x=test_df.index, y=scores, mode='lines', name='score'),
              row=2, col=1)
fig.add_trace(
    go.Scatter(x=y_test_timestamp, y=y_test_score, mode="markers",
               marker=dict(color="red", size=8), name="label"),
    row=2, col=1,
)
fig.add_trace(go.Scatter(x=test_df.index, y=sev, mode='lines', name='severity'),
              row=3, col=1)

fig.update_layout(title_text="Visualization of detection results")
for row, title in enumerate(["value", "score", "severity"], start=1):
    fig.update_yaxes(title_text=title, row=row, col=1)
fig.show()

#### Detect Anomalies Synchronously
- Prepare the input as follows:

  ```python
        [{
            "name": <series1>,
            "timestamps": [<timestamp>],
            "values": [<value>]
         },
          ...
        ]
  ```
- Create the request object with `LastDetectionRequest`
- Send the detection request with `ad_client.last_detect_anomaly` and get the result object

In [None]:
import json

# A window of up to 110 consecutive test rows, feature columns only.
sample_input_df = test_df.iloc[300:410].drop(['anomaly', 'changepoint'], axis=1)
sample_input = []
for var in sample_input_df.columns:
    sample_input.append({
        "name": var,
        "timestamps": sample_input_df.index.tolist(),
        "values": sample_input_df[var].tolist(),
    })
variable_values = [VariableValues(**item) for item in sample_input]
# NOTE(review): detecting_points presumably is the number of trailing timestamps to score.
last_detection_request = LastDetectionRequest(variables=variable_values, detecting_points=10)
res = ad_client.last_detect_anomaly(model_id=trained_model_id, body=last_detection_request)

Process the results and display the series separately

In [None]:
sensitivity = 0.7
anomolous_timestamps = []
num_contributors = 3
# variable -> contributor rank (1-based) -> timestamps/values where that variable had that rank.
anomaly_contributor = {
    k: {n + 1: {"timestamp": [], "value": []} for n in range(num_contributors)}
    for k in sample_input_df.columns
}

for item in res.results:
    value = item.value
    # Skip results without a value instead of raising AttributeError on None.
    if not value:
        continue
    # Same "...Z" ISO-8601 string form as the index built during data preparation.
    ts = item.timestamp.strftime("%Y-%m-%dT%H:%M:%SZ")
    if value.is_anomaly and value.severity > 1 - sensitivity:
        anomolous_timestamps.append(ts)
        # The interpretation may list fewer variables than num_contributors; use only what exists.
        for rank, contributor in enumerate((value.interpretation or [])[:num_contributors], start=1):
            var = contributor.variable
            anomaly_contributor[var][rank]['timestamp'].append(ts)
            anomaly_contributor[var][rank]['value'].append(sample_input_df[var][ts])

In [None]:
plot_columns = list(sample_input_df.columns)
# One subplot row per input variable, with its top-contributor markers overlaid.
fig = make_subplots(rows=len(plot_columns), cols=1, shared_xaxes=True)
colors = [px.colors.sequential.Greys[-1], px.colors.sequential.Greys[-3], px.colors.sequential.Greys[-6]]
for row, v in enumerate(plot_columns, start=1):
    fig.add_trace(go.Scatter(x=sample_input_df.index, y=sample_input_df[v],
                             mode='lines',
                             name=v),
                  row=row, col=1)
    for i in range(num_contributors):
        contributor = anomaly_contributor[v][i + 1]
        fig.add_trace(go.Scatter(x=contributor['timestamp'], y=contributor['value'],
                                 mode="markers", name=f"Top {i+1} contributor",
                                 # Group by rank so the legend shows each rank once, not once per subplot.
                                 legendgroup=f"top_{i+1}", showlegend=(row == 1),
                                 marker=dict(
                                     color=colors[i],
                                     size=8,
                                 )),
                      row=row, col=1)
    # Every row plots one variable's values, so label its y-axis with the variable name
    # (the "score"/"severity" titles only apply to the batch plot's layout).
    fig.update_yaxes(title_text=v, row=row, col=1)
fig.update_layout(
    title_text="Visualization of detection results"
)
fig.show()