# Imports

In [None]:
# Root directory of the input CSV files, relative to this notebook. Paths are built by
# plain string concatenation (data_loc + file_name), so keep the trailing slash.
data_loc = "../../../../data/" 

In [None]:
import prophet
import sys

# Register the installed `prophet` package under its legacy name `fbprophet`, so any
# later `import fbprophet` (presumably inside the project's configs.functions helpers,
# which are not visible here) resolves to this module.
sys.modules['fbprophet'] = prophet

In [None]:
import numpy as np
import pandas as pd

from collections import OrderedDict
import itertools
from datetime import datetime

import sys
sys.path.append("..")

from copy import deepcopy

from configs.kpi_constraints_dict import kpi_constraints_dict
from configs.bad_direction_kpi_dict import bad_direction_kpi_dict
from configs.functions import make_future, run_prophet_funct
from configs.functions import add_cond, mult_cond, is_weekday
from configs.functions import preprocess_data, hparam_tuning


import logging, sys, os
import time

from uuid import uuid4

import dask
from dask.distributed import Client
from dask.distributed import TimeoutError, CancelledError
from sklearn.preprocessing import StandardScaler
import warnings
# Silence every FutureWarning (e.g. library deprecation notices) for the rest of the notebook.
warnings.simplefilter(action='ignore', category=FutureWarning)

def flatten(t):
    """Concatenate an iterable of iterables into one flat list, preserving order."""
    return list(itertools.chain.from_iterable(t))

# Dask client init

In [None]:
# Start a local dask cluster: 68 workers with 2 threads each, dashboard on port 44594.
# NOTE(review): the worker count is hard-coded for one particular host; adjust it to the
# machine this runs on.
client = Client( dashboard_address = ':44594', n_workers = 68, threads_per_worker = 2 ) #scheduler_address=':37243'
# Bare expression so the notebook displays the client summary.
client

# Data

In [None]:
# Raw time-series export. `file` is also used by the metadata cell to keep only the
# models trained on this very file.
file = "4weeks-subs_mcc-anon.csv"
csv_path = data_loc + file
datas = pd.read_csv(csv_path)

banner = "-" * 30
print(banner, "DF READ", banner)

In [None]:
metadata_store = pd.read_csv(data_loc + "metadata-anon.csv")
# The dimension_name column has mixed types: floats and OrderedDict reprs stored as str.

# Keep only the seasonal-prophet models that were trained on the data file loaded above.
# Filtering with one combined mask and taking .copy() gives an independent frame, so the
# column assignments below (and in later cells) do not hit SettingWithCopyWarning.
is_relevant = (metadata_store.model_type == 'seasonal_prophet') & (metadata_store.path == file)
metadata_store = metadata_store.loc[is_relevant].copy()

# Turn the stored repr strings back into OrderedDict objects.
# SECURITY NOTE(review): eval() executes arbitrary code contained in the CSV. Only run this
# on trusted metadata exports.
metadata_store["dimension_name"] = metadata_store["dimension_name"].map(str).map(lambda element: eval(element))

# Parameters

In [None]:
missing_data_percentage_param = 0.3  # NOTE(review): not referenced in the cells shown here
DAILY_FOURIER_ORDER = 3  # Fourier order of the daily seasonality, passed to tuning and refitting
WEEKLY_FOURIER_ORDER = 5  # Fourier order of the weekly seasonality, passed to tuning and refitting
COUNTRY_NAME = 'USA'  # passed to run_prophet_funct as country_name (presumably for holidays)
percent = 0.1  # tolerance passed to the mult_cond / add_cond sanity checks
scores = ['mae'] #['mdape', 'mape', 'smape', 'mae']  # candidate ranking metrics
predictions_write_to = ''  # output locations; not referenced in the cells shown here
errors_write_to = ''
write_to = 'prophet_results2/'

