## Anomaly Detection ML Model

##### Reference: https://learn.microsoft.com/en-us/fabric/data-science/isolation-forest-multivariate-anomaly-detection

### Library Imports

In [246]:
from IPython import get_ipython
from IPython.terminal.interactiveshell import TerminalInteractiveShell
import uuid
import mlflow

from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.ml import Pipeline

from synapse.ml.isolationforest import *

from synapse.ml.explainers import *

%matplotlib inline

from pyspark.sql.functions import col

StatementMeta(, 72aef49f-edab-4193-80b1-8080952fd80b, 251, Finished, Available)

### Input Data

In [247]:
# Table inputs


timestampColumn = "Date"  # str: the name of the timestamp column in the table
inputCols = [
    "Quantity",
    "Unit_Price"
]  # list(str): the names of the input variables

# Training Start time, and number of days to use for training:
trainingStartTime = (
    "2023-01-01T00:00:00Z"  # datetime: datetime for when to start the training
)
trainingEndTime = (
    "2023-07-31T23:59:590Z"  # datetime: datetime for when to end the training
)
inferenceStartTime = (
    "2023-08-01T00:00:00Z"  # datetime: datetime for when to start the training
)
inferenceEndTime = (
    "2023-08-31T23:59:59Z"  # datetime: datetime for when to end the training
)

# Isolation Forest parameters
contamination = 0.1
num_estimators = 100
max_samples = 256
max_features = 1.0

StatementMeta(, 72aef49f-edab-4193-80b1-8080952fd80b, 252, Finished, Available)

### Load Data
##### Aggregations done for Quantity and Price to view daily data

In [248]:
df = (
    spark.read.format("delta")
    .load("abfss://MS_Fabric_Hackathon@onelake.dfs.fabric.microsoft.com/Hackathon.Lakehouse/Tables/Fact_Sales")
)

# Ensure 'Quantity' and 'Unit_Price' are treated as numeric types
df = df.withColumn("Quantity", F.col("Quantity").cast(IntegerType())) \
       .withColumn("Unit_Price", F.col("Unit_Price").cast(DoubleType()))

# Perform aggregation
df = df.groupBy("Date") \
    .agg(
        F.sum("Quantity").alias("Quantity"),  # Sum of Quantity
        (F.sum(F.col("Quantity") * F.col("Unit_Price")) / F.sum("Quantity")).alias("Unit_Price")  # Average Price calculation
    )

df.show()

StatementMeta(, 72aef49f-edab-4193-80b1-8080952fd80b, 253, Finished, Available)

+----------+--------+------------------+
|      Date|Quantity|        Unit_Price|
+----------+--------+------------------+
|2023-06-22|    4895| 50.40332992849845|
|2023-07-15|    4424|51.101333634719715|
|2023-09-14|    4433|48.476652379878196|
|2023-11-08|    4373| 49.69174479762175|
|2023-05-22|    4404| 48.41784741144419|
|2023-11-22|    4533| 50.54835649680125|
|2023-02-25|    4326| 49.05663430420706|
|2023-06-18|    4720| 50.14762711864399|
|2023-09-19|    4494| 48.43689363595904|
|2023-06-23|    4583|49.288108226052806|
|2023-02-08|    4463| 49.12263051758912|
|2023-12-10|    4495|47.880600667408245|
|2023-11-29|    4173|50.246872753414785|
|2023-03-12|    4301|49.013252731922876|
|2023-09-27|    4481| 49.46744030350363|
|2023-03-24|    4699|50.792870823579435|
|2023-11-25|    4402| 50.88468877782826|
|2023-11-17|    4460| 51.47186098654712|
|2023-07-29|    4632|48.611075129533674|
|2023-01-01|    4893| 50.59280604945839|
+----------+--------+------------------+
only showing top

### Ensure columns are cast as needed

In [249]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

df = (
    df.orderBy(timestampColumn)
    .withColumn("Date", F.date_format(F.col(timestampColumn), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
    .withColumn("Quantity", F.col("Quantity").cast(DoubleType()))
    .withColumn("Unit_Price", F.col("Unit_Price").cast(DoubleType()))
    .select(
        "Date", 
        "Quantity", 
        "Unit_Price"
    )
)

display(df)

StatementMeta(, 72aef49f-edab-4193-80b1-8080952fd80b, 254, Finished, Available)

SynapseWidget(Synapse.DataFrame, 23844752-e0f6-4237-b3e4-91dc9eab4e4f)

### Prepare training and test data

In [250]:
# filter to data with timestamps within the training window
df_train = df.filter(
    (F.col(timestampColumn) >= trainingStartTime)
    & (F.col(timestampColumn) <= trainingEndTime)
)
display(df_train)

StatementMeta(, 72aef49f-edab-4193-80b1-8080952fd80b, 255, Finished, Available)

SynapseWidget(Synapse.DataFrame, cf2a351a-e74b-4131-ab11-a3a3137df0fa)

In [251]:
# filter to data with timestamps within the inference window
df_test = df.filter(
    (F.col(timestampColumn) >= inferenceStartTime)
    & (F.col(timestampColumn) <= inferenceEndTime)
)
display(df_test)

StatementMeta(, 72aef49f-edab-4193-80b1-8080952fd80b, 256, Finished, Available)

SynapseWidget(Synapse.DataFrame, 5eedf564-85d8-4e4f-b17d-06b1d001f751)

### Train Isolation Forest model

In [252]:
isolationForest = (
    IsolationForest()
    .setNumEstimators(num_estimators)
    .setBootstrap(False)
    .setMaxSamples(max_samples)
    .setMaxFeatures(max_features)
    .setFeaturesCol("features")
    .setPredictionCol("predictedLabel")
    .setScoreCol("outlierScore")
    .setContamination(contamination)
    .setContaminationError(0.001 * contamination)
    .setRandomSeed(1)
)

StatementMeta(, 72aef49f-edab-4193-80b1-8080952fd80b, 257, Finished, Available)

### Perform inferencing

In [253]:
df_test_pred = model.transform(df_test)
display(df_test_pred)

StatementMeta(, 72aef49f-edab-4193-80b1-8080952fd80b, 258, Finished, Available)

SynapseWidget(Synapse.DataFrame, a8823be3-b7a6-44fd-9489-1bcdb8714404)

### Show anomalies

In [257]:
from pyspark.sql.functions import col, date_format

# Assuming 'predicted_anomalies' is your DataFrame and it includes a column named 'Date' with datetime values

# Sort by outlierScore in descending order, format the 'Date' column to show only the date, and then show the top 5
predicted_anomalies_sorted = df_test_pred.withColumn("Date", date_format(col("Date"), "yyyy-MM-dd")) \
                                                .orderBy(col("outlierScore").desc())

# Show the top 5 predicted anomalies by outlierScore, displaying only the date
predicted_anomalies_sorted.select("Date", "outlierScore").show(5)


StatementMeta(, 72aef49f-edab-4193-80b1-8080952fd80b, 262, Finished, Available)

+----------+------------------+
|      Date|      outlierScore|
+----------+------------------+
|2023-08-12|0.5551930050596464|
|2023-08-23|0.5551930050596464|
|2023-08-21|0.5543218508382048|
|2023-08-10|0.5539446738472573|
|2023-08-02|0.5539446738472573|
+----------+------------------+
only showing top 5 rows

