# 5. Implementation of Scalable Demand Forecasting with PySpark in Google Colab
Similar to setting up Prophet, installing PySpark can be difficult at times. In Google Colaboratory, however, both tasks are extremely easy.

First, go to <a href = "https://research.google.com/colaboratory">Google Colab</a> and click "File" -> "New notebook" to create a new notebook.

### 5.1. Preparation
#### 5.1.1. Mount to Google Drive
For easy access to files, connect the notebook to your Google Drive.

In [1]:
# Import library
from google.colab import drive

# Connect to your google drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


#### 5.1.2. Install PySpark and Prophet
Installing PySpark and Prophet requires only one line of code each.

In [2]:
# Install PySpark
!pip install pyspark



In [3]:
# Install Prophet
!pip install Prophet
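Before moving on, it can be worth confirming which versions Colab actually installed, since PySpark's behavior differs across releases (for example, the `PandasUDFType.GROUPED_MAP` form used later has been deprecated since Spark 3.0). A minimal sketch using the standard library's `importlib.metadata`; the helper name `installed_version` is hypothetical.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(package_name):
    """Return the installed version string of a distribution, or None if it is absent."""
    try:
        return version(package_name)
    except PackageNotFoundError:
        return None


# Print the versions relevant to this notebook
for name in ("pyspark", "prophet", "pandas"):
    print(name, installed_version(name))
```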



#### 5.1.3. Load necessary packages

In [4]:
# Import library
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
from prophet import Prophet


#### 5.1.4. Upload the CSV file to Google Drive
- Click the folder icon in the left menu, as shown in the images below.
- You can save the file anywhere, but I like to keep it in the Colab Notebooks folder on Google Drive. To do so, go to "content" -> "drive" -> "MyDrive" -> "Colab Notebooks" and create a "data" folder.
- Click the three dots next to "data", then click "Upload" to upload the CSV file we saved earlier.

<img src ="https://github.com/youngdataspace/Time-Series-Forecasting-in-Spark/blob/main/Google%20Colab1.JPG?raw=true">
<img src = "https://github.com/youngdataspace/Time-Series-Forecasting-in-Spark/blob/main/Google%20Colab2.JPG?raw=true">
<img src = "https://github.com/youngdataspace/Time-Series-Forecasting-in-Spark/blob/main/Google%20Colab3.JPG?raw=true">
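Before reading the file, it can help to confirm that it sits where pandas will look for it. A minimal sketch using only the standard library; `CSV_PATH` mirrors the path used in the next step (adjust it if you saved the file elsewhere), and `check_csv` is a hypothetical helper name.

```python
import os

# Path used when reading the data; assumes the folder layout described above
CSV_PATH = "/content/drive/MyDrive/Colab Notebooks/data/store_sales.csv"


def check_csv(path):
    """Return True if the file exists; otherwise report what its folder contains."""
    if os.path.isfile(path):
        return True
    folder = os.path.dirname(path)
    contents = os.listdir(folder) if os.path.isdir(folder) else "folder not found"
    print(f"Missing: {path}\nFolder contents: {contents}")
    return False


check_csv(CSV_PATH)
```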

#### 5.1.5. Import the CSV file and explore it
Import the CSV file we just uploaded to Google Drive.


In [5]:
# Import the csv file and explore it
sales_pd = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/data/store_sales.csv')

# Convert ds to datetime
sales_pd['ds'] = pd.to_datetime(sales_pd['ds'])

# Display info
sales_pd.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 913000 entries, 0 to 912999
Data columns (total 4 columns):
 #   Column  Non-Null Count   Dtype         
---  ------  --------------   -----         
 0   ds      913000 non-null  datetime64[ns]
 1   store   913000 non-null  int64         
 2   item    913000 non-null  int64         
 3   y       913000 non-null  float64       
dtypes: datetime64[ns](1), float64(1), int64(2)
memory usage: 27.9 MB
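As an alternative, pandas can parse the date column while reading via `parse_dates`, which makes the separate `pd.to_datetime` call unnecessary. A sketch that uses a made-up two-row in-memory CSV in place of `store_sales.csv`:

```python
import io

import pandas as pd

# Hypothetical sample with the same columns as store_sales.csv
csv_text = """ds,store,item,y
2013-01-01,1,1,13
2013-01-02,1,1,11
"""

# parse_dates converts 'ds' to a datetime column during the read
sample_pd = pd.read_csv(io.StringIO(csv_text), parse_dates=["ds"])
print(sample_pd.dtypes)
```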


In [6]:
# Descriptive statistics
sales_pd.describe()

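|       | store    | item      | y         |
|-------|----------|-----------|-----------|
| count | 913000.0 | 913000.0  | 913000.0  |
| mean  | 5.5      | 25.5      | 52.250287 |
| std   | 2.872283 | 14.430878 | 28.801144 |
| min   | 1.0      | 1.0       | 0.0       |
| 25%   | 3.0      | 13.0      | 30.0      |
| 50%   | 5.5      | 25.5      | 47.0      |
| 75%   | 8.0      | 38.0      | 70.0      |
| max   | 10.0     | 50.0      | 231.0     |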


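The minimum and maximum values confirm that the store IDs run from 1 to 10 and the item IDs from 1 to 50, which gives 10 × 50 = 500 store-item combinations.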
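The minimum and maximum only bound the IDs; they do not prove that every combination is present or that each series is complete. A direct check is to count the store-item groups and their lengths: if all 500 series are complete, 913,000 rows means 1,826 daily observations each (five years, 2013 to 2017, including the 2016 leap day). A sketch with a hypothetical helper, demonstrated on a small synthetic frame rather than the real data:

```python
import pandas as pd


def series_lengths(df, keys=("store", "item")):
    """Return the number of rows in each store-item series."""
    return df.groupby(list(keys)).size()


# Synthetic stand-in: 2 stores x 3 items x 4 days
dates = pd.date_range("2013-01-01", periods=4, freq="D")
demo_pd = pd.DataFrame(
    [(d, s, i, 1.0) for s in (1, 2) for i in (1, 2, 3) for d in dates],
    columns=["ds", "store", "item", "y"],
)

lengths = series_lengths(demo_pd)
print(lengths.size)      # number of store-item series
print(lengths.unique())  # distinct series lengths
# On the real data, complete series would give 500 groups, all of length 1826:
# series_lengths(sales_pd).size, series_lengths(sales_pd).unique()
```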

### 5.2. Prophet x PySpark
#### 5.2.1. Create a Spark session
A SparkSession is the entry point to Spark's DataFrame and SQL functionality. It is created in the driver program, which runs on the master node.

Spark uses a master-worker architecture (historically called master-slave): worker nodes execute the tasks assigned to them by the master node.

In [7]:
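# Create a SparkSession; since this is just practice, run Spark in local mode on the Colab VM
# master(): cluster URL, e.g. "yarn", "spark://host:port" (standalone cluster), or local mode:
#           "local" runs with a single worker thread, "local[N]" with N threads, "local[*]" with all cores
# appName(): name of the application, shown in the Spark UI
# config(): sets a configuration property; here, the port of the Spark UI
# getOrCreate(): returns the existing SparkSession if there is one; otherwise, creates a new one
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()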

#### 5.2.2. Structure schema
Next, we convert the pandas DataFrame into a Spark DataFrame and define the schema of the data our forecasting function will return. See <a href = "https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.types.StructType.html">here</a> for the different types of struct fields.


In [8]:
# Convert the pandas DataFrame to a Spark DataFrame
sales_df = spark.createDataFrame(sales_pd)

# Display the schema
sales_df.printSchema()

root
 |-- ds: timestamp (nullable = true)
 |-- store: long (nullable = true)
 |-- item: long (nullable = true)
 |-- y: double (nullable = true)



In [9]:
# Define the output schema of the forecasting function: one row per store, item, and date
schema = StructType([
                     StructField('store', IntegerType()),
                     StructField('item', IntegerType()),
                     StructField('ds', TimestampType()),
                     StructField('y', FloatType()),
                     StructField('yhat', DoubleType()),
                     StructField('yhat_upper', DoubleType()),
                     StructField('yhat_lower', DoubleType()),
                     ])
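In a grouped-map pandas UDF, Spark matches the returned pandas columns to the schema's field names when the column labels are strings, so a misspelled or missing column typically only surfaces at run time, inside the Spark job. A cheap local guard is to compare names before launching the job. A sketch; `SCHEMA_FIELDS` restates the field names of the `StructType` above (with Spark available, `schema.fieldNames()` returns the same list), and `missing_columns` is a hypothetical helper.

```python
import pandas as pd

# Field names of the output schema defined above, in the same order
SCHEMA_FIELDS = ["store", "item", "ds", "y", "yhat", "yhat_upper", "yhat_lower"]


def missing_columns(result_pd, fields=SCHEMA_FIELDS):
    """Return the schema fields that the UDF's output DataFrame lacks."""
    return [name for name in fields if name not in result_pd.columns]


ok_pd = pd.DataFrame(columns=SCHEMA_FIELDS)
bad_pd = pd.DataFrame(columns=["store", "item", "ds", "y", "yhat"])
print(missing_columns(ok_pd))
print(missing_columns(bad_pd))
```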

#### 5.2.3. Utilize Pandas UDF and PySpark to train multiple models in parallel
The next step is to set the parameters, fit the model, and predict sales, just as we did for the single forecast model. We wrap these steps in a function and apply it to every store-item group. The only difference from our earlier single-model forecast is that Pandas UDFs and PySpark parallelize the process across groups.
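Conceptually, a grouped-map UDF is split-apply-combine: split the rows by store and item, run the same function on each group's pandas DataFrame, and stack the results. The sketch below does this sequentially in plain pandas so the per-group logic can be tested without Spark. `naive_forecast` is a hypothetical stand-in for Prophet that simply predicts each group's mean, not the model used in this post.

```python
import pandas as pd


def naive_forecast(group_pd):
    """Hypothetical stand-in model: predict the group's mean sales for every row."""
    out = group_pd[["store", "item", "ds", "y"]].copy()
    out["yhat"] = group_pd["y"].mean()
    return out


def apply_per_group(df, fn, keys=("store", "item")):
    """Sequential split-apply-combine; Spark instead runs the per-group calls in parallel tasks."""
    pieces = [fn(group) for _, group in df.groupby(list(keys))]
    return pd.concat(pieces, ignore_index=True)


demo_pd = pd.DataFrame({
    "ds": pd.to_datetime(["2013-01-01", "2013-01-02"] * 2),
    "store": [1, 1, 1, 1],
    "item": [1, 1, 2, 2],
    "y": [13.0, 11.0, 30.0, 34.0],
})
result = apply_per_group(demo_pd, naive_forecast)
print(result)
```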

In [10]:
# Define the Pandas UDF: Spark calls it once per store-item group with that group's rows as a pandas DataFrame
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model(store_pd):

  # Instantiate the model and set parameters
  model = Prophet(
      interval_width=0.95,
      growth='linear',
      daily_seasonality=False,
      weekly_seasonality=True,
      yearly_seasonality=True,
      seasonality_mode='multiplicative'
  )

  # Fit the model to the historical data (Prophet reads the ds and y columns)
  model.fit(store_pd)

  # Create a data frame with all historical dates plus 90 future days starting Jan 1, 2018
  future = model.make_future_dataframe(
      periods=90,
      freq='D',
      include_history=True)

  # Predict over the history (in-sample) and the 90 future days (out-of-sample)
  forecast_pd = model.predict(future)

  # Combine the forecast with the actuals; y is NaN for the future dates
  f_pd = forecast_pd[['ds', 'yhat', 'yhat_upper', 'yhat_lower']]
  st_pd = store_pd[['ds', 'store', 'item', 'y']]
  result_pd = f_pd.join(st_pd.set_index('ds'), on='ds', how='left')

  # Fill store and item, which the left join leaves empty for the future dates
  result_pd['store'] = store_pd['store'].iloc[0]
  result_pd['item'] = store_pd['item'].iloc[0]
  return result_pd[['store', 'item', 'ds', 'y', 'yhat',
                    'yhat_upper', 'yhat_lower']]
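Because `include_history=True`, the predictions cover every historical date plus the 90 new ones, and the left join leaves `y` empty for the future dates (these show up as nulls in the Spark output). The sketch below reproduces only that join-and-fill step in pandas so it can be checked without Prophet; `forecast` is a made-up stand-in for the output of `model.predict`, and `combine` is a hypothetical helper name.

```python
import pandas as pd


def combine(forecast_pd, store_pd):
    """Left-join the actuals onto the forecast by date, then fill the group keys."""
    result_pd = forecast_pd.join(
        store_pd[["ds", "store", "item", "y"]].set_index("ds"), on="ds", how="left"
    )
    # Future dates have no matching actuals, so store/item are NaN until filled here
    result_pd["store"] = store_pd["store"].iloc[0]
    result_pd["item"] = store_pd["item"].iloc[0]
    return result_pd[["store", "item", "ds", "y", "yhat"]]


history = pd.DataFrame({
    "ds": pd.date_range("2017-12-30", periods=2, freq="D"),
    "store": 1, "item": 1, "y": [20.0, 22.0],
})
# Hypothetical stand-in for model.predict(): two history dates plus a 3-day horizon
forecast = pd.DataFrame({
    "ds": pd.date_range("2017-12-30", periods=5, freq="D"),
    "yhat": [19.5, 21.0, 23.0, 24.0, 22.5],
})
print(combine(forecast, history))
```

On Spark 3.0 and later, the `@pandas_udf(..., PandasUDFType.GROUPED_MAP)` form is deprecated; the documented replacement is to define `apply_model` as a plain function (no decorator) and call `sales_df.groupby('store', 'item').applyInPandas(apply_model, schema=schema)`.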

In [11]:
import timeit

# Apply the function to every store-item group (lazy: nothing is computed until an action runs)
results = sales_df.groupby(['store', 'item']).apply(apply_model)

# Display the results and time how long the show() action takes
start = timeit.default_timer()
results.show()
stop = timeit.default_timer()



+-----+----+-------------------+----+------------------+------------------+--------------------+
|store|item|                 ds|   y|              yhat|        yhat_upper|          yhat_lower|
+-----+----+-------------------+----+------------------+------------------+--------------------+
|    1|   1|2013-01-01 00:00:00|13.0|10.051272869689301| 19.81331333729659|  1.1653252427277994|
|    1|   1|2013-01-02 00:00:00|11.0|10.528625323821489| 19.52697394359962|  1.1040720751204967|
|    1|   1|2013-01-03 00:00:00|14.0|11.053264561305632|19.121703358223876|   2.340893727254274|
|    1|   1|2013-01-04 00:00:00|13.0|12.244392640789227|20.759074056045254|  3.1621289404368893|
|    1|   1|2013-01-05 00:00:00|10.0| 13.78033453999933|22.222193062919708|  4.8703122255888385|
|    1|   1|2013-01-06 00:00:00|12.0|14.378950515739104| 22.87331589821483|   5.523753221406483|
|    1|   1|2013-01-07 00:00:00|10.0| 7.872892467507367|16.531746667114465|-0.49527759234844837|

In [12]:
# Print the elapsed time of the show() call above
print('Time: ', stop - start)

Time:  37.37365171300007


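The `show()` call took about 37 seconds. Keep in mind that `show()` only computes as much of the result as it needs to print 20 rows, so this figure is likely a lower bound on the time needed to train all 500 models and forecast 3 months out.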
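Since `show()` stops once it has enough rows to print, timing an action that consumes every row, such as `results.count()` (or writing the results out), gives a measurement that covers every store-item model. A minimal sketch of a timing helper built on `time.perf_counter`; the name `timed` is hypothetical, and the demo times a plain Python computation instead of a Spark job.

```python
import time


def timed(action):
    """Run a zero-argument callable and return (result, elapsed seconds)."""
    start = time.perf_counter()
    value = action()
    return value, time.perf_counter() - start


# With Spark, for example: n_rows, seconds = timed(results.count)
total, seconds = timed(lambda: sum(range(1_000_000)))
print(total, f"{seconds:.3f}s")
```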

In [15]:
type(results) 

pyspark.sql.dataframe.DataFrame

# 6. Conclusion
In this long post, we covered several topics: we started by identifying trends and seasonality, moved on to building a Prophet model, and then scaled the process to 500 distinct models with PySpark. We didn't get to cover CNNs, LSTMs, or seasonal ARIMA, but I plan to add them in a few days.