In [None]:
# Training window: the four weeks of history ending at the metadata timestamp.
# NOTE(review): only the first metadata row's ts is used, which assumes all rows share it.
ts = metadata_store['ts'].values[0]  # unix seconds
end = pd.to_datetime(ts, unit='s')
start = end - pd.Timedelta(weeks=4)

# Hyperparameter grid

In [None]:
# Prophet hyperparameters searched for every time series.
param_grid = {
    'changepoint_prior_scale': [0.01, 0.1, 1.0],
    'seasonality_prior_scale': [0.1, 1.0, 10.0, 50],
    'seasonality_mode': ['additive', 'multiplicative'],
}

# Every combination of the grid (3 * 4 * 2 = 24 dicts).
grid_names = list(param_grid)
all_params = [dict(zip(grid_names, combo)) for combo in itertools.product(*param_grid.values())]

# The same combinations, each once with weekend=True and once with weekend=False (48 dicts).
all_params2 = [{**combo, 'weekend': flag} for combo in all_params for flag in (True, False)]

In [None]:
# p_uuid: one id per hyperparameter combination, used for tracking. The same p_uuid is
# shared by every model trained with those hyperparameters, whichever time series it is.
uuid_to_params = {str(uuid4()): combo for combo in all_params2}
p_uuid_df = pd.DataFrame(uuid_to_params).T

# Show the first few parameter sets together with their ids.
for uid, combo_row in p_uuid_df.head().iterrows():
    print(uid, combo_row.to_dict())

In [None]:
# Display the first rows of the p_uuid -> hyperparameters table.
p_uuid_df.head()

# Functions

In [None]:
def submit_training(df, row, p_uuid_df):
    """
    Submit one tuning job per hyperparameter combination to dask.

    df        - time series in the prophet-ready format produced by preprocess_data
    row       - metadata row describing the time series
    p_uuid_df - one row per hyperparameter combination, indexed by its p_uuid

    Returns a dict mapping each p_uuid to hparam_tuning's result for that combination
    (the later cells treat these values as dask futures).
    """
    # p_uuid_df is built from all_params2, so each params dict also carries 'weekend'.
    return {
        p_uuid: hparam_tuning(df, params_row.to_dict(), row,
                              is_weekend=True,
                              parallel="dask",
                              daily_fourier_order=DAILY_FOURIER_ORDER,
                              weekly_fourier_order=WEEKLY_FOURIER_ORDER)
        for p_uuid, params_row in p_uuid_df.iterrows()
    }
    

def resubmit_training(df, row, params_row):
    """
    Submit a single tuning job (one hyperparameter combination) to dask.

    Makes the same hparam_tuning call as one iteration of submit_training and returns
    its result. Used to retry a combination whose earlier task did not finish.
    """
    return hparam_tuning(
        df,
        params_row.to_dict(),
        row,
        is_weekend=True,
        parallel="dask",
        daily_fourier_order=DAILY_FOURIER_ORDER,
        weekly_fourier_order=WEEKLY_FOURIER_ORDER,
    )

In [None]:
def _fit_and_predict(df, params, weekend):
    """Fit prophet through run_prophet_funct and predict on make_future(m, end, 168).

    When `weekend` is truthy, the model is fitted with is_weekend=True and the future
    frame receives the boolean 'weekday' / 'weekend' columns (built with is_weekday)
    that such a model presumably needs at prediction time. Returns (model, forecast).
    """
    use_weekend = bool(weekend)
    m = run_prophet_funct(df, params,
                          daily_fourier_order=DAILY_FOURIER_ORDER,
                          weekly_fourier_order=WEEKLY_FOURIER_ORDER,
                          is_weekend=use_weekend,
                          country_name=COUNTRY_NAME)
    future = make_future(m, end, 168)
    if use_weekend:
        weekday = future['ds'].apply(is_weekday)
        future['weekday'] = weekday
        future['weekend'] = ~weekday
    return m, m.predict(future)


