In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
import numpy as np
import pandas as pd
import itertools
from sklearn.metrics import mean_squared_error as mse
# Fixed spelling: the class is `IsolationForest`; the previous `IoslationForest`
# raised ImportError and stopped the whole notebook at this cell.
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.neighbors import LocalOutlierFactor
from statsmodels.tsa.api import ExponentialSmoothing
import statsmodels.api as sm
from fbprophet import Prophet


In [None]:
# NOTE(review): `loaded data` is a placeholder, not valid Python. Replace both
# right-hand sides with the real data-loading calls before running this cell.
# `gp_traindata` is the name passed to gp_anomalies() as its input DataFrame.
gp_anomaliesOutput= loaded data
gp_traindata=loaded data


In [None]:
def gp_anomalies(gp_traindata, group_cols=None):
    """Collect each group's history and score it with ``run_model``.

    Every row's model inputs are packed into a ``features`` struct, the
    structs are gathered per group with ``collect_list``, and the gathered
    list is handed to ``run_model`` through a Spark UDF.

    Parameters
    ----------
    gp_traindata : pyspark.sql.DataFrame
        Must contain ``dt_period_end``, ``day_of_week``, ``vol``,
        ``vol_weight`` and ``starting_base``.
    group_cols : list of str, optional
        Columns that identify one series. When omitted, every column that is
        not packed into the feature struct is used as a grouping key.
        NOTE(review): the original code grouped by the model's *output*
        column names, which cannot exist in the training data; confirm the
        intended keys and pass them explicitly if this default is wrong.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per group with ``collected_features`` and ``anomaly_struct``
        (the string returned by ``run_model``).
    """
    metric = 'vol'
    feats = ['dt_period_end', 'day_of_week', metric, metric + '_weight']
    # Field order matters: run_model rebuilds a pandas frame from these
    # structs positionally, with 'starting_base' as the last column.
    struct_cols = feats + ['starting_base']

    # Most recent period in the data; this is the date that gets scored.
    max_dt = (
        gp_traindata
        .select('dt_period_end')
        .agg(F.max('dt_period_end'))
        .collect()[0][0]
    )

    if group_cols is None:
        group_cols = [c for c in gp_traindata.columns if c not in struct_cols]
    group_cols = list(group_cols)

    # No returnType given, so the UDF yields a string column (Spark's default),
    # which matches the comma-separated string run_model returns.
    run_model_udf = F.udf(lambda rows: run_model(rows, metric, max_dt))

    anomaly_df = (
        gp_traindata
        .withColumn('features', F.struct(*struct_cols))
        .select(*group_cols, 'features')
        .groupBy(*group_cols)
        .agg(F.collect_list('features').alias('collected_features'))
        .withColumn('anomaly_struct', run_model_udf(F.col('collected_features')))
    )

    return anomaly_df
    

In [None]:
def run_model(X, metric, max_dt):
    """Score one series with an ensemble of detectors and majority-vote the result.

    Parameters
    ----------
    X : sequence of 5-item rows
        Rows ordered as ``(dt_period_end, day_of_week, <metric>,
        <metric>_weight, starting_base)``.
    metric : str
        Name of the metric column (e.g. ``'vol'``).
    max_dt : date-like
        Period being scored; must compare equal to a ``dt_period_end`` value.

    Returns
    -------
    str
        ``"starting_base, actual, is_anomaly, prediction, excess"`` joined
        with ``', '``. All zeros when ``X`` is empty; the last three fields
        are ``0`` when ``starting_base`` is below 1000 (models are skipped).

    Notes
    -----
    Reads the module-level ``clfs`` (iterable of model keys) and the
    ``gp_model_*`` functions, which must be defined before the ensemble
    branch runs.  NOTE(review): ``clfs`` is not defined in the visible part
    of the notebook -- confirm it is set in another cell.
    """
    min_starting_base = 1000  # series smaller than this are not modelled

    cols = ['dt_period_end', 'day_of_week', metric, metric + '_weight', 'starting_base']
    X_train = pd.DataFrame(X, columns=cols)
    if X_train.empty:
        return '0, 0, 0, 0, 0'

    X_train = X_train.sort_values('dt_period_end').fillna(0)
    # Non-positive metric values are floored to 1.
    X_train[metric] = X_train[metric].map(lambda v: 1 if v <= 0 else v)

    # Explicit "no row for max_dt" check instead of a bare `except:`.
    latest = X_train[X_train['dt_period_end'] == max_dt]
    if latest.empty:
        actual, starting_base = 0, 0
    else:
        actual = latest[metric].iloc[0]
        starting_base = latest['starting_base'].iloc[0]

    if starting_base < min_starting_base:
        return ', '.join(map(str, [starting_base, actual])) + ', 0, 0, 0'
    X_train = X_train.drop('starting_base', axis=1)

    ### model functional call dict
    model_func_dict = {"IF": gp_model_if, 'svm': gp_model_onesvm, 'LOF': gp_model_lof,
                       'HWES': gp_model_hwes, 'SARIMAX': gp_model_sarima, 'FBPROP': gp_model_fbprophet}

    total_models = len(clfs)
    model_vote_sum = 0
    model_forecast_pos = []
    model_forecast_neg = []

    for clf in clfs:
        # Each model gets its own copy: several of them sort or add columns
        # in place, which would otherwise leak into the next model's input.
        result = model_func_dict[clf](X_train.copy(), metric, max_dt)
        anomaly, forecast = _unpack_model_result(result)
        model_vote_sum += int(anomaly)
        if not _is_usable_forecast(forecast):
            continue  # detector without a forecast still votes
        if anomaly:
            model_forecast_pos.append(forecast)
        else:
            model_forecast_neg.append(forecast)

    is_anomaly = model_vote_sum > total_models // 2
    chosen = model_forecast_pos if is_anomaly else model_forecast_neg
    prediction = _mean_forecast(chosen)
    return ', '.join(map(str, [starting_base, actual, int(is_anomaly), prediction, actual - prediction]))


