<a href="https://colab.research.google.com/github/Artem535/course-time-series/blob/Task-1/arima_anomaly_detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Cloning of the repository

[The Numenta Anomaly Benchmark](https://github.com/numenta/NAB) is used as the datasouce


In [None]:
%%bash
if [ ! -d "NAB" ]; then
    git clone https://github.com/numenta/NAB
fi

# Data Load

In [None]:
from pathlib import Path

import plotly.graph_objects as go
import numpy as np
import pandas as pd
import json

Path to the repository with data

In [None]:
nab = Path.cwd()/'NAB'
nab

PosixPath('/content/NAB')

 We use real CPU utilization from AWS Cloudwatch metrics for Amazon Relational Database Service


In [None]:
data_path = './NAB'
nab_data_path = data_path + '/data/'
labels_filepath = '/labels/combined_labels.json'
valid_filename = 'realAWSCloudwatch/rds_cpu_utilization_e47b3b.csv'
training_filename = 'realAWSCloudwatch/rds_cpu_utilization_cc0c53.csv'
data_path, labels_filepath, training_filename, valid_filename

('./NAB',
 '/labels/combined_labels.json',
 'realAWSCloudwatch/rds_cpu_utilization_cc0c53.csv',
 'realAWSCloudwatch/rds_cpu_utilization_e47b3b.csv')

In [None]:
labels_file = open(data_path + labels_filepath, 'r')
labels = json.loads(labels_file.read())
labels_file.close()

Get anomalies from the data


In [None]:
def load_data_frame_with_labels(file_name):
    data_frame = pd.read_csv(nab_data_path + file_name)
    data_frame['anomaly_label'] = data_frame['timestamp'].isin(labels[file_name]).astype(int)
    return data_frame
training_data_frame = load_data_frame_with_labels(training_filename)
valid_data_frame = load_data_frame_with_labels(valid_filename)

In [None]:
training_data_frame.head()

Unnamed: 0,timestamp,value,anomaly_label
0,2014-02-14 14:30:00,6.456,0
1,2014-02-14 14:35:00,5.816,0
2,2014-02-14 14:40:00,6.268,0
3,2014-02-14 14:45:00,5.816,0
4,2014-02-14 14:50:00,5.862,0


In [None]:
valid_data_frame.head()

Unnamed: 0,timestamp,value,anomaly_label
0,2014-04-10 00:02:00,14.012,0
1,2014-04-10 00:07:00,13.334,0
2,2014-04-10 00:12:00,15.0,0
3,2014-04-10 00:17:00,13.998,0
4,2014-04-10 00:22:00,14.332,0


# Data Preprocessing

## Convert timestamp and plot CPU usage with anomalies

In [None]:
training_data_frame['timestamp'] = pd.to_datetime(training_data_frame['timestamp'])
valid_data_frame['timestamp'] = pd.to_datetime(valid_data_frame['timestamp'])

In [None]:
def prepare_plot(data_frame):
    layout = dict(xaxis=dict(title='Timestamp'), yaxis=dict(title='CPU Utilization'))
    fig = go.Figure(layout=layout)
    fig.add_trace(go.Scatter(x=data_frame['timestamp'], y=data_frame['value'],
                            mode='markers', name='Non-anomaly',
                            marker=dict(color='blue')))
    target_anomalies = data_frame.loc[data_frame['anomaly_label'] == 1,['timestamp', 'value']]
    fig.add_trace(go.Scatter(x=target_anomalies['timestamp'], y=target_anomalies['value'],
                            mode='markers', name='Anomaly',
                            marker=dict(color='green', size=13)))
    return fig

Data for training


In [None]:
prepare_plot(training_data_frame)

Data for valid

In [None]:
prepare_plot(valid_data_frame)

## Statistical data research





In [None]:
import statsmodels.api as sm
import statsmodels.tsa.api as smt
from scipy import stats


pandas.util.testing is deprecated. Use the functions in the public API at pandas.testing instead.



**Stationarity check**

**Stationary Time Series**: data does not have any upward or downward trend or seasonal effects. Mean or variance are consistent over time

**Non-Stationary Time Series**: data show trends, seasonal effects, and other structures depend on time. Forecasting performance is dependent on the time of observation. Mean and variance change over time and a drift in the model is captured.

We should make time series stationary. However, there are cases where unknown nonlinear relationships cannot be determined by classical methods. This information can be a source of information when building machine learning models. Non-stationary information can be used in feature engineering and feature selection.

We will use **Dickey-Fuller test** tests the null hypothesis that a unit root is present in an autoregressive model. The alternative hypothesis is different depending on which version of the test is used, but is usually stationarity or trend-stationarity.
If p-value > 0.05, then time series are not stationary

In [None]:
print("Dickey–Fuller test for training df: p=%f" % sm.tsa.stattools.adfuller(training_data_frame.value)[1])
print("Dickey–Fuller test for valid df: p=%f" % sm.tsa.stattools.adfuller(valid_data_frame.value)[1])

Dickey–Fuller test for training df: p=0.863581
Dickey–Fuller test for valid df: p=0.462970


The series are not stationary.

Let's try to apply the **Box-Cox transformations**.

A Box Cox transformation is a way to transform non-normal dependent variables into a normal shape

In [None]:
training_data_frame['Weighted_Price_box'], lmbda = stats.boxcox(training_data_frame.value)
valid_data_frame['Weighted_Price_box'], lmbda = stats.boxcox(valid_data_frame.value)

In [None]:
print("Dickey–Fuller test: p=%f" % sm.tsa.stattools.adfuller(training_data_frame.Weighted_Price_box)[1])
print("Dickey–Fuller test: p=%f" % sm.tsa.stattools.adfuller(valid_data_frame.Weighted_Price_box)[1])

Dickey–Fuller test: p=0.802397
Dickey–Fuller test: p=0.522017


Still not stationary, but we will take this into account when choosing a model.

# SARIMAX Architecture

**SARIMAX model idea:**

**AR**: Autoregressive. An autoregression is a regression that tries to explain the values using their previous values.

**MA**: Moving Average.

The **I** then comes from Integrated simply serves the purpose of allowing the ARMA model to have a tendency, either increasing or decreasing. This is equivalent to saying ARIMA allows it to be non-stationary.

Now comes the **S** from seasonal, which adds periodicity to ARIMA, which basically says, for example in the case of load forecasting, that the load looks very similar everyday at 6 PM.

Finally the **X**, from exogenous variables, which basically allows external variables to be considered in the model. .  

We will use SARIMAX model from statsmodels.api

# Training of the model

Definition of the training loop

In [None]:
from itertools import product
import warnings
warnings.filterwarnings("ignore")

def write_predict(train_df, test_df):
    # Initial approximation of parameters
    Qs = range(0, 2)
    qs = range(0, 3)
    Ps = range(0, 3)
    ps = range(0, 3)
    D=1
    d=1
    parameters = product(ps, qs, Ps, Qs)
    parameters_list = list(parameters)

    # Best Model Selection
    results = []
    best_aic = float("inf")
    warnings.filterwarnings('ignore')
    for param in parameters_list:
        try:
            model=sm.tsa.statespace.SARIMAX(train_df.value, order=(param[0], d, param[1]),
                                            seasonal_order=(param[2], D, param[3], 12), initialization='approximate_diffuse').fit()
        except ValueError:
            print('wrong parameters:', param)
            continue
        aic = model.aic
        if aic < best_aic:
            best_model = model
            best_aic = aic
            best_param = param
        results.append([param, model.aic])
    print(best_param)
    result_table = pd.DataFrame(results)
    result_table.columns = ['parameters', 'aic']
    print(result_table.sort_values(by = 'aic', ascending=True).head())
    print(best_model.summary())

    train_df['predict'] = best_model.predict()
    train_df['predict'].fillna(0, inplace=True)

    best_model_valid = sm.tsa.statespace.SARIMAX(test_df.value, order=(best_param[0], d, best_param[1]),
                                            seasonal_order=(best_param[2], D, best_param[3], 12), initialization='approximate_diffuse').fit()
    test_df['predict'] = best_model_valid.predict()
    test_df['predict'].fillna(0, inplace=True)


    layout = dict(xaxis=dict(title='Timestamp'), yaxis=dict(title='CPU Utilization'))
    fig = go.Figure(layout=layout)
    fig.add_trace(go.Scatter(x=train_df['timestamp'], y=train_df['value'],
                            mode='markers', name='Ground Truth',
                            marker=dict(color='blue')))
    fig.add_trace(go.Scatter(x=train_df['timestamp'], y=train_df['predict'],
                            mode='markers', name='Predicted Value',
                            marker=dict(color='orange')))
    fig.show()


    layout = dict(xaxis=dict(title='Timestamp'), yaxis=dict(title='CPU Utilization'))
    fig = go.Figure(layout=layout)
    fig.add_trace(go.Scatter(x=test_df['timestamp'], y=test_df['value'],
                            mode='markers', name='Ground Truth',
                            marker=dict(color='blue')))
    fig.add_trace(go.Scatter(x=test_df['timestamp'], y=test_df['predict'],
                            mode='markers', name='Predicted Value',
                            marker=dict(color='orange')))
    fig.show()

# Results evaluation

## Plot of the 'pure' result for <b><i>train and valid data<i><b>

Getting of the 'pure' result

Note: training usually takes ~10 minutes

In [None]:
write_predict(training_data_frame, valid_data_frame)

(2, 2, 0, 1)
      parameters          aic
49  (2, 2, 0, 1)  6823.642475
51  (2, 2, 1, 1)  6827.466946
25  (1, 1, 0, 1)  6966.858230
43  (2, 1, 0, 1)  6968.858211
27  (1, 1, 1, 1)  6968.858861
                                 Statespace Model Results                                 
Dep. Variable:                              value   No. Observations:                 4032
Model:             SARIMAX(2, 1, 2)x(0, 1, 1, 12)   Log Likelihood               -3405.821
Date:                            Mon, 08 Jun 2020   AIC                           6823.642
Time:                                    13:50:34   BIC                           6861.435
Sample:                                         0   HQIC                          6837.036
                                           - 4032                                         
Covariance Type:                              opg                                         
                 coef    std err          z      P>|z|      [0.025      0.975]


Plot of the result:


*   Blue points - real values
*   Orange points - predicted values




## Anomaly detection with static threshold

We use **three-sigma rule** applied to model's prediction errors to detect anomalies

### Threshold calculation

In [None]:
def calculate_reconstruction_errors(input_data):
    return (input_data['value'] - input_data['predict']).to_numpy()

In [None]:
train_pred_errors = calculate_reconstruction_errors(training_data_frame)
pred_error_threshold = np.mean(train_pred_errors) + 3 * np.std(train_pred_errors)

### Data filtering


Then, we filter results of the model according to the threshold and get the **indexes** of detected anomalies

In [None]:
def detect_anomalies(pred_error_threshold,df):
    test_reconstruction_errors = calculate_reconstruction_errors(df)
    predicted_anomalies = list(
        map(lambda v: 1 if v > pred_error_threshold else 0,
        test_reconstruction_errors)
    )
    df['anomaly_predicted'] = predicted_anomalies
    indices = [i for i, x in enumerate(predicted_anomalies) if x == 1]
    return indices

In [None]:
train_anomalies_idxs = detect_anomalies(pred_error_threshold, training_data_frame)

Plot of the result for training data:


*   Blue points - non-anomaly data
*   Red points - detected anomaly data
*   Green points - real anomaly data

In [None]:
layout = dict(xaxis=dict(title='Timestamp'), yaxis=dict(title='CPU Utilization'))
fig = go.Figure(layout=layout)
fig.add_trace(go.Scatter(x=training_data_frame['timestamp'], y=training_data_frame['value'],
                         mode='markers', name='Non-anomaly',
                         marker=dict(color='blue')))
fig.add_trace(go.Scatter(x=training_data_frame.loc[training_data_frame['anomaly_label'] == 1].timestamp, y=training_data_frame.loc[training_data_frame['anomaly_label'] == 1].value,
                         mode='markers', name='Real Anomaly',
                         marker=dict(color='green', size=13)))
fig.add_trace(go.Scatter(x=training_data_frame['timestamp'][train_anomalies_idxs],
                         y=training_data_frame['value'][train_anomalies_idxs],
                         mode='markers', name='Detected Anomaly',
                         marker=dict(color='red', size=7)))

Plot of the result for validation data:


*   Blue points - non-anomaly data
*   Red points - detected anomaly data
*   Green points - real anomaly data

In [None]:
valid_anomalies_idxs = detect_anomalies(pred_error_threshold, valid_data_frame)

In [None]:
layout = dict(xaxis=dict(title='Timestamp'), yaxis=dict(title='CPU Utilization'))
fig = go.Figure(layout=layout)
fig.add_trace(go.Scatter(x=valid_data_frame['timestamp'], y=valid_data_frame['value'],
                         mode='markers', name='Non-anomaly',
                         marker=dict(color='blue')))
fig.add_trace(go.Scatter(x=valid_data_frame.loc[valid_data_frame['anomaly_label'] == 1].timestamp, y=valid_data_frame.loc[valid_data_frame['anomaly_label'] == 1].value,
                         mode='markers', name='Real Anomaly',
                         marker=dict(color='green', size=13)))
fig.add_trace(go.Scatter(x=valid_data_frame['timestamp'][valid_anomalies_idxs],
                         y=valid_data_frame['value'][valid_anomalies_idxs],
                         mode='markers', name='Detected Anomaly',
                         marker=dict(color='red', size=7)))

### Metrics calculation

Finally, we calculate several metrics for the model with one threshold:


*   Confusion matrix
*   Precision
*   Recall
*   F-beta score

In [None]:
from sklearn.metrics import precision_recall_fscore_support

In [None]:
def calculate_metrics(ground_truth: pd.DataFrame, anomalies_idxs: list):
    predictions = pd.DataFrame(index=range(len(ground_truth)), columns=['anomaly_predicted'])
    predictions['anomaly_predicted'] = 0
    predictions.iloc[anomalies_idxs] = 1

    confusion_matrix = pd.crosstab(ground_truth.loc[:, 'anomaly_label'], predictions['anomaly_predicted'], margins=True)
    precision, recall, f1, _ = precision_recall_fscore_support(
        ground_truth.loc[:, 'anomaly_label'], predictions['anomaly_predicted'], beta=2., average='binary'
    )
    return confusion_matrix, precision, recall, f1

Metrics for training data

In [None]:
train_conf_matrix, *train_metrics = calculate_metrics(training_data_frame, train_anomalies_idxs)
train_conf_matrix

anomaly_predicted,0,1,All
anomaly_label,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,4017,13,4030
1,0,2,2
All,4017,15,4032


In [None]:
print(f'Train:\n Precision: {train_metrics[0]:.3f}\n Recall: {train_metrics[1]:.3f}\n F1 score: {train_metrics[2]:.3f}')

Train:
 Precision: 0.133
 Recall: 1.000
 F1 score: 0.435


Metrics for validation data

In [None]:
valid_conf_matrix, *valid_metrics = calculate_metrics(valid_data_frame, valid_anomalies_idxs)
valid_conf_matrix

anomaly_predicted,0,1,All
anomaly_label,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,3978,52,4030
1,0,2,2
All,3978,54,4032


In [None]:
print(f'Valid:\n Precision: {valid_metrics[0]:.3f}\n Recall: {valid_metrics[1]:.3f}\n F1 score: {valid_metrics[2]:.3f}')

Valid:
 Precision: 0.037
 Recall: 1.000
 F1 score: 0.161


## Anomaly detection with dynamic threshold

Dynamic threshold is calculated for each point depending on mean and standart deviation in window around this point.

### Dynamic threshold calculation

In [None]:
window = 40
std_coef = 5

### Data filtering

Then, we filter results of the model according to the thresholds and get the **indexes** of detected anomalies

In [None]:
def detect_anomalies(df):
    df['error'] = df['value'] - df['predict']
    df['meanval'] = df['error'].rolling(window=window, min_periods=1).mean()
    df['deviation'] = df['error'].rolling(window=window, min_periods=1).std()
    df['upper_bound'] = df['meanval'] + (std_coef * df['deviation'])
    indices = df.index[df['error'] >= df['upper_bound']].values.tolist()
    indices = [i for i in indices ]
    return indices

In [None]:
train_anomalies_idxs = detect_anomalies(training_data_frame)

In [None]:
valid_anomalies_idxs = detect_anomalies(valid_data_frame)

Plot of the result for training data:


*   Blue points - non-anomaly data
*   Red points - detected anomaly data
*   Green points - real anomaly data

In [None]:
layout = dict(xaxis=dict(title='Timestamp'), yaxis=dict(title='CPU Utilization'))
fig = go.Figure(layout=layout)
fig.add_trace(go.Scatter(x=training_data_frame['timestamp'], y=training_data_frame['value'],
                         mode='markers', name='Non-anomaly',
                         marker=dict(color='blue')))
fig.add_trace(go.Scatter(x=training_data_frame.loc[training_data_frame['anomaly_label'] == 1].timestamp, y=training_data_frame.loc[training_data_frame['anomaly_label'] == 1].value,
                         mode='markers', name='Real Anomaly',
                         marker=dict(color='green', size=13)))
fig.add_trace(go.Scatter(x=training_data_frame['timestamp'][train_anomalies_idxs],
                         y=training_data_frame['value'][train_anomalies_idxs],
                         mode='markers', name='Detected Anomaly',
                         marker=dict(color='red', size=7)))

Plot of the result for validation data:


*   Blue points - non-anomaly data
*   Red points - detected anomaly data
*   Green points - real anomaly data

In [None]:
layout = dict(xaxis=dict(title='Timestamp'), yaxis=dict(title='CPU Utilization'))
fig = go.Figure(layout=layout)
fig.add_trace(go.Scatter(x=valid_data_frame['timestamp'], y=valid_data_frame['value'],
                         mode='markers', name='Non-anomaly',
                         marker=dict(color='blue')))
fig.add_trace(go.Scatter(x=valid_data_frame.loc[valid_data_frame['anomaly_label'] == 1].timestamp, y=valid_data_frame.loc[valid_data_frame['anomaly_label'] == 1].value,
                         mode='markers', name='Real Anomaly',
                         marker=dict(color='green', size=13)))
fig.add_trace(go.Scatter(x=valid_data_frame['timestamp'][valid_anomalies_idxs],
                         y=valid_data_frame['value'][valid_anomalies_idxs],
                         mode='markers', name='Detected Anomaly',
                         marker=dict(color='red', size=7)))

### Metrics calculation


Finally, we calculate several metrics for the model with dynamic threshold:


*   Confusion matrix
*   Precision
*   Recall
*   F-beta score

Metrics for training data

In [None]:
train_conf_matrix, *train_metrics = calculate_metrics(training_data_frame, train_anomalies_idxs)
train_conf_matrix

anomaly_predicted,0,1,All
anomaly_label,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,4030,0,4030
1,1,1,2
All,4031,1,4032


In [None]:
print(f'Train:\n Precision: {train_metrics[0]:.3f}\n Recall: {train_metrics[1]:.3f}\n F1 score: {train_metrics[2]:.3f}')

Train:
 Precision: 1.000
 Recall: 0.500
 F1 score: 0.556


Metrics for validation data

In [None]:
valid_conf_matrix, *valid_metrics = calculate_metrics(valid_data_frame, valid_anomalies_idxs)
valid_conf_matrix

anomaly_predicted,0,1,All
anomaly_label,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,4030,0,4030
1,0,2,2
All,4030,2,4032


In [None]:
print(f'Valid:\n Precision: {valid_metrics[0]:.3f}\n Recall: {valid_metrics[1]:.3f}\n F1 score: {valid_metrics[2]:.3f}')

Valid:
 Precision: 1.000
 Recall: 1.000
 F1 score: 1.000


In [None]:
valid_conf_matrix

anomaly_predicted,0,1,All
anomaly_label,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,4030,0,4030
1,0,2,2
All,4030,2,4032