def _cap_trend(m, forecast, lower, upper):
    """Hold the trend constant once trend + seasonal band would leave (lower, upper).

    The band is the 5% / 95% quantiles of the additive terms after `end`. Starting at the
    last changepoint whose mean |delta| >= 0.01 (or `start` if there is none), last_point
    is the latest timestamp at which trend + band is strictly inside (lower, upper). If it
    never is, last_point is the final timestamp. The trend after last_point is replaced by
    its value at last_point, and yhat is rebuilt from trend and the seasonal terms.
    """
    mean_delta = np.abs(np.nanmean(m.params['delta'], axis=0))
    significant = m.changepoints[mean_delta >= 0.01].values
    last_changepoint = significant[-1] if len(significant) else start

    indexed = forecast.set_index('ds')
    future_additive = indexed.loc[end:, 'additive_terms']
    minimum = future_additive.quantile(0.05)
    maximum = future_additive.quantile(0.95)

    trend_tail = indexed.loc[last_changepoint:, 'trend']
    within_bounds = (trend_tail + minimum > lower) & (trend_tail + maximum < upper)
    # Reversed idxmax gives the LAST timestamp where the condition holds.
    last_point = within_bounds[::-1].idxmax()

    frozen_trend = forecast.loc[forecast['ds'] == last_point, 'trend'].values[0]
    forecast.loc[forecast['ds'] > last_point, 'trend'] = frozen_trend
    forecast['yhat'] = (forecast['trend'] * (1 + forecast['multiplicative_terms'])
                        + forecast['additive_terms'])
    return forecast


def myForecast(df, tuning_results,
               row, score):
    """
    Refit the best tuned prophet model of one time series and forecast past `end`.

    df             - prophet-ready history with 'ds' and 'y' columns
    tuning_results - one row per hyperparameter combination: the `score` column plus
                     the param_grid columns and 'weekend'. It is NOT modified.
    row            - metadata row; its 'kpi_name' and 'dimension_name' are read
    score          - name of the metric column to minimise (e.g. "mae")

    Steps:
      1. Refit with the best-ranked hyperparameters.
      2. If that model is multiplicative and mult_cond flags it, refit with the best
         additive combination instead.
      3. If the final model is additive and add_cond flags it, cap the trend (_cap_trend).
      4. Clip yhat to the KPI bounds from kpi_constraints_dict.

    Returns the prophet forecast frame, with yhat renamed to 'pred', extended with:
    kpi_name, dimension_name, ground_truth, error, ground_truth_noise and pred_noise.
    'error' is the residual (ground_truth - pred) divided by the standard deviation of
    the history y; it is NaN where there is no ground truth.
    """
    kpi = row["kpi_name"]
    dim_dict = row["dimension_name"]
    lower = kpi_constraints_dict[kpi][0]
    upper = kpi_constraints_dict[kpi][1]

    param_cols = list(param_grid.keys()) + ['weekend']
    # rank() yields float ranks even when the score column has object dtype (the tuning
    # frames are built via transposes). Keep the ranks local instead of adding columns to
    # the caller's frame.
    rank = tuning_results[score].rank()

    params = tuning_results.loc[rank.idxmin(), param_cols].to_dict()
    weekend = params.pop('weekend', None)
    m, forecast = _fit_and_predict(df, params, weekend)

    if params['seasonality_mode'] == 'multiplicative':
        # mult_cond returning True means the model should NOT stay multiplicative:
        # fall back to the best-ranked additive combination and refit.
        if mult_cond(forecast, df, percent, lower, upper, end):
            additive_rank = rank[tuning_results['seasonality_mode'] == 'additive']
            params = tuning_results.loc[additive_rank.idxmin(), param_cols].to_dict()
            weekend = params.pop('weekend')
            m, forecast = _fit_and_predict(df, params, weekend)

    if params['seasonality_mode'] == 'additive':
        # add_cond returning True triggers the trend correction.
        if add_cond(forecast, percent, lower, upper, end):
            forecast = _cap_trend(m, forecast, lower, upper)

    forecast['yhat'] = forecast['yhat'].clip(lower=lower, upper=upper)

    # with_mean=False: the scaler only divides by the std of the history y.
    scaler = StandardScaler(with_mean=False)  # RobustScaler
    scaler.fit(df['y'].values.reshape(-1, 1))

    history = df.set_index("ds")
    forecast = forecast.set_index("ds")

    forecast["kpi_name"] = [kpi] * len(forecast)
    forecast["dimension_name"] = [dim_dict] * len(forecast)
    forecast["ground_truth"] = history.y
    forecast = forecast.rename(columns={"yhat": "pred"})

    # Standardised residual. StandardScaler keeps NaN in transform, so future rows
    # (no ground truth) get NaN.
    residual = (forecast["ground_truth"] - forecast["pred"]).to_numpy(dtype=float).reshape(-1, 1)
    forecast["error"] = scaler.transform(residual).ravel()

    forecast["ground_truth_noise"] = history.y - forecast.trend - forecast.holidays - forecast.daily
    forecast["pred_noise"] = forecast.pred - forecast.trend - forecast.holidays - forecast.daily

    # Columns: kpi name, dimension, ds, trend, seasonality, noise, holidays, original y, ...
    return forecast.reset_index()


