In [1]:
import numpy as np
import pandas as pd
from hampel import hampel
import warnings

warnings.filterwarnings("ignore")

def median_absolute_deviation(x):
    """
    Returns the median absolute deviation from the window's median
    :param x: Values in the window
    :return: MAD
    """
    return np.median(np.abs(x - np.median(x)))


def hampel_legacy(ts, window_size=5, n=3, imputation=False):

    """
    Median absolute deviation (MAD) outlier in Time Series
    :param ts: a pandas Series object representing the timeseries
    :param window_size: total window size will be computed as 2*window_size + 1
    :param n: threshold, default is 3 (Pearson's rule)
    :param imputation: If set to False, then the algorithm will be used for outlier detection.
        If set to True, then the algorithm will also imput the outliers with the rolling median.
    :return: Returns the outlier indices if imputation=False and the corrected timeseries if imputation=True
    """

    if type(ts) != pd.Series:
        raise ValueError("Timeserie object must be of type pandas.Series.")

    if type(window_size) != int:
        raise ValueError("Window size must be of type integer.")
    else:
        if window_size <= 0:
            raise ValueError("Window size must be more than 0.")

    if type(n) != int:
        raise ValueError("Window size must be of type integer.")
    else:
        if n < 0:
            raise ValueError("Window size must be equal or more than 0.")

    # Copy the Series object. This will be the cleaned timeserie
    ts_cleaned = ts.copy()

    # Constant scale factor, which depends on the distribution
    # In this case, we assume normal distribution
    k = 1.4826

    rolling_ts = ts_cleaned.rolling(window_size*2, center=True)
    rolling_median = rolling_ts.median().fillna(method='bfill').fillna(method='ffill')
    rolling_sigma = k*(rolling_ts.apply(median_absolute_deviation).fillna(method='bfill').fillna(method='ffill'))

    outlier_indices = list(
        np.array(np.where(np.abs(ts_cleaned - rolling_median) >= (n * rolling_sigma))).flatten())

    if imputation:
        ts_cleaned[outlier_indices] = rolling_median[outlier_indices]
        return ts_cleaned

    return outlier_indices

# Synthetic Data

In [2]:
data = np.sin(np.linspace(0, 10, 100)) + np.random.normal(0, 0.1, 100)

# Add outliers to the original data
for index, value in zip([20, 40, 60, 80], [2.0, -1.4, 2.1, -0.5]):
    data[index] = value

data = pd.Series(data)

# Previous Implementation

In [3]:
%%timeit
hampel_legacy(data, imputation=True)

6.54 ms ± 40.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [6]:
hampel_legacy(data, imputation=True)

0    -0.045298
1     0.096230
2     0.229156
3     0.307104
4     0.371425
        ...   
95   -0.279893
96   -0.277427
97   -0.363055
98   -0.545942
99   -0.527614
Length: 100, dtype: float64

In [8]:
hampel_legacy(data)

[20, 60, 80]

# New implementation

In [4]:
%%timeit
hampel(data)

1.83 ms ± 9.33 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


In [5]:
hampel(data).filtered_data

0    -0.045298
1     0.096230
2     0.229156
3     0.307104
4     0.371425
        ...   
95   -0.279893
96   -0.277427
97   -0.363055
98   -0.545942
99   -0.527614
Length: 100, dtype: float32

In [7]:
hampel(data).outlier_indices

array([40, 60, 76, 80, 83], dtype=int32)