In [0]:
%run /Users/rishi@bodo.ai/setup/setup_nb_px

In [0]:
%fs
ls /mnt/rjml2/temp/demand-forecasting/train.csv

path,name,size
dbfs:/mnt/rjml2/temp/demand-forecasting/train.csv,train.csv,17333449


In [0]:
%%px

import numpy as np
import pandas as pd
import time
import datetime
import bodo
import os
import warnings
import matplotlib.pyplot as plt
%matplotlib inline

from prophet import Prophet
import logging
 
# Raise the 'py4j' logger to ERROR so its informational messages are hidden.
logging.getLogger('py4j').setLevel(logging.ERROR)

# Ignore every DeprecationWarning and FutureWarning raised in this process.
# These filters are global; they are not limited to a single library.
warnings.simplefilter("ignore", DeprecationWarning)
warnings.simplefilter("ignore", FutureWarning)

# Raise the 'prophet' and 'pystan' loggers to ERROR to quiet their log output.
logging.getLogger('prophet').setLevel(logging.ERROR)
logging.getLogger('pystan').setLevel(logging.ERROR)

In [0]:
%%px

@bodo.jit()
def load_data():
    """Read the demand-forecasting training CSV into a DataFrame.

    The 'store', 'item' and 'sales' columns are read as int64 and the
    'date' column is parsed as a datetime.

    Returns:
        The DataFrame produced by ``pd.read_csv``.
    """
    column_types = {
        'store': np.dtype('int64'),
        'item': np.dtype('int64'),
        'sales': np.dtype('int64'),
    }
    return pd.read_csv(
        '/dbfs/mnt/rjml2/temp/demand-forecasting/train.csv',
        dtype=column_types,
        parse_dates=['date'],
    )

# Load the training data, then preview it on rank 0 only so the output is not
# repeated once per process.
train = load_data()
if bodo.get_rank() == 0:
    display(train.head(2))
    # DataFrame.info() prints its report itself and returns None, so wrapping
    # it in print() would append a stray "None" line to the output.
    train.info()

In [0]:
%%px

@bodo.jit()
def yearly_trend(train):
    """Compute total sales per calendar year.

    Adds a 'year' column (taken from the 'date' column) to ``train``, sums
    'sales' per year, sorts the totals by year ascending, gathers them with
    ``bodo.allgatherv`` and prints the gathered frame.

    Args:
        train: DataFrame with a datetime 'date' column and a 'sales' column.

    Returns:
        A tuple ``(yearly_totals, train)`` where ``yearly_totals`` is the
        gathered per-year summary and ``train`` carries the new 'year' column.
    """
    train['year'] = train['date'].dt.year
    yearly_sales = train.groupby('year', as_index=False)['sales'].sum()
    yearly_sales = yearly_sales.sort_values(['year'], ascending=True)
    # Gather once and reuse the result for both printing and returning,
    # instead of issuing the same allgatherv call twice.
    gathered = bodo.allgatherv(yearly_sales)
    print(gathered)
    return gathered, train


Yearly_trend, train = yearly_trend(train)

In [0]:
%%px

# Yearly sales plot, drawn in regular Python on rank 0 only.
# A @bodo.jit version (yearly_trend_plot) is disabled here; the BE-1419 tag is
# presumably the ticket tracking that -- TODO confirm.
if bodo.get_rank() == 0:
    g = Yearly_trend.plot(x='year', y='sales')
    display(g)

In [0]:
%%px