# Tracking training status

In [None]:
# Per-series bookkeeping: failure flag and counter, last failure time, and a fresh
# ts_uuid that identifies each time series in all later cells.
n_series = len(metadata_store)
metadata_store = metadata_store.assign(
    failed=False,
    num_fails=0,
    last_failed_ts=None,
    ts_uuid=[str(uuid4()) for _ in range(n_series)],
)

# Compute

In [None]:
# Preprocess every time series once and cache it by ts_uuid. The cached frames are
# reused for training, for retries and for the final forecast.
df_dict = {
    mdrow.ts_uuid: preprocess_data(datas, mdrow, start, end)
    for _, mdrow in metadata_store.iterrows()
}

In [None]:
# One delayed submit_training call per time series. Each returns {p_uuid: future}, so
# `compute` maps ts_uuid -> {p_uuid: future}.
taskgroups = {
    mdrow.ts_uuid: dask.delayed(submit_training)(df_dict[mdrow.ts_uuid], mdrow, p_uuid_df)
    for _, mdrow in metadata_store.iterrows()
}
compute = dask.compute(taskgroups)[0]

# dask.compute returns before all submitted tasks have finished. Instead of always
# sleeping a fixed budget, wait for the futures but give up after that same budget
# (0.5 s per series). Tasks still unfinished then are handled by the retry cells.
# (On a 72-core Intel Xeon E5-2697 one time series took about 6 s on average.)
from dask.distributed import wait

submitted = [fut for group in compute.values() for fut in group.values()]
try:
    wait(submitted, timeout=len(metadata_store) * 0.5)
except TimeoutError:
    pass  # expected when some tasks are slow or lost; they are retried below

# Recomputing unfinished tasks

Sometimes a few of the tasks time out. We therefore check whether every task has finished and resubmit (recompute) the ones that have not.

In [None]:
def task_status():
    """Count the tasks of all time series per dask status (reads the global `compute`)."""
    statuses = [fut.status for group in compute.values() for fut in group.values()]
    return pd.Series(statuses).value_counts()


def failed_tasks():
    """Per time series, list the (ts_uuid, p_uuid) pairs whose task is not "finished"."""
    return [
        [(ts_uuid, p_uuid) for p_uuid, fut in group.items() if fut.status != "finished"]
        for ts_uuid, group in compute.items()
    ]

In [None]:
# Display how many tasks are currently in each dask status.
task_status()

In [None]:
def tg_finished(ts_uuid, compute):
    """Return True when every task in the task group of `ts_uuid` has status "finished"."""
    for fut in compute[ts_uuid].values():
        if fut.status != "finished":
            return False
    return True

