# Anomaly detection using Kalman filter

In [101]:
import numpy as np
import pandas as pd

from filterpy.kalman import KalmanFilter
import pywt
import scipy.stats as stats

import plotly.graph_objects as go
import matplotlib.pyplot as plt

INPUT_FILE_PATH="data/cesnet-time-series24"

## Kalman filter

The Kalman filter is an algorithm that can predict future values based on observations. This property can be exploited by comparing the actual measured values with the predicted values. If the values are significantly different it may indicate an anomaly.

When we use the [Kalman filter](https://en.wikipedia.org/wiki/Kalman_filter) we get the predicted values, if we subtract these predicted values from the actual measured values we get the so-called **residuals**. **The residuals express the difference between the actual and measured values.** These residues can be treated by different methods. In this notebook, 4 different methods are implemented. The main inspiration was from paper [Combining filtering and statistical methods for anomaly detection](https://dl.acm.org/doi/pdf/10.5555/1251086.1251117), which discusses the use of the Kalman filter and different methods for anomaly detection. All methods require a **threshold** setting. Determining this threshold is quite challenging and may depend on the context, the long-term behavior of the traffic, or how sensitive we want to be to anomaly detection.


## Implemented methods

### 1. Thresholding

This method assumes that the residual process behaves as a normally distributed random variable with a mean close to zero and some variance. It works by setting some threshold, for example, according to the 3-sigma rule, and any residual that exceeds this threshold will be flagged as an anomaly. This method is very simple, but it assumes that the data have constant variability.

The number of anomalies can be set using the `threshold` parameter manually or by setting `std_multiple`, which specifies the multiple of the standard deviation that will be treated as the threshold.

### 2. Cumulative Sum (CUSUM)

The [CUSUM](https://en.wikipedia.org/wiki/CUSUM) method tracks cumulative changes in the residual process and is more sensitive to gradual variations than the baseline method with variance. If the deviation increases or decreases continuously and exceeds a given threshold, an anomaly is detected. Compared to the basic threshold method, it is more sensitive to gradual changes.

The number of anomalies can be set using the `threshold` parameter manually or by setting `std_multiple`, which specifies the multiple of the standard deviation that will be treated as the threshold. Additionally, there is a `drift` parameter which specifies the deviation from the mean for cumulative summation and is set to 0.5 times the standard deviation by default.

### 3. Generalized Likelihood Ratio Test (GLRT)

The problem with CUSUM is that it requires setting the **level of change** (it can be set based on the standard deviation, but again we are lying on its constancy). The GLRT method attempts to address this problem by using **maximum likelihood estimations** (MLE). This method requires setting a window in which to observe whether there is a change in the mean using the MLE. The idea is that if there are no anomalies in the data, the residuals should be around zero (mean), i.e. H0: mean = 0, if not, then there may be an anomaly, H1: mean = µ. This method is primarily characterized by detecting the **beginning** of the anomaly (therefore the points on the graph may appear to be before or after the anomaly).

The number of anomalies can be set using the `threshold` parameter manually or by setting `chi2`, which specifies the $\chi^2_{chi2}$ used in confidence interval. With GLRT, there is also a `window` prameter. The window size determines the context in which anomalies will be detected.

### 4. Multiscale Analysis (Wavelet Analysis)

This method uses the [Wavelet transform](https://en.wikipedia.org/wiki/Wavelet_transform). **The wavelet transform** decomposes the signal into a predetermined number of components, separating **the details** (high frequencies, i.e. potential anomalies) separately. Anomalies can be searched for individually in these parts. First the decomposition into details is performed, then each detail is separately reconstructed to its original size and anomalies are searched for based on a specified threshold. **If an potentional anomaly has been detected at the same spot in different detail componets it is marked as an anomaly.**

In anomaly detection with wavelet analysis are used 3 parameters. The first is the `wavelet_function` used (db4 by default), the second parameter is the `level`, this determines how much detail the data is decomposed into. Finally there is again the `std_multiple` parameter, which specifies the multiple of the standard deviation that will be treated as the threshold.

In [102]:
# Definitions of help functions and classes

# Some methods have an extra parameter std_multiple. Unless a threshold is set implicitly, 
# it is determined automatically by default based on the standard deviation. std_multiple 
# specifies a multiple of this deviation to use as the threshold. The larger the std_multiple 
# the fewer anomalies will be detected.

def load_series(input_file_path:str, aggregation_rate:str, aggregation_level:str, data_id=int) -> pd.DataFrame:
    # Load times
    time_ids = pd.read_csv(f"{input_file_path}/times/times_{aggregation_rate[4:]}.csv")
    time_ids['time'] = pd.to_datetime(time_ids['time']) # convert time to datetime
    time_map = dict(zip(time_ids['id_time'], time_ids['time']))

    # Load data
    df = pd.read_csv(f"{input_file_path}/{aggregation_level}/{aggregation_rate}/{str(data_id)}.csv")
    df = fill_missing(df, time_ids['id_time'])
    df['time'] = df['id_time'].map(time_map)
    return df


def fill_missing(train_df, train_time_ids):
    df_missing = pd.DataFrame(columns=train_df.columns)
    df_missing.id_time = train_time_ids[~train_time_ids.isin(train_df.id_time)].values

    for column in train_df.columns:
        if column == "id_time":
            continue
        if column in ["tcp_udp_ratio_packets","tcp_udp_ratio_bytes","dir_ratio_packets","dir_ratio_bytes"]:
            df_missing[column] = 0.5
        else:
            df_missing[column] = 0 # train_df[column].mean()

    return pd.concat([train_df, df_missing]).sort_values(by="id_time").reset_index()[train_df.columns]

In [103]:
# Kalman filter

def create_kalman_filter(dim_x=1, dim_z=1):
    kf = KalmanFilter(dim_x=dim_x, dim_z=dim_z)
    
    kf.x = np.zeros((dim_x, 1))  # Initialization
    kf.F = np.eye(dim_x)  # Assume that the state does not change (identity matrix)
    
    # Merania
    kf.H = np.eye(dim_z)  # Measuring matrix
    kf.P *= 1000  # Initial uncertainty
    kf.R = np.eye(dim_z) * 0.5  # Measurement uncertainty
    kf.Q = np.eye(dim_x) * 0.01  # Noise
    
    return kf


def get_residuals(kf:KalmanFilter, data:pd.Series):
    filtered_values = []
    for value in data:
        kf.predict()
        kf.update(value)
        filtered_values.append(kf.x[0, 0])
    return pd.DataFrame({"data": data, "kalman": filtered_values, "residual": (data - filtered_values)})


def variance_anomaly_detection(data:pd.DataFrame, threshold:float=None, std_multiple:float=3):
    if threshold is None:
        mean = np.mean(data['residual'])
        sigma = np.std(data['residual'])
        threshold = mean + std_multiple * sigma

    data['anomaly_variance'] = data['residual'].abs() > threshold
    return data


def CUSUM_anomaly_detection(data:pd.DataFrame, threshold:float|None=None, drift:float|None=None, std_multiple:float=4):
    residuals = data['residual'].values
    mean = np.mean(residuals)
    sigma = np.std(residuals)

    if drift is None:
        drift = 0.5 * sigma
    
    if threshold is None:
        threshold = std_multiple * sigma

    pos_cusum = 0
    neg_cusum = 0
    anomalies = np.zeros_like(residuals, dtype=int)

    for i in range(1, len(residuals)):
        pos_cusum = max(0, residuals[i] - (mean + drift) + pos_cusum)
        neg_cusum = max(0, (mean - drift) - residuals[i] + neg_cusum)

        if pos_cusum > threshold or neg_cusum > threshold:
            anomalies[i] = 1
            pos_cusum = 0
            neg_cusum = 0

    data['anomaly_cusum'] = anomalies
    return data


def GLRT_anomaly_detection(data:pd.DataFrame, window_size:int=50, threshold:float|None=None, chi2:float=0.99):
    residuals = data['residual'].values
    sigma = np.std(residuals) 

    if threshold is None:
        threshold = stats.chi2.ppf(chi2, df=1)

    
    anomalies = np.zeros_like(residuals, dtype=int)

    for i in range(len(residuals) - window_size + 1):
        window = residuals[i:i + window_size]
        mean = np.mean(window)

        # Compute the log-likelihood ratio test statistic
        # Null hypothesis (H0): mean = 0, variance = sigma^2
        # Alternative hypothesis (H1): mean = µ variance = sigma^2
        log_likelihood_h0 = -0.5 * np.sum((window / sigma) ** 2)
        log_likelihood_h1 = -0.5 * np.sum(((window - mean) / sigma) ** 2)

        glrt_statistic = 2 * (log_likelihood_h1 - log_likelihood_h0)

        if glrt_statistic > threshold:
            anomalies[i] = 1

    data['anomaly_glrt'] = anomalies
    return data


def wavelet_anomaly_detection(data:pd.DataFrame, wavelet_function:str="db4", level:int=4, std_multiple:float=3):
    residuals = data['residual'].values

    coeffs = pywt.wavedec(residuals, wavelet_function, level=level)
    detail_coeffs = coeffs[1:]
    
    #plot_wavelet_coefs(detail_coeffs)
    
    anomaly_flags = np.zeros((len(detail_coeffs), len(residuals)), dtype=int)
    anomalies = np.zeros_like(residuals, dtype=int)

    # For each level of details reconstruct only with detail and use variance for anomaly detection
    for i in range(1, level + 1):
        coeffs_copy = [np.zeros_like(c) if j != i else coeffs[j] for j, c in enumerate(coeffs)]
        detail_reconstructed = pywt.waverec(coeffs_copy, wavelet_function)

        mean = np.mean(detail_reconstructed)
        sigma = np.std(detail_reconstructed)
        threshold = mean + std_multiple * sigma

        anomaly_flags[i-1] = np.abs(detail_reconstructed) > threshold

        
    total_flags = np.sum(anomaly_flags, axis=0)
    anomalies = total_flags >= (level // 2)

    data['anomaly_wavelet'] = anomalies
    return data


def plot_wavelet_coefs(coefs):
    fig, axarr = plt.subplots(nrows=4, figsize=(10, 10))
    axarr[0].plot(coefs[3], 'g')
    axarr[0].set_title('Detail Coefficients - Level 4')
    axarr[1].plot(coefs[2], 'b')
    axarr[1].set_title('Detail Coefficients - Level 3')
    axarr[2].plot(coefs[1], 'm')
    axarr[2].set_title('Detail Coefficients - Level 2')
    axarr[3].plot(coefs[0], 'k')
    axarr[3].set_title('Detail Coefficients - Level 1')
    plt.tight_layout()
    plt.show()


def show_bar_graphs(anomaly_dfs:dict[str, pd.DataFrame]):
    all_features = []
    feature_traces = {}
    traces_cnt = 0
    initial_feature = None
    fig = go.Figure()
    for feature, df in anomaly_dfs.items():
        if initial_feature is None:
            initial_feature = feature
            
        all_features.append(feature)
        feature_traces[feature] = 2
        traces_cnt += 2

        fig.add_trace(go.Bar(
            x=df['time'],
            y=df['data'],
            name=feature,
            marker=dict(line=dict(width=0)),
            visible=(feature == initial_feature),
        ))

        fig.add_trace(go.Scatter(
            x=df['time'],
            y=df['kalman'],
            mode='lines',
            name=f'{feature} - kalman',
            marker=dict(line=dict(width=0)),
            visible=(feature == initial_feature),
        ))

        # Add anomaly points
        for anomaly in ['anomaly_variance', 'anomaly_cusum', 'anomaly_glrt' , 'anomaly_wavelet']:
            if anomaly in df.keys():
                feature_traces[feature] += 1
                traces_cnt += 1
                fig.add_trace(go.Scatter(
                    x=df.loc[df[anomaly] == 1, 'time'],
                    y=df.loc[df[anomaly] == 1, 'data'],
                    mode='markers',
                    marker=dict(size=8),
                    name=f'{anomaly} - {feature}',
                    
                    visible=(feature == initial_feature)
                ))
        
    # Create dropdown buttons
    buttons = []
    feature_ploted = 0
    for i, feature in enumerate(all_features):
        visibility = [False] * traces_cnt
        visibility[feature_ploted : (feature_ploted + feature_traces[feature])] = [True] * feature_traces[feature]
        feature_ploted += feature_traces[feature]
        
        buttons.append(dict(
            label=feature,
            method='update',
            args=[{'visible': visibility}, {'title': f"Results for {feature}"}]
    ))
    # Update layout with dropdown
    fig.update_layout(
        title=f'Results for {initial_feature}',
        updatemenus=[{
            "buttons": buttons,
            "direction": "down",
            "showactive": True,
        }],
        xaxis_title="Time",
        yaxis_title="Value"
    )    
    fig.show()

## Subnet 36

In [104]:
AGG_RATE = "agg_1_hour"
AGG_LEVEL = "institution_subnets" # can be institutions or institution_subnets or ip_addresses_sample
DATA_ID = 36

df:pd.DataFrame = load_series(INPUT_FILE_PATH, AGG_RATE, AGG_LEVEL, DATA_ID)

features = ["n_packets", "n_bytes", "std_n_dest_ip", "sum_n_dest_ports", "dir_ratio_packets", "avg_duration"]

anomaly_dfs = {}
for feature in features:
    kf = create_kalman_filter()
    anomalies_df = get_residuals(kf, df[feature])
    anomalies_df['time'] = df['time']
    anomalies_df = variance_anomaly_detection(anomalies_df, std_multiple=6)
    anomalies_df = CUSUM_anomaly_detection(anomalies_df, std_multiple=10)
    anomalies_df = GLRT_anomaly_detection(anomalies_df, chi2=0.999)
    anomalies_df = wavelet_anomaly_detection(anomalies_df, std_multiple=7)
    anomaly_dfs[feature] = anomalies_df


show_bar_graphs(anomaly_dfs)


## IP 1367

In [105]:
AGG_RATE = "agg_1_hour"
AGG_LEVEL = "ip_addresses_sample" # can be institutions or institution_subnets or ip_addresses_sample
DATA_ID = 1367

df:pd.DataFrame = load_series(INPUT_FILE_PATH, AGG_RATE, AGG_LEVEL, DATA_ID)

features = ["n_packets", "n_bytes", "std_n_dest_ip", "sum_n_dest_ports", "dir_ratio_packets", "avg_duration"]

anomaly_dfs = {}
for feature in features:
    kf = create_kalman_filter()
    anomalies_df = get_residuals(kf, df[feature])
    anomalies_df['time'] = df['time']
    anomalies_df = variance_anomaly_detection(anomalies_df, std_multiple=5)
    anomalies_df = CUSUM_anomaly_detection(anomalies_df, std_multiple=10)
    anomalies_df = GLRT_anomaly_detection(anomalies_df, window_size=200, chi2=0.999)
    anomalies_df = wavelet_anomaly_detection(anomalies_df, std_multiple=6)
    anomaly_dfs[feature] = anomalies_df


show_bar_graphs(anomaly_dfs)