@bodo.jit()
def monthly_trend(train):
    """Compute total sales per calendar month.

    Adds a 'year-month' column (the 'date' column formatted as '%Y-%m') to
    ``train``, sums 'sales' per month, sorts the totals by 'year-month'
    ascending, gathers them with ``bodo.allgatherv`` and prints the result.

    Args:
        train: DataFrame with a datetime 'date' column and a 'sales' column.

    Returns:
        A tuple ``(monthly_totals, train)`` where ``monthly_totals`` is the
        gathered per-month summary and ``train`` carries the new column.
    """
    train['year-month'] = train['date'].dt.strftime('%Y-%m')
    monthly_sales = train.groupby(['year-month'], as_index=False)['sales'].sum()
    monthly_sales = monthly_sales.sort_values(['year-month'], ascending=True)
    # Gather once and reuse the result for both printing and returning,
    # instead of issuing the same allgatherv call twice.
    gathered = bodo.allgatherv(monthly_sales)
    print(gathered)
    return gathered, train


Monthly_trend, train = monthly_trend(train)

In [0]:
%%px

# Monthly sales plot, drawn in regular Python on rank 0 only.
# A @bodo.jit version (monthly_trend_plot) is disabled here; the BE-1419 tag is
# presumably the ticket tracking that -- TODO confirm.
if bodo.get_rank() == 0:
    g = Monthly_trend.plot(x='year-month', y='sales')
    display(g)

In [0]:
%%px

@bodo.jit()
def weekly_trend(train):
    """Compute total sales per year and day of week.

    Adds a 'week-day' column (``dt.dayofweek`` of 'date') to ``train``, sums
    'sales' per ('year', 'week-day') pair, sorts by both keys ascending and
    prints the summary gathered with ``bodo.allgatherv``.

    Args:
        train: DataFrame with 'date', 'sales' and 'year' columns. The 'year'
            column is not created here; in this notebook ``yearly_trend``
            adds it.

    Returns:
        A tuple ``(weekly_totals, train)``. ``weekly_totals`` is the sorted
        summary as computed (the gathered copy is only printed), and
        ``train`` carries the new 'week-day' column.
    """
    train['week-day'] = train['date'].dt.dayofweek
    weekly_sales = train.groupby(['year', 'week-day'], as_index=False)['sales'].sum()
    weekly_sales = weekly_sales.sort_values(['year', 'week-day'], ascending=(True, True))
    print(bodo.allgatherv(weekly_sales))
    return weekly_sales, train


# weekly_trend returns a (summary, train) pair; unpack it so Weekly_trend is
# the summary frame rather than the whole tuple.
Weekly_trend, train = weekly_trend(train)

In [0]:
%%px

@bodo.jit()
def single_store(train):
    """Build the Prophet input history for store 1, item 1.

    Keeps only rows where 'store' == 1 and 'item' == 1, selects the 'date'
    and 'sales' columns, renames them to 'ds' and 'y', drops rows with
    missing values, sorts by 'ds' ascending and prints the first rows.

    Args:
        train: DataFrame with 'store', 'item', 'date' and 'sales' columns.

    Returns:
        DataFrame with columns 'ds' and 'y'.
    """
    history_pd = train.loc[(train['store'] == 1) & (train['item'] == 1)]
    # Select columns with a list. history_pd['date', 'sales'] would look up a
    # single column keyed by the tuple ('date', 'sales'), which does not exist.
    history_pd = history_pd[['date', 'sales']].rename(columns={'date': 'ds', 'sales': 'y'})
    history_pd = history_pd.dropna()
    history_pd = history_pd.sort_values('ds', ascending=True)
    print(history_pd.head())
    return history_pd


history_pd = single_store(train)

In [0]:
%%px

# Example frame used only to derive the type that the objmode block in
# train_prophet declares for its output ("mydataframe"). The declared type
# should mirror what that block actually produces: Prophet's predict() output
# has a datetime 'ds' column and a float 'yhat' column, so the sample uses
# those dtypes rather than strings.
sample_df = pd.DataFrame({
    "ds": pd.to_datetime(["2020-01-05", "2020-01-06"]),
    "yhat": [5623.0, 6354.0],
})

from numba.core import types
# Register the type under numba's types namespace so objmode can refer to it
# by the name "mydataframe".
types.mydataframe = bodo.typeof(sample_df)

In [0]:
%%px