In [None]:
# Retry bookkeeping: one row per (ts_uuid, p_uuid) task that has ever been unfinished.
# Naming: "ts_uuid" identifies a time series, while "ts" is a timestamp (unix seconds).
# "failed" holds the outcome of the most recent check (True = still unfinished).
# "bad_status" is the last non-finished dask status seen. refresh_zk_table writes it, so
# it is declared here as part of the schema.
zookeeper_table = pd.DataFrame(columns=["ts_uuid", "p_uuid", "ts", "kpi", "dim", "params",
                                        "failed", "num_failed", "last_failed_ts", "bad_status"])
zookeeper_table

In [None]:
def refresh_zk_table( zookeeper_table, compute ):
    """
    Update the retry-bookkeeping table from the current status of every task.

    zookeeper_table - DataFrame with one row per (ts_uuid, p_uuid) task that has been
                      seen unfinished at least once
    compute         - dict ts_uuid -> {p_uuid: future}; each value exposes `.status`

    Effects on the returned table:
    - Rows whose task is now "finished" get failed=False.
    - An unfinished task that already has a row gets failed=True, num_failed += 1,
      and fresh last_failed_ts and bad_status values.
    - An unfinished task without a row gets a new row. That row is filled from the
      module-level metadata_store and p_uuid_df.

    Returns a new DataFrame; the input frame is not modified.
    """
    zookeeper_table = zookeeper_table.copy()
    now = int(time.time())

    # (p_uuid, ts_uuid) -> status for every task that has not finished. Each status is
    # read once, so the recorded bad_status matches the status that was tested.
    unfinished = {}
    for ts_uuid, group in compute.items():
        for p_uuid, fut in group.items():
            status = fut.status
            if status != "finished":
                unfinished[(p_uuid, ts_uuid)] = status

    # Match tracked rows on the (p_uuid, ts_uuid) PAIR, not on each id separately.
    row_keys = list(zip(zookeeper_table["p_uuid"], zookeeper_table["ts_uuid"]))
    tracked = set(row_keys)

    # Tasks that have finished since the last check are no longer failing.
    finished_mask = pd.Series([key not in unfinished for key in row_keys],
                              index=zookeeper_table.index, dtype=bool)
    zookeeper_table.loc[finished_mask, "failed"] = False

    new_rows = []
    for (p_uuid, ts_uuid), status in unfinished.items():
        if (p_uuid, ts_uuid) in tracked:
            mask = (zookeeper_table["p_uuid"] == p_uuid) & (zookeeper_table["ts_uuid"] == ts_uuid)
            zookeeper_table.loc[mask, "failed"] = True
            zookeeper_table.loc[mask, "num_failed"] += 1
            zookeeper_table.loc[mask, "last_failed_ts"] = now
            zookeeper_table.loc[mask, "bad_status"] = status
        else:
            mdrow = metadata_store[metadata_store.ts_uuid == ts_uuid]
            new_rows.append({
                "ts_uuid": ts_uuid, "p_uuid": p_uuid, "ts": mdrow.ts.values[0],
                "kpi": mdrow.kpi_name.values[0], "dim": mdrow.dimension_name.values[0],
                "params": p_uuid_df.loc[p_uuid].to_dict(), "failed": True,
                "num_failed": 1, "last_failed_ts": now,
                "bad_status": status,
            })

    # DataFrame.append was removed in pandas 2.0; concatenate all new rows at once.
    if new_rows:
        zookeeper_table = pd.concat([zookeeper_table, pd.DataFrame(new_rows)], ignore_index=True)

    return zookeeper_table

