# Evaluation of the anomaly detection

This notebook evaluates the trained TCN autoencoders on subsequence anomaly detection and compares them against two baseline detectors.

---

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator
from tqdm.notebook import tqdm

from data_loading import TimeSeriesWithAnoms
from params import TcnAeParams, tcn_ae_params, baseline_params
from evaluation import EvaluationMetric, PrecisionAtK, F1Score, evaluate_model
from train_models import prepare_data
from detectors import SubsequenceAnomalyDetector, TcnAeDetector, RandomDetector, LofDetector

In [None]:
# `torch` is not imported in the imports cell above, so bring it into scope
# here; without this import the call below raises NameError.
import torch

# Seed PyTorch's global RNG with the seed stored in the TCN autoencoder params.
torch.manual_seed(tcn_ae_params["random_seed"])

### Load and prepare data

We will load all time series and normalize them, using the same `prepare_data` function as the training script.

In [None]:
# Load every time series with the helper imported from the training script.
all_series = prepare_data()
# Trailing bare expression: the notebook displays the first series for inspection.
all_series[0]

In [None]:
# Number of time series that will be evaluated
len(all_series)

### Run anomaly detection with baseline models

First, we will examine anomaly detection quality with two baseline models:

- _RandomDetector_ - returns random subsequences
- _LofDetector_ - based on the well-known [Local Outlier Factor](https://scikit-learn.org/stable/auto_examples/neighbors/plot_lof_outlier_detection.html)
anomaly detection algorithm for multidimensional data

We will detect anomalies with both baseline models and calculate metrics for each time series.

In [None]:
def evaluate_baseline(
    baseline_detector: SubsequenceAnomalyDetector,
    all_series: list[TimeSeriesWithAnoms],
    metrics: list[EvaluationMetric],
) -> dict[str, dict[str, float]]:
    """Score one baseline detector on every time series.

    The same detector instance is reused for all series; progress is
    reported through a tqdm bar.

    Returns dict:
    {series_name -> {metric_name -> metric_value}}
    """
    results_by_series: dict[str, dict[str, float]] = {}
    for ts in tqdm(all_series):
        series_name = ts.name
        results_by_series[series_name] = evaluate_model(
            ts, baseline_detector, metrics
        )
    return results_by_series

In [None]:
# Metrics computed for every detector on every series in the cells below.
metrics: list[EvaluationMetric] = [PrecisionAtK(), F1Score()]

In [None]:
# Baseline 1: detector that returns random subsequences, seeded from the
# baseline parameters.
random_detector = RandomDetector(random_seed=baseline_params["random_seed"])
random_metrics = evaluate_baseline(
    baseline_detector=random_detector,
    all_series=all_series,
    metrics=metrics,
)

In [None]:
# Baseline 2: Local Outlier Factor detector.
# The parameters are read by key, matching how `baseline_params["random_seed"]`
# and `tcn_ae_params["random_seed"]` are accessed elsewhere in this notebook;
# attribute access would fail on a mapping-style params object.
# NOTE(review): assumes `baseline_params` is a dict/TypedDict-like mapping —
# confirm against params.py.
lof_detector = LofDetector(
    n_neighbors=baseline_params["lof_n_neighbors"],
    other_lof_params=baseline_params["lof_other_params"],
)
lof_metrics = evaluate_baseline(
    lof_detector, all_series, metrics
)

### Run anomaly detection with TCN autoencoders

We will perform the following operations for each time series:

- load the model that was trained on the series
- get pointwise reconstruction errors on the series
- extract sliding windows from the errors and detect anomalous windows
- calculate metrics

In [None]:
def evaluate_tcnae(
    all_series: list[TimeSeriesWithAnoms],
    metrics: list[EvaluationMetric],
    params: TcnAeParams,
) -> dict[str, dict[str, float]]:
    """Evaluate the TCN autoencoder detector on every time series.

    A separate detector is loaded for each series via
    ``TcnAeDetector.load(series, params)`` and then scored with
    ``evaluate_model``; progress is reported through a tqdm bar.

    Returns dict:
    {series_name -> {metric_name -> metric_value}}
    """
    return {
        ts.name: evaluate_model(ts, TcnAeDetector.load(ts, params), metrics)
        for ts in tqdm(all_series)
    }

In [None]:
# Evaluate the TCN autoencoder detectors with the same series and metrics
# that were used for the baselines.
tcnae_metrics = evaluate_tcnae(all_series, metrics, tcn_ae_params)

In [None]:
# Display the raw {series_name -> {metric_name -> metric_value}} dict.
tcnae_metrics

### Present results

We'll present evaluation metrics for each time series. Then, we will show summaries.

In [None]:
# Table of precision@k: one row per series, one column per detector.
precision_at_k_results = pd.DataFrame(
    {
        label: pd.Series(
            {
                series_name: scores["precision@k"]
                for series_name, scores in per_series.items()
            }
        )
        for label, per_series in {
            "Random": random_metrics,
            "LOF": lof_metrics,
            "TCN AE": tcnae_metrics,
        }.items()
    }
)
precision_at_k_results

In [None]:
# Table of f1-score: one row per series, one column per detector.
f1_score_results = pd.DataFrame(
    {
        label: pd.Series(
            {
                series_name: scores["f1-score"]
                for series_name, scores in per_series.items()
            }
        )
        for label, per_series in {
            "Random": random_metrics,
            "LOF": lof_metrics,
            "TCN AE": tcnae_metrics,
        }.items()
    }
)
f1_score_results

In [None]:
# Mean precision@k per detector (column-wise mean over all series).
precision_at_k_results.mean()

In [None]:
# Mean f1-score per detector (column-wise mean over all series).
f1_score_results.mean()

In [None]:
import os

bins = 30

# One histogram panel per detector, laid out side by side.
fig, axes = plt.subplots(1, 3, figsize=(17, 5))
fig.suptitle("Detector's Precision@k for series")

for detector_label, color, ax in zip(
    ["Random", "LOF", "TCN AE"],
    ["orange", "green", "blue"],
    axes
):
    ax.set_title(detector_label)
    # Same bin count and a fixed (0, 1) range in every panel, so all three
    # histograms share identical bin edges.
    ax.hist(
        precision_at_k_results[detector_label],
        bins=bins,
        color=color,
        range=(0,1)
    )
    ax.set_xlabel("precision@k")
    ax.set_ylabel("number of series")
    # The y-axis counts series, so only integer ticks make sense.
    ax.yaxis.set_major_locator(MaxNLocator(integer=True))

# savefig does not create missing directories; make sure "img/" exists so the
# cell does not fail with FileNotFoundError on a fresh checkout.
os.makedirs("img", exist_ok=True)
plt.savefig("img/precision_at_k.png")

In [None]:
import os

bins = 30

# One histogram panel per detector, laid out side by side.
fig, axes = plt.subplots(1, 3, figsize=(17, 5))
fig.suptitle("Detector's f1-score for series")

for detector_label, color, ax in zip(
    ["Random", "LOF", "TCN AE"],
    ["orange", "green", "blue"],
    axes
):
    ax.set_title(detector_label)
    # Same bin count and a fixed (0, 1) range in every panel, so all three
    # histograms share identical bin edges.
    ax.hist(
        f1_score_results[detector_label],
        bins=bins,
        color=color,
        range=(0,1)
    )
    ax.set_xlabel("f1-score")
    ax.set_ylabel("number of series")
    # The y-axis counts series, so only integer ticks make sense.
    ax.yaxis.set_major_locator(MaxNLocator(integer=True))

# savefig does not create missing directories; make sure "img/" exists so the
# cell does not fail with FileNotFoundError on a fresh checkout.
os.makedirs("img", exist_ok=True)
plt.savefig("img/f1_score.png")