@bodo.jit()
def train_prophet(history_pd):
    """Fit Prophet on ``history_pd`` and return its 'ds'/'yhat' forecast.

    All Prophet work runs inside ``bodo.objmode``; the block's output is
    declared with the registered type name "mydataframe". The forecast
    covers the history plus 90 additional daily periods.

    Args:
        history_pd: frame passed straight to ``Prophet.fit``.

    Returns:
        DataFrame with the 'ds' and 'yhat' columns of the prediction.
    """
    with bodo.objmode(forecast_pd="mydataframe"):
        prophet_model = Prophet(
            interval_width=0.95,
            growth='linear',
            daily_seasonality=False,
            weekly_seasonality=True,
            yearly_seasonality=True,
            seasonality_mode='multiplicative',
        )
        prophet_model.fit(history_pd)
        horizon_pd = prophet_model.make_future_dataframe(
            periods=90,
            freq='d',
            include_history=True,
        )
        forecast_pd = prophet_model.predict(horizon_pd)[['ds', 'yhat']]

    return forecast_pd


forecast_pd = train_prophet(history_pd)

In [0]:
%%px
forecast_pd.head(2)

In [0]:
%%px

# TODO: file an issue -- mean_squared_error and mean_absolute_error are listed as supported, but they did not work here.
from sklearn.metrics import mean_squared_error, mean_absolute_error
from math import sqrt
from datetime import date

@bodo.jit()
def evaluation_metrics(history_pd, forecast_pd):
    """Print MAE, MSE and RMSE of the forecast against actuals before 2018.

    Rows of both frames are restricted to dates earlier than 2018-01-01. The
    'y' values of ``history_pd`` are then compared with the 'yhat' values of
    ``forecast_pd``. The metrics are computed and printed inside
    ``bodo.objmode``; nothing is returned.

    Args:
        history_pd: frame with 'ds' and 'y' columns (actuals).
        forecast_pd: frame with 'ds' and 'yhat' columns (predictions).
    """
    # Use one datetime.date cutoff for both frames. The 'ds' values are
    # converted to dates, so comparing one side against a pandas Timestamp
    # would mix two different types.
    cutoff = date(2018, 1, 1)
    # Work on derived Series rather than assigning into the callers' frames.
    history_days = pd.to_datetime(history_pd['ds']).dt.date
    forecast_days = pd.to_datetime(forecast_pd['ds']).dt.date
    actuals_pd = history_pd[history_days < cutoff]['y']
    predicted_pd = forecast_pd[forecast_days < cutoff]['yhat']
    # calculate evaluation metrics
    with bodo.objmode():
        mae = mean_absolute_error(actuals_pd, predicted_pd)
        mse = mean_squared_error(actuals_pd, predicted_pd)
        rmse = sqrt(mse)
        # print metrics to the screen
        print( '\n'.join(['MAE: {0}', 'MSE: {1}', 'RMSE: {2}']).format(mae, mse, rmse) )


evaluation_metrics(history_pd, forecast_pd)

In [0]:
%%px

@bodo.jit
def agg_store_item(ip_df):
    """Sum sales per store, item and date, using Prophet's column names.

    Args:
        ip_df: DataFrame with 'store', 'item', 'date' and 'sales' columns.

    Returns:
        DataFrame with columns 'store', 'item', 'ds' (renamed from 'date')
        and 'y' (the summed 'sales').
    """
    # Name the aggregation with the string 'sum' rather than passing the
    # Python builtin: pandas deprecates builtin callables in agg().
    daily_sales = (
        ip_df
        .groupby(['store', 'item', 'date'], as_index=False)
        .agg({'sales': 'sum'})
    )
    return daily_sales.rename(columns={'date': 'ds', 'sales': 'y'})


grp_df = agg_store_item(train)

# Preview on rank 0 only so the output is not repeated once per process.
if bodo.get_rank() == 0:
    display(grp_df.head(2))

In [0]:
%%px

