# In[13]:
from darts.utils.statistics import stationarity_tests
from darts.timeseries import TimeSeries
from statsmodels.tsa.seasonal import STL
import prophet as Prophet
import pandas as pd
import numpy as np
from scipy.stats import median_abs_deviation

# In[3]:
class TimeSeriesStats:
    """Stationarity, seasonality and STL-residual helpers for a single KPI series."""

    def __init__(self, time_series):
        """Wrap the input in a darts ``TimeSeries`` stored on ``self.ts``.

        Args:
            time_series (pandas Series or DataFrame): either a Series of KPI
                values, or a DataFrame whose first column holds the
                timestamps and whose second column holds the KPI values.

        Raises:
            ValueError: if a DataFrame with fewer than two columns is passed.
        """
        if isinstance(time_series, pd.DataFrame):
            if time_series.shape[1] < 2:
                raise ValueError(
                    "Need two columns, in the dataframe, one with date and another with kpi"
                )
            date_col, kpi_col = time_series.columns[:2]
            self.ts = TimeSeries.from_dataframe(
                time_series, time_col=date_col, value_cols=kpi_col
            )
        else:
            self.ts = TimeSeries.from_series(time_series)

    def check_stationarity(self):
        """Checks if the time series data is stationary or not

        Runs darts' ``stationarity_tests`` with a 0.05 p-value threshold for
        both the ADF and the KPSS test.

        Returns:
            stationarity (bool): True - stationary, False - non-stationary
        """
        return stationarity_tests(
            self.ts,
            p_value_threshold_adfuller=0.05,
            p_value_threshold_kpss=0.05,
        )

    def check_seasonality(self):
        """Checks if the time series data has seasonality or not

        Returns:
            seasonality (bool): True - Seasonal, False - not seasonal
            seasonal_period (int): If True, value is the period of seasonality
        """
        # Imported here under an alias: this method shares its name with the
        # darts function, and the module-level imports do not bring it in.
        from darts.utils.statistics import check_seasonality as _darts_check_seasonality

        seasonality, seasonal_period = _darts_check_seasonality(self.ts)
        return seasonality, seasonal_period

    def get_residuals(self, ts, seasonality_results):
        """Returns the residual component of an STL decomposition

        Args:
            ts (pandas DataFrame or Series): data to decompose. For a
                DataFrame the second column is used as the KPI values.
            seasonality_results (tuple): A tuple of seasonality (bool) and
                period of seasonality (int), as returned by
                ``check_seasonality``.

        Returns:
            resid (pandas series): residuals after decomposition

        Note:
            When no seasonality was detected the period is passed to STL as
            ``None``, leaving STL to derive it from the index of ``ts``.
            NOTE(review): statsmodels is expected to raise if the index
            carries no frequency - confirm for the data in use.
        """
        seasonality, period = seasonality_results
        values = ts if isinstance(ts, pd.Series) else ts[ts.columns[1]]
        # Cast to a plain int so numpy integer periods are accepted by STL.
        stl_period = int(period) if seasonality else None
        result = STL(values, period=stl_period).fit()
        return result.resid
        
        


# In[4]:
from abc import ABC, abstractmethod 

class AnomalyDetector(ABC):
    """Interface shared by the anomaly detectors: fit first, then predict."""

    @abstractmethod
    def fit(self, *args, **kwargs):
        """Fit the detector on the data it holds.

        Implementations define their own tuning parameters.
        """

    @abstractmethod
    def predict(self):
        """Return the anomaly labels produced by the fitted detector."""
        
    