def _unpack_model_result(result):
    """Normalise a model return value to ``(anomaly: bool, forecast or None)``."""
    if isinstance(result, (tuple, list)) and len(result) == 2:
        anomaly, forecast = result
    else:
        # Some detectors return only the anomaly flag.
        anomaly, forecast = result, None
    return bool(anomaly), forecast


def _is_usable_forecast(forecast):
    """True when the forecast is a real value (not None / NaN)."""
    return forecast is not None and not pd.isna(forecast)


def _mean_forecast(values):
    """Mean of the forecasts, or NaN when there are none (avoids np.mean([]))."""
    return float(np.mean(values)) if values else float('nan')


    

In [None]:
def gp_model_fbprophet(df, metric, max_dt):
    """Flag ``max_dt`` as anomalous when the actual exceeds Prophet's fit by a wide margin.

    A point is an anomaly when ``actual - yhat`` is greater than 1.5 times
    the width of Prophet's uncertainty band (``yhat_upper - yhat_lower``).
    Only positive deviations are flagged.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``dt_period_end`` and the ``metric`` column. Not modified.
    metric : str
        Name of the column to model.
    max_dt : date-like
        Period to evaluate.

    Returns
    -------
    tuple
        ``(is_anomaly: bool, yhat: float)``. When no forecast row exists for
        ``max_dt`` the result is ``(False, nan)`` so callers can always
        unpack two values.
    """
    no_result = (False, float('nan'))

    # Work on a sorted copy so the caller's frame keeps its row order.
    # NOTE(review): with newest-first ordering, `[:-1]` drops the OLDEST row,
    # so the max_dt observation stays in the training data and its yhat is an
    # in-sample fit. Confirm whether holding out the latest row was intended.
    history = (
        df.sort_values(by=['dt_period_end'], ascending=False)[['dt_period_end', metric]][:-1]
        .rename(columns={'dt_period_end': 'ds', metric: 'y'})
    )

    # instantiate model
    m = Prophet(daily_seasonality=True, yearly_seasonality=True, seasonality_mode='multiplicative')
    m.fit(history)

    future = m.make_future_dataframe(periods=1, freq='D')
    forecast = m.predict(future)

    # Attach actuals by date rather than by row position: `history` is sorted
    # newest-first, which need not match the row order of the forecast frame.
    actual_by_ds = pd.Series(history['y'].to_numpy(), index=pd.to_datetime(history['ds']))
    actual_by_ds = actual_by_ds[~actual_by_ds.index.duplicated(keep='first')]

    forecasted = forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].copy()
    forecasted['Actual'] = forecasted['ds'].map(actual_by_ds)
    forecasted['errors'] = forecasted['Actual'] - forecasted['yhat']
    forecasted['ambiguity'] = forecasted['yhat_upper'] - forecasted['yhat_lower']
    # NaN errors (dates with no actual) compare False and are never flagged.
    forecasted['anomaly'] = (forecasted['errors'] > 1.5 * forecasted['ambiguity']).astype(int)

    # `ds` is a datetime column; normalise max_dt so a plain date still matches.
    try:
        target = pd.Timestamp(max_dt)
    except (TypeError, ValueError):
        return no_result

    at_target = forecasted[forecasted['ds'] == target]
    if at_target.empty:
        return no_result
    return bool(at_target['anomaly'].iloc[0] == 1), float(at_target['yhat'].iloc[-1])

In [None]:
def gp_model_lof(df, metric, max_dt):
    """Report whether Local Outlier Factor marks the ``max_dt`` row as an outlier.

    The model is fitted on one-hot encoded ``day_of_week`` columns plus the
    ``metric`` column.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``dt_period_end``, ``day_of_week`` and ``metric``.
        Not modified.
    metric : str
        Name of the metric column.
    max_dt : date-like
        Period to evaluate; compared with ``==`` against ``dt_period_end``.

    Returns
    -------
    bool
        True when the first row for ``max_dt`` is labelled an outlier; False
        otherwise, including when ``df`` is empty or has no such row.
        Unlike the forecasting models this returns only the flag, so callers
        that unpack an ``(anomaly, forecast)`` pair must handle the bare bool.
    """
    if len(df) == 0:
        return False

    # NOTE(review): the original referenced an undefined `features` list; the
    # day-of-week dummies built here are assumed to be what it meant.
    day_dummies = pd.get_dummies(df['day_of_week'], prefix='day', dtype=float)
    lof_inputs = pd.concat([day_dummies, df[[metric]]], axis=1)

    model = LocalOutlierFactor(contamination='auto', n_neighbors=20, novelty=False)

    # fit_predict labels outliers -1 and inliers 1. Keep df's own index so the
    # labels stay attached to the right rows even when df is not 0..n-1.
    labels = np.asarray(model.fit_predict(lof_inputs))
    is_outlier = pd.Series(labels == -1, index=df.index)

    at_target = is_outlier[df['dt_period_end'] == max_dt]
    if at_target.empty:
        return False
    return bool(at_target.iloc[0])