# One example timestamp for the 'ds' column. It is deliberately not named
# `time`, which would shadow the imported `time` module.
sample_ts = pd.date_range('6/28/2013', periods=1, freq='1D')

# Example frame used only to derive the type that the objmode block in
# train_prophet declares for its output ("mydataframe"). It mirrors the frame
# that block builds: forecast columns first, then the joined history columns.
# The 'yhat*' columns are floats from Prophet, and 'y' is float because the
# left join leaves forecast-only dates without an actual value.
sample_df = pd.DataFrame({"ds": [sample_ts[0]],
                          "yhat": [5623.0],
                          "yhat_upper": [5623.0],
                          "yhat_lower": [5623.0],
                          "store": [12],
                          "item": [12],
                          "y": [5623.0]}
                          )

from numba.core import types
# Register the type under numba's types namespace so objmode can refer to it
# by the name "mydataframe".
types.mydataframe = bodo.typeof(sample_df)

In [0]:
%%px

@bodo.jit()
def train_prophet(history_pd):
    """Fit Prophet on one history frame and return forecast joined to actuals.

    Rows with missing values are dropped first. The rest runs inside
    ``bodo.objmode`` (output declared as "mydataframe"): a Prophet model is
    fitted, a forecast over the history plus 90 daily periods is produced,
    and it is left-joined on 'ds' with the history's 'store', 'item' and 'y'.
    'store' and 'item' are then overwritten with the first row's values.

    Args:
        history_pd: frame with 'ds', 'store', 'item' and 'y' columns.

    Returns:
        DataFrame with 'ds', 'yhat', 'yhat_upper', 'yhat_lower', 'store',
        'item' and 'y', in that order as built by the join.
    """
    history_pd = history_pd.dropna()
    with bodo.objmode(results_pd="mydataframe"):
        prophet_model = Prophet(
            interval_width=0.95,
            growth='linear',
            daily_seasonality=False,
            weekly_seasonality=True,
            yearly_seasonality=True,
            seasonality_mode='multiplicative',
        )
        prophet_model.fit(history_pd)

        horizon_pd = prophet_model.make_future_dataframe(
            periods=90,
            freq='d',
            include_history=True,
        )
        forecast_pd = prophet_model.predict(horizon_pd)

        # Index both sides by date so the join lines forecasts up with actuals.
        forecast_by_day = forecast_pd[['ds', 'yhat', 'yhat_upper', 'yhat_lower']].set_index('ds')
        history_by_day = history_pd[['ds', 'store', 'item', 'y']].set_index('ds')

        # Left join keeps every forecast date, including those with no actuals.
        results_pd = forecast_by_day.join(history_by_day, how='left')
        results_pd = results_pd.reset_index(level=0)

        # Fill store/item on every row from the first history row.
        results_pd['store'] = history_pd['store'].iloc[0]
        results_pd['item'] = history_pd['item'].iloc[0]

    return results_pd

In [0]:
%%px
import time

#@bodo.jit
def proph_fn(df):
    """Forecast store 1 / item 1 with ``train_prophet`` and time the run.

    Filters ``df`` to rows with 'store' == 1 and 'item' == 1, applies
    ``train_prophet`` per ('store', 'item') group, relabels the result
    columns, prints the elapsed time, adds a 'training_date' column and
    prints the first 10 rows.

    Args:
        df: DataFrame with at least 'store' and 'item' columns, plus whatever
            ``train_prophet`` needs.

    Returns:
        The forecast DataFrame including the 'training_date' column.
    """
    is_store_one_item_one = (df['store'] == 1) & (df['item'] == 1)
    ip_df = df[is_store_one_item_one]

    start_time = time.time()
    fcst_df = ip_df.groupby(['store', 'item'], as_index=False).apply(train_prophet)
    fcst_df.columns = ['ds', 'yhat', 'yhat_upper', 'yhat_lower', 'store', 'item','y']
    print("Execution time: ", time.time()-start_time, "seconds")

    fcst_df['training_date'] = pd.to_datetime('today')
    print(fcst_df.head(10))
    return fcst_df

