In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime, timedelta, date
import pandas as pd
from fbprophet import Prophet

In [0]:
# Spark Column expressions for "today" (date) and "now" (timestamp).
# They are Column objects, not Python values; use them inside DataFrame queries.
CURRENT_D, CURRENT_DT = current_date(), current_timestamp()


In [0]:
# Today's date, as seen by the Spark session, cast to a timestamp and pulled back
# to the driver as a Python datetime (midnight).
prediction_start_date = (
    spark.range(1)
    .select(to_timestamp(CURRENT_D).alias('date'))
    .first()['date']
)

# Forecast horizon: fb_model predicts up to one year after today.
JIT_end_date = prediction_start_date + timedelta(days=365)


In [0]:
# Input table; must provide the ds, y, MATNR, AREA and LGORT columns read by fb_model.
df = spark.table('default.matnr_cons_df')

In [0]:
# Schema of the rows returned by the fb_model grouped-map UDF (all fields nullable).
forecast_schema = (
  StructType()
  .add('ds', TimestampType())
  .add('y', FloatType())
  .add('yhat', FloatType())
  .add('MATNR', StringType())
  .add('AREA', StringType())
  .add('LGORT', StringType())
  .add('Is_Predicted', StringType())
)

In [0]:
@pandas_udf(forecast_schema, PandasUDFType.GROUPED_MAP)
def fb_model(training_data):
  """Fit a Prophet model on one (MATNR, AREA, LGORT) group and forecast to JIT_end_date.

  Used as a grouped-map pandas UDF via ``df.groupBy("MATNR","AREA","LGORT").apply(fb_model)``.
  The model is trained on weekday (Mon-Fri) history only. The result has one row per
  calendar day from the group's first observed ``ds`` through the module-level
  ``JIT_end_date``. Days Prophet did not predict (weekends) get ``yhat = 0``.

  Args:
    training_data: pandas DataFrame for a single group. The columns ``ds``, ``y``,
      ``MATNR``, ``AREA`` and ``LGORT`` are read; the frame is not modified.
      NOTE(review): assumes ``ds`` is unique within a group. Duplicates make the
      daily reindex below raise, so confirm this upstream.

  Returns:
    pandas DataFrame with the columns of ``forecast_schema`` in schema order:
    ds, y (NaN where there is no history), yhat (clipped at 0), MATNR, AREA,
    LGORT, Is_Predicted ("Y" for days after the last observed ``ds``, else "N").
  """
  history = training_data.sort_values(by=['ds'])
  min_available_date = history['ds'].min()
  max_available_date = history['ds'].max()

  # Train on business days only (dayofweek 5/6 = Saturday/Sunday). Filtering on the
  # expression avoids mutating the caller's frame with a helper column.
  weekday_history = history[~history['ds'].dt.dayofweek.isin([5, 6])]

  # NOTE(review): Prophet.fit raises on groups with too few weekday rows, which
  # aborts the whole Spark job. Consider a fallback for sparse groups.
  daily_model = Prophet(daily_seasonality=False, weekly_seasonality=True, yearly_seasonality=True)
  daily_model.fit(weekday_history)

  # Future business days strictly after the last observation, floored to midnight.
  # Filtering replaces bdate_range(closed='right'), whose `closed` argument was
  # removed in pandas 2.0.
  future_days = pd.bdate_range(start=max_available_date, end=JIT_end_date, freq='B')
  future_days = future_days[future_days > max_available_date].normalize()

  # pd.concat replaces DataFrame.append, which was removed in pandas 2.0.
  future_df = pd.concat(
    [weekday_history[['ds']], pd.DataFrame({'ds': future_days})],
    ignore_index=True,
  )
  forecast = daily_model.predict(future_df)[['ds', 'yhat']].set_index('ds')

  # Expand to every calendar day. Days absent from the prediction (weekends) get yhat = 0.
  all_days = pd.date_range(min_available_date, JIT_end_date, freq='D')
  forecast = forecast.reindex(all_days, fill_value=0).rename_axis('ds')

  # Attach observed values (all days, weekends included). y is NaN where nothing was observed.
  observed = history[['ds', 'y']].set_index('ds')
  result = forecast.join(observed, how='left')

  # Negative forecasts become 0. fillna keeps the original's NaN -> 0 behavior.
  result['yhat'] = result['yhat'].clip(lower=0).fillna(0)

  for key in ('MATNR', 'AREA', 'LGORT'):
    result[key] = training_data[key].iloc[0]

  result = result.reset_index()
  result['Is_Predicted'] = result['ds'].gt(max_available_date).map({True: 'Y', False: 'N'})

  # Same order as forecast_schema, so the output is valid whether Spark assigns
  # grouped-map columns by name or by position.
  return result[['ds', 'y', 'yhat', 'MATNR', 'AREA', 'LGORT', 'Is_Predicted']]

In [0]:
# Run fb_model for every (MATNR, AREA, LGORT) group, round the prediction to whole
# units, and replace null/NaN numeric values (y, yhat) with 0.
# NOTE: `round` here is pyspark.sql.functions.round, which shadows the builtin via
# the star import in the first cell.
fb_prophet_forecast = (
  df.groupBy("MATNR", "AREA", "LGORT")
    .apply(fb_model)
    .withColumn('yhat', round(col('yhat')))
    .fillna(0)
    .select('ds', 'MATNR', 'AREA', 'LGORT', 'y', 'yhat', 'Is_Predicted')
)

ds,MATNR,AREA,LGORT,y,yhat,Is_Predicted
2019-05-06T00:00:00.000+0000,1166284,NPI,1000,48.0,16.0,N
2019-05-07T00:00:00.000+0000,1166284,NPI,1000,4.0,16.0,N
2019-05-08T00:00:00.000+0000,1166284,NPI,1000,8.0,18.0,N
2019-05-09T00:00:00.000+0000,1166284,NPI,1000,2.0,18.0,N
2019-05-10T00:00:00.000+0000,1166284,NPI,1000,0.0,16.0,N
2019-05-11T00:00:00.000+0000,1166284,NPI,1000,0.0,0.0,N
2019-05-12T00:00:00.000+0000,1166284,NPI,1000,0.0,0.0,N
2019-05-13T00:00:00.000+0000,1166284,NPI,1000,3.0,15.0,N
2019-05-14T00:00:00.000+0000,1166284,NPI,1000,12.0,14.0,N
2019-05-15T00:00:00.000+0000,1166284,NPI,1000,14.0,16.0,N


In [0]:
# Persist the forecast as a Delta table, replacing any previous contents.
fb_prophet_forecast.write.saveAsTable('default.daily_forecast_df', format='delta', mode='overwrite')