Skip to content

Commit

Permalink
#113 Anomaly detection function using Hampel Filter was implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim55667757 committed Dec 25, 2022
1 parent 12bfff9 commit 65afba0
Showing 1 changed file with 57 additions and 3 deletions.
60 changes: 57 additions & 3 deletions tksbrokerapi/TradeRoutines.py
Expand Up @@ -29,6 +29,7 @@
from datetime import datetime, timedelta
from dateutil.tz import tzutc
import pandas as pd
from typing import Union, Optional


# --- Main constants:
Expand Down Expand Up @@ -276,22 +277,24 @@ def HampelFilter(pdSeries: pd.Series, window: int = 5, sigma: float = 3, scaleFa
e.g. for Gaussian distribution it is approximately 1.4826.
References:
1. Lewinson Eryk, "Outlier Detection with Hampel Filter", September 26, 2019.
Link: https://towardsdatascience.com/outlier-detection-with-hampel-filter-85ddf523c73d
2. Liu, Hancong, Sirish Shah, and Wei Jiang, "On-line outlier detection and data cleaning". Computers and Chemical Engineering. Vol. 28, March 2004, pp. 1635–1647.
Link: https://sites.ualberta.ca/~slshah/files/on_line_outlier_det.pdf
3. Hampel F. R., "The influence curve and its role in robust estimation". Journal of the American Statistical Association, 69, 382–393, 1974.
Examples:
- `HampelFilter(pdSeries=pd.Series([1, 1, 1, 1, 1, 1]), window=3) -> pd.Series([False, False, False, False, False, False])`
- `HampelFilter(pdSeries=pd.Series([1, 1, 1, 2, 1, 1]), window=3) -> pd.Series([False, False, False, True, False, False])`
- `HampelFilter(pdSeries=pd.Series([0, 1, 1, 1, 1, 0]), window=3) -> pd.Series([True, False, False, False, False, True])`
- `HampelFilter(pdSeries=pd.Series([1])) -> pd.Series([False])`
:param pdSeries: Pandas Series object with numbers in which we identify outliers.
:param window: length of the sliding window (default: 5 points), 1 <= window <= len(pdSeries).
:param sigma: sigma is the number of standard deviations which identify the outlier (default: 3 sigmas), > 0.
:param scaleFactor: constant scale factor (default: 1.4826), > 0.
:param window: length of the sliding window (5 points by default), 1 <= window <= len(pdSeries).
:param sigma: sigma is the number of standard deviations which identify the outlier (3 sigma by default), > 0.
:param scaleFactor: constant scale factor (1.4826 by default), > 0.
:return: Pandas Series object with True/False values. `True` mean that an outlier detected in that position of input series.
If an error occurred then empty series returned.
"""
Expand Down Expand Up @@ -330,3 +333,54 @@ def HampelFilter(pdSeries: pd.Series, window: int = 5, sigma: float = 3, scaleFa
new = pd.Series()

return new


def HampelAnomalyDetection(series: Union[list, pd.Series], **kwargs) -> Optional[int]:
"""
Anomaly Detection function using Hampel Filter. This function returns the minimum index of elements in anomaly list
or index of the first maximum element in input series if this index less than anomaly element index. If series has
no anomalies then None will be return.
Examples:
- `HampelAnomalyDetection([1, 1, 1, 1, 1, 1]) -> None`
- `HampelAnomalyDetection([1, 1, 1, 1, 111, 1]) -> 4`
- `HampelAnomalyDetection([1, 1, 10, 1, 1, 1]) -> 2`
- `HampelAnomalyDetection([111, 1, 1, 1, 1, 1]) -> 0`
- `HampelAnomalyDetection([111, 1, 1, 1, 1, 111]) -> 0`
- `HampelAnomalyDetection([1, 11, 1, 111, 1, 1]) -> 1`
- `HampelAnomalyDetection([1, 1, 1, 111, 99, 11]) -> 3`
- `HampelAnomalyDetection([1, 1, 11, 111, 1, 1, 1, 11111]) -> 2`
- `HampelAnomalyDetection([1, 1, 1, 111, 111, 1, 1, 1, 1]) -> 3`
- `HampelAnomalyDetection([1, 1, 1, 1, 111, 1, 1, 11111, 5555]) -> 4`
- `HampelAnomalyDetection([9, 13, 12, 12, 13, 12, 12, 13, 12, 12, 13, 12, 12, 13, 12, 13, 12, 12, 1, 1]) -> 1`
- `HampelAnomalyDetection([9, 13, 12, 12, 13, 12, 1000, 13, 12, 12, 300000, 12, 12, 13, 12, 2000, 1, 1, 1, 1]) -> 6`
Some **kwargs parameters you can pass to `HampelFilter()`:
- `window` is the length of the sliding window (5 points by default), 1 <= window <= len(pdSeries).
- `sigma` is the number of standard deviations which identify the outlier (3 sigma by default), > 0.
- `scaleFactor` is the constant scale factor (1.4826 by default), > 0.
:param series: list of numbers or Pandas Series object with numbers in which we identify index of first anomaly (outlier's index).
:param kwargs: See `HampelFilter()` docstring with all possible parameters.
:return: index of the first element with anomaly in series will be return or `None` if no anomaly.
"""
try:
if isinstance(series, list):
series = pd.Series(series)

indexFirstMax = series.idxmax() # Index of the first maximum in series

filtered = HampelFilter(pdSeries=series, **kwargs) # The bool series with filtered data (if True then anomaly present in that place of input series)
anomalyIndexes = filtered[filtered == True].index # Indexes list of all found anomalies (if True)

indexAnomalyMin = min(anomalyIndexes) if len(anomalyIndexes) > 0 else None # Index of the first True in filtered series or None

# You need to take the element whose index is less (see examples in docstring):
result = min(indexAnomalyMin, indexFirstMax) if indexAnomalyMin is not None else None

except Exception:
result = None

return result

0 comments on commit 65afba0

Please sign in to comment.