# In[10]:
class ProphetForAnomalyDetector(AnomalyDetector):
    """Flags records that fall outside Prophet's uncertainty interval."""

    def __init__(self, time_series):
        """Initialize the time series

        Args:
            time_series (pandas DataFrame): exactly two columns, the first
                with the dates and the second with the KPI values. The frame
                is copied, so the caller's object is not modified.

        Raises:
            ValueError: if the frame does not have exactly two columns.
        """
        self.ts = time_series.copy()
        # Compare the *number* of columns; comparing the Index itself to 2
        # yields an element-wise array whose truth value is ambiguous.
        if len(self.ts.columns) != 2:
            raise ValueError("Need two columns, in the dataframe, one with date and another with kpi")
        self.kpi_name = self.ts.columns[1]
        self.date_name = self.ts.columns[0]
        self.model = None

    def transform(self):
        """Renames columns to ds & y, and converts ds to a pandas datetime column as requested by prophet

        Args:
            None

        Raises:
            ValueError: from ``pd.to_datetime`` when a non-datetime ``ds``
                value does not match the ``%Y-%m-%d`` format.
        """
        if 'ds' not in self.ts.columns:
            self.ts = self.ts.rename(columns={self.date_name: "ds"})
        if 'y' not in self.ts.columns:
            self.ts = self.ts.rename(columns={self.kpi_name: "y"})
        if not pd.api.types.is_datetime64_any_dtype(self.ts['ds']):
            self.ts['ds'] = pd.to_datetime(self.ts['ds'], format='%Y-%m-%d', errors='raise')

    def fit(self, interval_width=0.99, changepoint_range=1):
        """Fits the model to the time series data

        ``transform()`` is applied automatically when the ``ds``/``y``
        columns are not present yet.

        Args:
            interval_width (float): a float number between 0 to 1, defines the uncertainty or confidence interval
            changepoint_range (float): a float number between 0 to 1, denotes the percentage of data from start to be used for changepoint detection

        Returns:
            self.model (Prophet): Trained model for deep analysis outside the class (by default predict method takes self.model)
        """
        if not {"ds", "y"}.issubset(self.ts.columns):
            self.transform()
        model = Prophet.Prophet(changepoint_range=changepoint_range, interval_width=interval_width)
        # Only publish the model once fitting succeeded, so a failed fit
        # does not leave an unfitted model behind for predict().
        self.model = model.fit(self.ts)
        return self.model

    def predict(self):
        """Predicts the confidence interval range and detects anomalies in a dataset

        Args:
            None

        Returns:
            Anomalies (pandas dataframe): date column, KPI column (under
            their original names) and an ``anomaly`` column holding 1 (above
            the upper bound), -1 (below the lower bound) or 0. Rows are
            ordered by date.

        Raises:
            ValueError: if ``fit`` has not produced a model yet.
        """
        if self.model is None:
            raise ValueError("self.model is None, Check if model fit is success")

        # NOTE(review): Prophet is understood to return its forecast ordered
        # by ds with a fresh index; sorting the history the same way keeps
        # each actual value on the row of its own forecast - confirm.
        history = self.ts.sort_values("ds", kind="mergesort").reset_index(drop=True)
        forecast = self.model.predict(history)
        forecast['fact'] = history['y'].to_numpy()

        forecasted = forecast[['ds', 'trend', 'yhat', 'yhat_lower', 'yhat_upper', 'fact']].copy()
        forecasted['anomaly'] = 0
        forecasted.loc[forecasted['fact'] > forecasted['yhat_upper'], 'anomaly'] = 1
        forecasted.loc[forecasted['fact'] < forecasted['yhat_lower'], 'anomaly'] = -1

        # The actual values live in 'fact'; restore the caller's column names.
        anomalies = forecasted[['ds', 'fact', 'anomaly']].rename(
            columns={'ds': self.date_name, 'fact': self.kpi_name}
        )
        return anomalies



# In[14]:
class IQRForAnomalyDetector(AnomalyDetector):
    """Flags records outside the IQR fences of the KPI (or of its STL residuals)."""

    def __init__(self, time_series):
        """Initialize the time series

        Args:
            time_series (pandas DataFrame): exactly two columns, the first
                with the dates and the second with the KPI values. The frame
                is copied, so the caller's object is not modified.

        Raises:
            ValueError: if the frame does not have exactly two columns.
        """
        self.ts = time_series.copy()
        # Compare the *number* of columns; comparing the Index itself to 2
        # yields an element-wise array whose truth value is ambiguous.
        if len(self.ts.columns) != 2:
            raise ValueError("Need two columns, in the dataframe, one with date and another with kpi")
        self.kpi_name = self.ts.columns[1]
        self.date_name = self.ts.columns[0]
        self.stationarity = False
        self.limits = None
        self.resid = None

    def get_limits(self, time_series, threshold=1.5):
        """Calculates lower and upper limit for Anomaly detection using IQR method

        Args:
            time_series (pandas dataframe): frame containing the KPI column
            threshold (float): threshold multiplier with IQR, defaults to 1.5

        Returns:
            low_lim (float): lower limit for the IQR
            up_lim (float): upper limit for the IQR
        """
        kpi = time_series[self.kpi_name]
        q1 = kpi.quantile(0.25)
        q3 = kpi.quantile(0.75)
        iqr = q3 - q1
        return q1 - threshold * iqr, q3 + threshold * iqr

    def fit(self, threshold=1.5):
        """Fits the time series data, checks stationarity and computes limits for IQR as required

        For a stationary series the limits come from the KPI values; for a
        non-stationary one they come from the STL residuals, which are kept
        on ``self.resid``.

        Args:
            threshold: threshold multiplier with IQR, defaults to 1.5

        Returns:
            self.limits (tuple): (low_lim, up_lim) for IQR
        """
        # Drop the outcome of any previous fit so predict() never mixes runs.
        self.limits = None
        self.resid = None

        # TimeSeriesStats takes a series: KPI values indexed by date.
        dates = self.ts[self.date_name]
        if not pd.api.types.is_datetime64_any_dtype(dates):
            dates = pd.to_datetime(dates)
        kpi_series = pd.Series(
            self.ts[self.kpi_name].to_numpy(),
            index=pd.DatetimeIndex(dates),
            name=self.kpi_name,
        )
        stats = TimeSeriesStats(kpi_series)

        self.stationarity = bool(stats.check_stationarity())
        if self.stationarity:
            self.limits = self.get_limits(self.ts, threshold)
        else:
            seasonality_results = stats.check_seasonality()
            resid = stats.get_residuals(self.ts, seasonality_results)
            # Residuals are stored positionally against the rows of self.ts.
            self.resid = pd.DataFrame(
                {self.kpi_name: np.asarray(resid)}, index=self.ts.index
            )
            self.limits = self.get_limits(self.resid, threshold)
        return self.limits

    def predict(self):
        """Predicts the anomalies based on the lower limit and upper limit for IQR range from time series or residuals

        Args:
            None

        Returns:
            Anomalies (pandas dataframe): the initially sent time series dataframe with additionally added anomaly series for each record
            (1 above the upper limit, -1 below the lower limit, 0 otherwise)

        Raises:
            ValueError: if ``fit`` has not computed the limits yet.
        """
        # Validate before touching self.ts so a failed call leaves no trace.
        if self.limits is None:
            raise ValueError("Upper and lower limits for IQR is not decided yet")

        low_lim, up_lim = self.limits
        scored = self.resid if self.resid is not None else self.ts
        values = scored[self.kpi_name]
        flags = np.select([values.lt(low_lim), values.gt(up_lim)], [-1, 1], default=0)

        if self.resid is not None:
            self.resid['anomaly'] = flags
        self.ts['anomaly'] = flags
        anomalies = self.ts
        return anomalies


