## Building a Forecasting Swarm

In this notebook we build the bees for a forecasting swarm, using the SDK's DevSwarm to test each bee locally as we go. We first prepare the data and then fit an ARIMA model to produce the forecast.

In [1]:
import logging
# Raise the 'matplotlib' logger's threshold to WARNING so its DEBUG/INFO
# messages do not clutter the notebook output.
mpl_logger = logging.getLogger(name='matplotlib')
mpl_logger.setLevel(level=logging.WARNING)

from bytewax.swarm import register_bee, DevSwarm
from pmdarima.arima import auto_arima
import pandas as pd
from sklearn.metrics import r2_score
import numpy as np
import json

In [2]:
@register_bee(name='data-prep')
def data_prep(swarm, data, context):
    """Turn a raw JSON time-series payload into a ``ds``-indexed frame with a ``y`` column.

    The first column of the payload is parsed as datetimes and becomes the
    index (named ``ds``); the first remaining column is renamed to ``y``.
    The resulting DataFrame is published to the swarm with the same context.

    Args:
        swarm: Swarm handle; the prepared DataFrame is passed to ``swarm.publish``.
        data: JSON text (e.g. the output of ``DataFrame.to_json()``), or any
            other source ``pd.read_json`` accepts (path, file-like object).
        context: Forwarded unchanged to ``swarm.publish``.
    """
    from io import StringIO

    # pandas >= 2.1 deprecates passing literal JSON text to read_json; wrap
    # it in a file-like object. Non-JSON strings (paths) are passed through.
    if isinstance(data, str) and data.lstrip().startswith(('{', '[')):
        data = StringIO(data)
    frame = pd.read_json(data)

    date_col = frame.columns[0]
    # Assign the whole column instead of using ``.iloc[:, 0] = ...``: since
    # pandas 2.0 that form writes in place and can leave the column as
    # object dtype rather than datetime64.
    frame[date_col] = pd.to_datetime(frame[date_col])
    frame = frame.rename(columns={date_col: 'ds'}).set_index('ds')
    frame = frame.rename(columns={frame.columns[0]: 'y'})
    swarm.publish(frame, context)

In [3]:
@register_bee(name='arima')
def arima(swarm, df, context):
    """Fit a seasonal ARIMA to ``df``, score it on a holdout, and respond with a forecast.

    The first 80% of rows train an ``auto_arima`` model (monthly seasonality,
    ``m=12``); the remaining rows are forecast and scored with R^2. If the
    score is at least 0.5, the model is updated with the holdout observations
    and used to forecast the 24 month-start periods after the last row of
    ``df``. The response is then a JSON string with keys ``prediction`` (the
    history plus forecast, serialized via ``DataFrame.to_json`` with ISO
    dates) and ``r2score``. Otherwise the plain string
    ``"score too low, can't predict"`` is sent.

    Args:
        swarm: Swarm handle; the result is sent with ``swarm.respond``.
        df: Frame with a datetime index and a ``y`` column, as produced by
            ``data_prep`` (assumed monthly, month-start dates — confirm for
            other data sources).
        context: Forwarded unchanged to ``swarm.respond``.
    """
    # Chronological split at a single index so train and holdout are
    # contiguous; the holdout forecast then lines up with the rows it is
    # compared against (two separately truncated fractions can drop a row).
    split_at = int(len(df) * 0.80)
    train = df.iloc[:split_at]
    test = df.iloc[split_at:].copy()

    # train model
    arima_model = auto_arima(train, start_p=0, d=1, start_q=0,
                             max_p=5, max_d=5, max_q=5, start_P=0,
                             D=1, start_Q=0, max_P=5, max_D=5,
                             max_Q=5, m=12, seasonal=True,
                             error_action='warn', trace=True,
                             suppress_warnings=True, stepwise=True,
                             random_state=20, n_fits=50)
    print(arima_model.summary())

    # Out-of-sample forecast over the holdout. Use the raw values so pandas
    # does not realign them against whatever index predict() may return.
    holdout_forecast = arima_model.predict(n_periods=len(test))
    test['y_hat'] = np.asarray(holdout_forecast).ravel()

    # determine model score
    score = float(r2_score(test['y'], test['y_hat']))
    if score < 0.5:
        swarm.respond("score too low, can't predict", context)
        return

    # The model has only seen `train`. Feed it the holdout observations so
    # the future forecast starts right after the last row of df instead of
    # right after the training split.
    arima_model.update(test['y'])

    # 24 month-start periods strictly after the last observation.
    future_index = pd.date_range(df.index[-1] + pd.offsets.MonthBegin(1),
                                 periods=24, freq='MS', name=df.index.name)
    future_values = arima_model.predict(n_periods=len(future_index))
    ftr = pd.DataFrame({'y': np.asarray(future_values).ravel()},
                       index=future_index)
    prediction = pd.concat([df, ftr])
    response = json.dumps({'prediction': prediction.to_json(date_format='iso'),
                           'r2score': score})
    swarm.respond(response, context)

In [4]:
if __name__ == '__main__':
    from io import StringIO

    swarm = DevSwarm()

    # Bee 1: load the raw CSV and hand it to data-prep as a JSON string,
    # the payload format data_prep parses with pd.read_json.
    data = pd.read_csv('data.csv')
    data = data.to_json()
    data_prep(swarm, data, {})
    # DevSwarm keeps published payloads in `.published`; take the prepared frame.
    df = swarm.published[0]
    print(df)

    # Bee 2: run the arima bee directly on the prepared frame.
    arima(swarm, df, {})

    raw_response = swarm.responded[0]
    try:
        resp = json.loads(raw_response)
    except json.JSONDecodeError:
        # arima responds with a plain, non-JSON message when its holdout
        # score is too low; there is no forecast to load in that case.
        resp = raw_response
        forecast = None
    else:
        forecast = pd.read_json(StringIO(resp['prediction']))
    print(resp)


               y
ds              
1964-01-01  2815
1964-02-01  2672
1964-03-01  2755
1964-04-01  2721
1964-05-01  2946
...          ...
1972-05-01  4618
1972-06-01  5312
1972-07-01  4298
1972-08-01  1413
1972-09-01  5877

[105 rows x 1 columns]
Performing stepwise search to minimize aic
 ARIMA(0,1,0)(0,1,0)[12]             : AIC=1183.693, Time=0.03 sec
 ARIMA(1,1,0)(1,1,0)[12]             : AIC=1173.736, Time=0.07 sec
 ARIMA(0,1,1)(0,1,1)[12]             : AIC=1157.042, Time=0.15 sec
 ARIMA(0,1,1)(0,1,0)[12]             : AIC=1155.109, Time=0.04 sec
 ARIMA(0,1,1)(1,1,0)[12]             : AIC=1157.009, Time=0.12 sec
 ARIMA(0,1,1)(1,1,1)[12]             : AIC=1158.348, Time=0.30 sec
 ARIMA(1,1,1)(0,1,0)[12]             : AIC=1155.379, Time=0.07 sec
 ARIMA(0,1,2)(0,1,0)[12]             : AIC=1155.138, Time=0.07 sec
 ARIMA(1,1,0)(0,1,0)[12]             : AIC=1173.612, Time=0.02 sec
 ARIMA(1,1,2)(0,1,0)[12]             : AIC=1155.790, Time=0.14 sec
 ARIMA(0,1,1)(0,1,0)[12] intercept   : AIC