diff --git a/tksbrokerapi/TradeRoutines.py b/tksbrokerapi/TradeRoutines.py index b46f062..203d27b 100644 --- a/tksbrokerapi/TradeRoutines.py +++ b/tksbrokerapi/TradeRoutines.py @@ -28,6 +28,7 @@ from datetime import datetime, timedelta from dateutil.tz import tzutc +import pandas as pd # --- Main constants: @@ -262,3 +263,70 @@ def CalculateLotsForDeal(currentPrice: float, maxCost: float, volumeInLot: int = lots = 0 return lots + + +def HampelFilter(pdSeries: pd.Series, window: int = 5, sigma: float = 3, scaleFactor: float = 1.4826) -> pd.Series: + """ + Outlier Detection with Hampel Filter. It can detect outliers based on a sliding window and counting difference + between median values and input values of series. The Hampel filter is often considered extremely effective in practice. + + For each window we calculate the median and the standard deviation expressed as the median absolute deviation (MAD). + If the considered observation differs from the window median by more than x standard deviations, we treat it + as an outlier. Also used a constant scale factor which is dependent on the series distribution, + e.g. for Gaussian distribution it is approximately 1.4826. + + References: + 1. Lewinson Eryk, "Outlier Detection with Hampel Filter", September 26, 2019. + Link: https://towardsdatascience.com/outlier-detection-with-hampel-filter-85ddf523c73d + 2. Liu, Hancong, Sirish Shah, and Wei Jiang, "On-line outlier detection and data cleaning". Computers and Chemical Engineering. Vol. 28, March 2004, pp. 1635–1647. + Link: https://sites.ualberta.ca/~slshah/files/on_line_outlier_det.pdf + 3. Hampel F. R., "The influence curve and its role in robust estimation". Journal of the American Statistical Association, 69, 382–393, 1974. + + Examples: + - `HampelFilter(pdSeries=pd.Series([1, 1, 1, 1, 1, 1]), window=3) -> pd.Series([False, False, False, False, False, False])` + - `HampelFilter(pdSeries=pd.Series([1, 1, 1, 2, 1, 1]), window=3) -> pd.Series([False, False, False, True, False, False])` + - `HampelFilter(pdSeries=pd.Series([0, 1, 1, 1, 1, 0]), window=3) -> pd.Series([True, False, False, False, False, True])` + - `HampelFilter(pdSeries=pd.Series([1])) -> pd.Series([False])` + + :param pdSeries: Pandas Series object with numbers in which we identify outliers. + :param window: length of the sliding window (default: 5 points), 1 <= window <= len(pdSeries). + :param sigma: sigma is the number of standard deviations which identify the outlier (default: 3 sigmas), > 0. + :param scaleFactor: constant scale factor (default: 1.4826), > 0. + :return: Pandas Series object with True/False values. `True` mean that an outlier detected in that position of input series. + If an error occurred then empty series returned. + """ + try: + if window < 1: + window = 1 + + if window > len(pdSeries): + window = len(pdSeries) + + if sigma <= 0: + sigma = 3 + + if scaleFactor <= 0: + scaleFactor = 1.4826 + + # Extend data to avoid undetected anomaly in first and last elements by original algorithm: + pdSeries = pd.concat([pdSeries[: window], pdSeries, pdSeries[-window:]]) + + rollingMedian = pdSeries.rolling( + window=2 * window, + center=True, + ).median() + + rollingMAD = scaleFactor * pdSeries.rolling( + window=2 * window, + center=True, + ).apply(lambda x: pd.Series.median(pd.Series.abs(x - pd.Series.median(x)))) + + delta = pd.Series.abs(pdSeries - rollingMedian) + + new = delta > sigma * rollingMAD + new = new[window: -window] + + except Exception: + new = pd.Series() + + return new