In [0]:
import pandas as pd
from sklearn.metrics import mean_squared_error, mean_absolute_error
from math import sqrt
from datetime import date
from prophet import Prophet
import logging

from pyspark.sql.types import *

# Schema of the training CSV: a date plus integer store, item and sales.
train_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in [
        ('date', DateType()),
        ('store', IntegerType()),
        ('item', IntegerType()),
        ('sales', IntegerType()),
    ]
])

# Read the CSV (with header row) into a Spark DataFrame using that schema.
train = spark.read.csv(
    '/mnt/rjml2/temp/demand-forecasting/train.csv',
    header=True,
    schema=train_schema,
)

# Register the DataFrame as the temporary view 'train' for Spark SQL.
train.createOrReplaceTempView('train')

# Sales summed per store/item/date, restricted to store 1 and item 1.
sql_statement = '''
  SELECT
    store,
    item,
    CAST(date as date) as ds,
    SUM(sales) as y
  FROM train
  WHERE store = 1 AND item = 1
  GROUP BY store, item, ds
  ORDER BY store, item, ds
  '''

# Run the query, repartition by store and item, and cache the result.
store_item_history = (
    spark.sql(sql_statement)
    .repartition(sc.defaultParallelism, ['store', 'item'])
    .cache()
)

from pyspark.sql.types import *

# Schema of the frames returned by forecast_store_item to applyInPandas.
result_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in [
        ('ds', DateType()),
        ('store', IntegerType()),
        ('item', IntegerType()),
        ('y', FloatType()),
        ('yhat', FloatType()),
        ('yhat_upper', FloatType()),
        ('yhat_lower', FloatType()),
    ]
])

def forecast_store_item(history_pd):
    """Fit Prophet on one store/item history and return forecast plus actuals.

    Args:
        history_pd: pandas DataFrame with 'ds', 'store', 'item' and 'y'
            columns. Rows with missing values are dropped before fitting.

    Returns:
        DataFrame with columns 'ds', 'store', 'item', 'y', 'yhat',
        'yhat_upper' and 'yhat_lower', covering the history plus 90 daily
        periods. 'store' and 'item' are filled from the first history row.
    """
    history_pd = history_pd.dropna()

    # Configure and train the model.
    model = Prophet(
        interval_width=0.95,
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=True,
        yearly_seasonality=True,
        seasonality_mode='multiplicative',
    )
    model.fit(history_pd)

    # Predict over the historical dates plus the next 90 days.
    future_pd = model.make_future_dataframe(periods=90, freq='d', include_history=True)
    forecast_pd = model.predict(future_pd)

    # Index both sides by date, then left-join so every forecast date is kept.
    forecast_by_day = forecast_pd[['ds', 'yhat', 'yhat_upper', 'yhat_lower']].set_index('ds')
    history_by_day = history_pd[['ds', 'store', 'item', 'y']].set_index('ds')
    results_pd = forecast_by_day.join(history_by_day, how='left').reset_index(level=0)

    # Fill store/item on every row from the incoming data set.
    results_pd['store'] = history_pd['store'].iloc[0]
    results_pd['item'] = history_pd['item'].iloc[0]

    return results_pd[['ds', 'store', 'item', 'y', 'yhat', 'yhat_upper', 'yhat_lower']]

from pyspark.sql.functions import current_date

# Apply forecast_store_item to each (store, item) group and add the current
# date as 'training_date'.
results = (
    store_item_history
    .groupBy('store', 'item')
    .applyInPandas(forecast_store_item, schema=result_schema)
    .withColumn('training_date', current_date())
)

# Register the forecasts as the temporary view 'new_forecasts', then show them.
results.createOrReplaceTempView('new_forecasts')

display(results)