In [None]:
def retry_unfinished_tasks( compute ):
    """
    Resubmit every task that has not finished and put the new result into `compute`.

    compute - dict ts_uuid -> {p_uuid: future}; it is updated in place and also returned.

    The inputs of each retried task are rebuilt from the module-level df_dict,
    metadata_store and p_uuid_df. The replacement futures may themselves still be
    running when this returns.
    """
    re_taskgroup = {}
    for ts_uuid, group in compute.items():
        for p_uuid, fut in group.items():
            if fut.status == "finished":
                continue
            # .iloc[0]: pass the metadata row as a Series, exactly like submit_training
            # receives it from iterrows(), not as a one-row DataFrame.
            row = metadata_store[metadata_store.ts_uuid == ts_uuid].iloc[0]
            params_row = p_uuid_df.loc[p_uuid]
            re_taskgroup.setdefault(ts_uuid, {})[p_uuid] = dask.delayed(resubmit_training)(
                df_dict[ts_uuid], row, params_row)

    if not re_taskgroup:
        return compute

    re_compute = dask.compute(re_taskgroup)[0]  # recompute previously unfinished tasks

    # Swap the new futures in; some may still be unfinished and are retried next round.
    for ts_uuid, new_group in re_compute.items():
        compute[ts_uuid].update(new_group)

    return compute

In [None]:
# Task states that count as "not (successfully) finished yet".
bad_status = ["pending", "cancelled", "lost", "error", "newly-created", "closing", "connecting", "running", "closed" ]
max_retry_rounds = 10

for i in range(max_retry_rounds):

    print(i)
    print(task_status())

    zookeeper_table = refresh_zk_table(zookeeper_table, compute)

    # Stop as soon as no task is in one of the bad states.
    if not any(status in task_status() for status in bad_status):
        break

    n_unfinished = sum(fut.status != "finished"
                       for group in compute.values() for fut in group.values())

    compute = retry_unfinished_tasks(compute)

    # Give the resubmitted tasks some time before the next check.
    time.sleep(0.2 * n_unfinished)
else:
    # for/else: only reached when every retry round was used without hitting `break`.
    # Record the final state and make the leftover failures visible instead of
    # continuing silently.
    zookeeper_table = refresh_zk_table(zookeeper_table, compute)
    print(f"WARNING: tasks still unfinished after {max_retry_rounds} retry rounds:")
    print(task_status())

# Gathering results from distributed memory

In [None]:
# (p_uuid, ts_uuid) of every finished task, read straight from the nested dict. This
# avoids the deprecated DataFrame.applymap and the DataFrame round-trip, which breaks
# (NaN has no .status) if the task groups do not all share the same p_uuids.
finished_tasks = [
    (p_uuid, ts_uuid)
    for ts_uuid, group in compute.items()
    for p_uuid, fut in group.items()
    if fut.status == "finished"
]

# Pull the finished results from distributed memory: results[ts_uuid][p_uuid].
# Tasks that never finished are left out.
results = {}
for p_uuid, ts_uuid in finished_tasks:
    results.setdefault(ts_uuid, {})[p_uuid] = compute[ts_uuid][p_uuid].result()

In [None]:
# Attach the hyperparameters to each tuning result (the performance metrics returned by
# hparam_tuning), so every result carries both its metrics and the parameters that produced them.
# NOTE(review): params is a Series named by p_uuid. pd.concat only merges it into the same
# column as task.T if task's index label equals that p_uuid; otherwise metrics and params end
# up in separate columns. Confirm against hparam_tuning's output shape.
for ts, taskgroup in results.items():
    for p_uuid, task in taskgroup.items():
        params = p_uuid_df.loc[ p_uuid ]

        
        results[ts][p_uuid] = pd.concat([task.T, params]).T

In [None]:
# Stack the per-parameter tuning results of each time series into one frame per ts_uuid.
concated_results = {
    ts_uuid: pd.concat(list(taskgroup.values())).reset_index(drop=True)
    for ts_uuid, taskgroup in results.items()
}

# Forecast

In [None]:
# Refit the best model of every time series and forecast, in parallel through dask,
# ranking the tuning results by "mae".
forecast_tasks = {
    mdrow.ts_uuid: dask.delayed(myForecast)(
        df_dict[mdrow.ts_uuid], concated_results[mdrow.ts_uuid], mdrow, "mae")
    for _, mdrow in metadata_store.iterrows()
}
timeseries_forecast = dask.compute(forecast_tasks)[0]