class RobustZForAnomalyDetector:
    """Flags records whose robust (median/MAD based) z-score exceeds a threshold."""

    def __init__(self, time_series):
        """Initialize the time series

        Args:
            time_series (pandas DataFrame): exactly two columns, the first
                with the dates and the second with the KPI values. The frame
                is copied, so the caller's object is not modified.

        Raises:
            ValueError: if the frame does not have exactly two columns.
        """
        self.ts = time_series.copy()
        # Compare the *number* of columns; comparing the Index itself to 2
        # yields an element-wise array whose truth value is ambiguous.
        if len(self.ts.columns) != 2:
            raise ValueError("Need two columns, in the dataframe, one with date and another with kpi")
        self.kpi_name = self.ts.columns[1]
        self.date_name = self.ts.columns[0]
        self.stationarity = False
        self.threshold = None
        self.z_thresh = None  # kept as an alias of ``threshold``
        self.mad = None
        self.median = None

    def robust_z_score(self, time_series_value):
        """Calculate robust z score: 0.6745 * (value - median) / MAD

        Uses the ``median`` and ``mad`` stored by ``fit``.

        Args:
            time_series_value (float or pandas series): value(s) for which the z-score is calculated

        Returns:
            z_score (float or pandas series): score based on the formula
        """
        return 0.6745 * (time_series_value - self.median) / self.mad

    def fit(self, threshold=3.5):
        """Fits the time series data and calculates the z-score

        The scores are also stored in a ``z_score`` column of ``self.ts``.

        Args:
            threshold (float): score threshold for anomaly, defaults to 3.5

        Returns:
            time_series_z_score (pandas series): the z-score calculated for all rows
        """
        kpi = self.ts[self.kpi_name]
        self.threshold = threshold
        self.z_thresh = threshold
        self.mad = median_abs_deviation(kpi)
        self.median = np.median(kpi)
        if self.mad == 0:
            # Avoid dividing by zero when at least half the values equal the median.
            self.mad = 0.1
        # Computed on the whole column at once instead of row by row.
        self.ts['z_score'] = self.robust_z_score(kpi)
        return self.ts['z_score']

    def predict(self):
        """Predicts based on the threshold value for z-score, if the record is anomaly or not

        Args:
            None

        Returns:
            Anomalies (pandas dataframe): date column, KPI column and an
            ``anomaly`` column holding 1 (z-score >= threshold),
            -1 (z-score <= -threshold) or 0.

        Raises:
            ValueError: if ``fit`` has not been called yet.
        """
        if self.threshold is None or 'z_score' not in self.ts.columns:
            raise ValueError("fit() must be called before predict()")
        self.ts['anomaly'] = 0
        self.ts.loc[self.ts['z_score'] >= self.threshold, 'anomaly'] = 1
        self.ts.loc[self.ts['z_score'] <= -self.threshold, 'anomaly'] = -1
        anomalies = self.ts[[self.date_name, self.kpi_name, 'anomaly']]
        return anomalies
        
        