ds,store,item,y,yhat,yhat_upper,yhat_lower,training_date
2013-01-01,1,1,13.0,10.051273,18.253439,2.0496435,2021-10-14
2013-01-02,1,1,11.0,10.5286255,19.16495,2.4681573,2021-10-14
2013-01-03,1,1,14.0,11.053265,18.91158,2.4653366,2021-10-14
2013-01-04,1,1,13.0,12.244392,20.129028,3.7633965,2021-10-14
2013-01-05,1,1,10.0,13.780334,22.537075,5.3032236,2021-10-14
2013-01-06,1,1,12.0,14.37895,22.995806,5.5721154,2021-10-14
2013-01-07,1,1,10.0,7.8728924,17.192232,-1.0297664,2021-10-14
2013-01-08,1,1,9.0,9.901317,18.864975,1.6944209,2021-10-14
2013-01-09,1,1,12.0,10.315249,18.913128,2.0872517,2021-10-14
2013-01-10,1,1,9.0,10.787018,19.014244,2.263068,2021-10-14


In [0]:
%%px

new_forecasts = proph_fn(grp_df)

In [0]:
%%px

print(new_forecasts.head())

In [0]:
%%px

#@bodo.jit()
def save_data(new_forecasts):
    """Rename the forecast columns and write the frame to Parquet.

    Args:
        new_forecasts: DataFrame whose 'ds', 'y', 'yhat', 'yhat_upper' and
            'yhat_lower' columns are renamed before writing.
    """
    output_names = {
        "ds": "date",
        "y": "sales",
        "yhat": "sales_predicted",
        "yhat_upper": "sales_predicted_upper",
        "yhat_lower": "sales_predicted_lower",
    }
    renamed = new_forecasts.rename(columns=output_names)
    renamed.to_parquet("/dbfs/mnt/temprj2/data/demand-forecasting/forecasts.pq")


save_data(new_forecasts)

In [0]:
%%px
# define function to calculate metrics

@bodo.jit()
def evaluate_forecast(evaluation_pd):
    """Compute MAE, MSE and RMSE of 'yhat' against 'y' for one group.

    Args:
        evaluation_pd: frame with 'y', 'yhat', 'training_date', 'store' and
            'item' columns.

    Returns:
        One-row DataFrame with 'training_date', 'store', 'item' (each taken
        from the first input row), 'mae', 'mse' and 'rmse'.
    """
    actual = evaluation_pd['y']
    predicted = evaluation_pd['yhat']
    mae = mean_absolute_error(actual, predicted)
    mse = mean_squared_error(actual, predicted)
    rmse = sqrt(mse)

    # Identify the group by the values on its first row.
    training_date = evaluation_pd['training_date'].iloc[0]
    store = evaluation_pd['store'].iloc[0]
    item = evaluation_pd['item'].iloc[0]

    return pd.DataFrame({
        'training_date': [training_date],
        'store': [store],
        'item': [item],
        'mae': [mae],
        'mse': [mse],
        'rmse': [rmse],
    })

In [0]:
%%px

#@bodo.jit()
def clean_data(df):
    """Return ``df`` without the rows that contain missing values."""
    return df.dropna()

new_forecasts = clean_data(new_forecasts)

In [0]:
%%px

#@bodo.jit()
def calculate_metrics(df):
    """Print per-group forecast error metrics for dates before 2018-01-01.

    Keeps rows whose 'ds' is earlier than '2018-01-01', selects the columns
    needed for evaluation, applies ``evaluate_forecast`` to each
    ('training_date', 'store', 'item') group and prints the result.
    Nothing is returned.

    Args:
        df: DataFrame with 'ds', 'training_date', 'store', 'item', 'y' and
            'yhat' columns.
    """
    before_cutoff = df.loc[df['ds'] < '2018-01-01']
    eval_input = before_cutoff[['training_date', 'store', 'item', 'y', 'yhat']]
    grouped = eval_input.groupby(['training_date', 'store', 'item'], as_index=False)
    new_forecast_evals = grouped.apply(evaluate_forecast)
    print(new_forecast_evals)


calculate_metrics(new_forecasts)