# Environment

In [1]:
!python -V

Python 3.10.8


In [2]:
#!pip install plotly
#!pip install Prophet

In [3]:
import os
import re
import warnings
from datetime import datetime, timedelta

import plotly.express as px
import numpy as np

import pyspark
import pyspark.pandas as ps

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

from prophet import Prophet



In [4]:
ps.options.display.max_rows = 10

ps.set_option('plotting.backend', 'plotly')

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"] = "lab"

warnings.filterwarnings("ignore") 

In [5]:
print(f'pyspark version: {pyspark.__version__}')

pyspark version: 3.3.1


# Spark Secion

In [6]:
conf = pyspark.SparkConf()

conf.setAppName('Task1')
conf.setMaster('local[2]')

sc = SparkContext.getOrCreate(conf)
spark = SparkSession(sc)

# Load Data

In [7]:
wd = os.getcwd()
path_data = os.path.join(wd, 'study_case/Task1/data/', 'forcasting_cs_data.csv')
#path_data = os.path.join(wd, 'data', 'forcasting_cs_data.csv')
df = ps.read_csv(path_data)
df.head()

Unnamed: 0,Product,date,Sales,Price Discount (%),In-Store Promo,Catalogue Promo,Store End Promo,Google_Mobility,Covid_Flag,V_DAY,EASTER,CHRISTMAS
0,SKU1,05/02/17,27750,0%,0,0,0,0.0,0,0,0,0
1,SKU1,12/02/17,29023,0%,1,0,1,0.0,0,1,0,0
2,SKU1,19/02/17,45630,17%,0,0,0,0.0,0,0,0,0
3,SKU1,26/02/17,26789,0%,1,0,1,0.0,0,0,0,0
4,SKU1,05/03/17,41999,17%,0,0,0,0.0,0,0,0,0


## Column name homogenization

In [8]:
columns = [re.sub("[^A-Z0-9 ]", "", column, 0, re.IGNORECASE) for column in df.columns]
columns = [column.lower().replace(' ','') for column in columns]
# columns = ['product',
#             'date',
#             'sales',
#             'price_discount',
#             'instore_promo',
#             'catalogue_promo',
#             'store_end_promo',
#             'google_mobility',
#             'covid_flag',
#             'v_day',
#             'easter',
#             'christmas']
df.columns = columns

## Type convertion

In [9]:
df['date'] = ps.to_datetime(df['date'], infer_datetime_format=True)
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['cw'] = df['date'].dt.week
df['quarter'] = df['date'].dt.quarter

In [10]:
df['sku_number'] = df['product'].apply(lambda x: int(x.replace('SKU','')))

In [11]:
df = df.drop(['product'], axis= 1)
df['pricediscount'] = df['pricediscount'].apply(lambda x: float(x.replace('%','')))

In [13]:
df.shape

(1218, 16)

In [14]:
from pyspark.sql.functions import pandas_udf, PandasUDFType, sum, max, col, concat, lit
from pyspark.sql.types import *
import pandas as pd

In [18]:
schema = StructType([
        StructField('sku_number', IntegerType()),
        StructField('ds', TimestampType()),
        StructField('yhat', DoubleType()),
])

In [19]:
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model(df):

    def train_fitted_prophet(df, cutoff):

        ts_train = (df
                    .query('date <= @cutoff')
                    .rename(columns={'date': 'ds', 'sales': 'y'})
                    .sort_values('ds')
                    
                    )

        ts_test = (df
                   .query('date > @cutoff')
                   .rename(columns={'date': 'ds', 'sales': 'y'})
                   .sort_values('ds')
                   .assign(ds=lambda x: pd.to_datetime(x["ds"]))
                   .drop('y', axis=1)
                   )
                       
        # init model
        m = Prophet(
                    interval_width=0.95,
                    growth='linear',
                    daily_seasonality=False,
                    weekly_seasonality=True,
                    yearly_seasonality=True,
                    seasonality_mode='multiplicative'
        )

        m.fit(ts_train)

        df["date"] = pd.to_datetime(df["date"])

        ts_hat = (m.predict(ts_test)[["ds", "yhat"]]
                  .assign(ds=lambda x: pd.to_datetime(x["ds"]))
                  ).merge(ts_test, on=["ds"], how="left") 
        
    
        return pd.DataFrame(ts_hat, columns=schema.fieldNames())

    return train_fitted_prophet(df, cutoff)

In [21]:
days_to_subtract = 60
cutoff = df['date'].max() - timedelta(days=days_to_subtract)

global_predictions = (df.to_spark()
                        .groupBy("sku_number")
                        .apply(apply_model)
                        )
                        
global_predictions.show()

+----------+-------------------+------------------+
|sku_number|                 ds|              yhat|
+----------+-------------------+------------------+
|         1|2020-11-10 00:00:00|  67117.2669579449|
|         1|2020-11-15 00:00:00| 64084.62953223526|
|         1|2020-11-22 00:00:00| 49089.69413483643|
|         1|2020-11-29 00:00:00| 39004.15243287487|
|         1|2020-12-01 00:00:00| 41404.33223606509|
|         1|2020-12-04 00:00:00| 39212.15997928289|
|         1|2020-12-07 00:00:00|62789.974895373096|
|         1|2020-12-13 00:00:00|  97524.3986542807|
|         1|2020-12-20 00:00:00|134349.01881582264|
|         1|2020-12-27 00:00:00|135007.10244620257|
|         2|2020-11-10 00:00:00|  7903.30488684252|
|         2|2020-11-15 00:00:00| 7788.260524640101|
|         2|2020-11-22 00:00:00| 8640.636494637607|
|         2|2020-11-29 00:00:00| 9710.717418670001|
|         2|2020-12-01 00:00:00| 9614.602956120474|
|         2|2020-12-04 00:00:00|  8792.18266724299|
|         2|