# Multivariate anomaly detection in Real-Time Analytics in Microsoft Fabric

For the full tutorial, see https://aka.ms/mvad-rti

In [None]:
import numpy as np
import pandas as pd

## Load the table for training the model

Spark needs an ABFSS URI to connect securely to OneLake storage. The following function converts a OneLake URI to an ABFSS URI.

In [None]:
def convert_onelake_to_abfss(onelake_uri):
    """Convert an https:// OneLake URI to the equivalent abfss:// URI."""
    if not onelake_uri.startswith('https://'):
        raise ValueError("Invalid OneLake URI. It should start with 'https://'.")
    # Drop the scheme, then split into host, container, and path segments
    uri_without_scheme = onelake_uri[len('https://'):]
    parts = uri_without_scheme.split('/')
    if len(parts) < 3:
        raise ValueError("Invalid OneLake URI format.")
    host = parts[0]
    container_name = parts[1]
    path = '/'.join(parts[2:])
    # ABFSS places the container before the host: abfss://<container>@<host>/<path>
    return f"abfss://{container_name}@{host}/{path}"
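
To see which piece of the URI ends up where, here is a minimal standalone sketch of the same mapping, written with `urllib.parse` rather than the tutorial's function. The helper name `demo_onelake_to_abfss` and the workspace, lakehouse, and table names in the sample URI are made up for illustration; your own OneLake URI will differ.

```python
from urllib.parse import urlsplit


def demo_onelake_to_abfss(onelake_uri: str) -> str:
    """Illustrative helper (not part of the tutorial): https OneLake URI -> abfss URI."""
    parts = urlsplit(onelake_uri)
    if parts.scheme != "https":
        raise ValueError("Expected a URI starting with 'https://'.")
    # The first path segment becomes the container; the remainder is the path.
    container, sep, rest = parts.path.lstrip("/").partition("/")
    if not container or not sep:
        raise ValueError("Expected at least a container and a path.")
    return f"abfss://{container}@{parts.netloc}/{rest}"


# Hypothetical URI: the workspace, lakehouse, and table names are placeholders.
demo_uri = "https://onelake.dfs.fabric.microsoft.com/my-workspace/my-lakehouse.Lakehouse/Tables/demo_table"
demo_abfss = demo_onelake_to_abfss(demo_uri)
print(demo_abfss)
# abfss://my-workspace@onelake.dfs.fabric.microsoft.com/my-lakehouse.Lakehouse/Tables/demo_table
```

The first path segment moves in front of the host, separated by `@`, and the rest of the path is unchanged.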

Take the OneLake URI you copied in part 6 of the tutorial and convert it to an ABFSS URI.


In [None]:
onelake_uri = "OneLakeTableURI" # Replace with your OneLake table URI 
abfss_uri = convert_onelake_to_abfss(onelake_uri)
print(abfss_uri)

Load the table and convert it to a pandas DataFrame.

In [None]:
df = spark.read.format('delta').load(abfss_uri)
df = df.toPandas().set_index('Date')
print(df.shape)
df[:3]

## Prepare the training dataframe

The Eventhouse runs the actual predictions in [part 9: Predict anomalies in the KQL queryset](https://learn.microsoft.com/fabric/real-time-intelligence/multivariate-anomaly-detection#part-9--predict-anomalies-in-the-kql-queryset). In a production scenario where data streams into the Eventhouse, predictions would be made on the new streaming data. For this tutorial, the dataset is split by date into a training section and a prediction section, which simulate historical data and new streaming data respectively.

In [None]:
features_cols = ['AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY']
cutoff_date = '2023-01-01'

In [None]:
train_df = df[df.index < cutoff_date]
print(train_df.shape)
train_df[:3]

In [None]:
train_len = len(train_df)
predict_len = len(df) - train_len
print(f'Total samples: {len(df)}. Split to {train_len} for training, {predict_len} for prediction')
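
The date-based split can be checked on a small synthetic frame before running it on the real table. The sketch below assumes a `DatetimeIndex` named `Date`; the dates and values are made up, and the `demo_` names are chosen so they don't overwrite the notebook's own variables.

```python
import pandas as pd

# Six consecutive days that straddle the cutoff (made-up values).
dates = pd.date_range("2022-12-29", periods=6, freq="D")
demo_df = pd.DataFrame({"AAPL": range(6), "MSFT": range(10, 16)}, index=dates)
demo_df.index.name = "Date"

demo_cutoff = pd.Timestamp("2023-01-01")

# Rows strictly before the cutoff simulate historical (training) data;
# rows on or after the cutoff simulate new streaming data.
demo_train = demo_df[demo_df.index < demo_cutoff]
demo_predict = demo_df[demo_df.index >= demo_cutoff]

print(len(demo_train), len(demo_predict))
```

Because the comparison is strict, a row dated exactly on the cutoff falls into the prediction section.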

## Train the model and register it

In [None]:
import mlflow
from anomaly_detector import MultivariateAnomalyDetector
model = MultivariateAnomalyDetector()

In [None]:
sliding_window = 200
params = {"sliding_window": sliding_window}

In [None]:
model.fit(train_df, params=params)

In [None]:
with mlflow.start_run():
    mlflow.log_params(params)
    mlflow.set_tag("Training Info", "MVAD on 5 Stocks Dataset")

    model_info = mlflow.pyfunc.log_model(
        python_model=model,
        artifact_path="mvad_artifacts",
        registered_model_name="mvad_5_stocks_model",
    )

## Extract the registered model path for prediction in the Kusto Python sandbox

In [None]:
mi = mlflow.search_registered_models(filter_string="name='mvad_5_stocks_model'")[0]
model_abfss = mi.latest_versions[0].source
print(model_abfss)