In [1]:
!pip install kaggle prophet



In [2]:
!chmod 600 /content/kaggle.json
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/kaggle.json

In [3]:
# Download the competition archive (demand-forecasting-kernels-only.zip)
# with the Kaggle CLI.
!kaggle competitions download -c demand-forecasting-kernels-only

Downloading demand-forecasting-kernels-only.zip to /content
  0% 0.00/3.29M [00:00<?, ?B/s]
100% 3.29M/3.29M [00:00<00:00, 86.2MB/s]


In [4]:
!unzip demand-forecasting-kernels-only.zip

Archive:  demand-forecasting-kernels-only.zip
  inflating: sample_submission.csv   
  inflating: test.csv                
  inflating: train.csv               


In [16]:
import timeit

import pandas as pd
from prophet import Prophet
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

In [17]:
# Load the training data, rename 'date' -> 'ds' and 'sales' -> 'y' (the column
# names the forecasting code below works with), and parse 'ds' into datetimes.
sales_pd = (
    pd.read_csv('train.csv')
    .rename(columns={'date': 'ds', 'sales': 'y'})
    .assign(ds=lambda frame: pd.to_datetime(frame['ds']))
)

In [18]:
# Count the distinct items and stores in the raw training data
sales_pd.loc[:, ['item', 'store']].nunique()

Unnamed: 0,0
item,50
store,10


In [19]:
# Create a larger data frame by stacking ten copies of the training data.
# Each copy shifts the store ids by a further 10 so the copies do not share
# store ids (10 distinct stores become 100).
N_COPIES = 10
STORE_ID_OFFSET = 10

# Collect the copies in a list and concatenate once: calling pd.concat inside
# the loop re-copies everything accumulated so far on every iteration.
store_copies = []
for i in range(N_COPIES):
    # assign() returns a new frame, leaving sales_pd itself untouched
    store_copies.append(
        sales_pd.assign(store=sales_pd['store'] + STORE_ID_OFFSET * i)
    )
    print('added data frame', i + 1)

# ignore_index avoids repeating the same index labels once per copy
sales_pd_10k = pd.concat(store_copies, ignore_index=True)

added data frame 1
added data frame 2
added data frame 3
added data frame 4
added data frame 5
added data frame 6
added data frame 7
added data frame 8
added data frame 9
added data frame 10


In [20]:
# Count the distinct items and stores in the enlarged data frame
sales_pd_10k.loc[:, ['item', 'store']].nunique()

Unnamed: 0,0
item,50
store,100


In [22]:
# Start a Spark session for this notebook, or reuse one that already exists
spark = (
    SparkSession.builder
    .appName('time_series_pipeline')
    .getOrCreate()
)

# Convert the enlarged pandas frame into a Spark DataFrame
sales_df = spark.createDataFrame(sales_pd_10k)

# Show the column names and types Spark derived from the pandas frame
sales_df.printSchema()

root
 |-- ds: timestamp (nullable = true)
 |-- store: long (nullable = true)
 |-- item: long (nullable = true)
 |-- y: long (nullable = true)



In [23]:
# Register the Spark frame as a SQL view, read it back, hash-partition the rows
# by (store, item) into `defaultParallelism` partitions, and cache the result.
sales_df.createOrReplaceTempView("item_sales")
sql = "SELECT * FROM item_sales"

n_partitions = spark.sparkContext.defaultParallelism
sales_part = spark.sql(sql).repartition(n_partitions, ['store', 'item']).cache()

# Print the physical plan to confirm the partitioning
sales_part.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- InMemoryTableScan [ds#115, store#116L, item#117L, y#118L]
      +- InMemoryRelation [ds#115, store#116L, item#117L, y#118L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- AdaptiveSparkPlan isFinalPlan=false
               +- Exchange hashpartitioning(store#116L, item#117L, 2), REPARTITION_BY_NUM, [plan_id=38]
                  +- Scan ExistingRDD[ds#115,store#116L,item#117L,y#118L]




In [24]:
# Schema of the frame returned by the forecasting UDF: one (column name,
# Spark type) pair per output column.
_forecast_columns = [
    ('store', IntegerType()),
    ('item', IntegerType()),
    ('ds', TimestampType()),
    ('y', FloatType()),
    ('yhat', DoubleType()),
    ('yhat_upper', DoubleType()),
    ('yhat_lower', DoubleType()),
]
schema = StructType(
    [StructField(col_name, col_type) for col_name, col_type in _forecast_columns]
)

In [25]:
# Grouped-map pandas UDF: receives one pandas frame per group and returns a
# pandas frame matching `schema`.
# NOTE(review): PandasUDFType.GROUPED_MAP is the legacy spelling of this API;
# consider migrating this function and its call site together to applyInPandas.
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model(store_pd):
    """Fit a Prophet model on one group's history and forecast 90 more days.

    Args:
        store_pd: pandas DataFrame with columns 'ds', 'store', 'item' and 'y'.
            The first row's 'store' and 'item' values label every output row.

    Returns:
        pandas DataFrame with columns 'store', 'item', 'ds', 'y', 'yhat',
        'yhat_upper' and 'yhat_lower', covering the historical dates plus the
        90 forecast days. 'y' is left-joined from the input on 'ds', so dates
        with no matching input row have a missing 'y'.
    """
    output_columns = ['store', 'item', 'ds', 'y', 'yhat',
                      'yhat_upper', 'yhat_lower']

    # Configure the forecaster: linear trend, weekly and yearly multiplicative
    # seasonality, 95% uncertainty interval
    forecaster = Prophet(
        interval_width=0.95,
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=True,
        yearly_seasonality=True,
        seasonality_mode='multiplicative'
    )
    forecaster.fit(store_pd)

    # Historical dates followed by 90 additional daily dates
    timeline = forecaster.make_future_dataframe(
        periods=90,
        freq='d',
        include_history=True)

    # In-sample fit plus out-of-sample prediction
    forecast = forecaster.predict(timeline)

    # Attach the observed values to the predictions by date
    predicted = forecast[['ds', 'yhat', 'yhat_upper', 'yhat_lower']]
    observed = store_pd[['ds', 'store', 'item', 'y']].set_index('ds')
    combined = predicted.join(observed, on='ds', how='left')

    # Rows without a matching input date get no store/item from the join, so
    # both columns are overwritten with the values from the first input row
    labelled = combined.assign(
        store=store_pd['store'].iloc[0],
        item=store_pd['item'].iloc[0],
    )
    return labelled[output_columns]

In [26]:
# Apply the forecasting UDF to every (store, item) group
results = sales_part.groupby(['store', 'item']).apply(apply_model)

# Display the first rows of the result and time the call with the wall clock.
# NOTE(review): show() only has to produce the rows it prints, so this timing
# may cover only part of the groups rather than all of them -- confirm before
# quoting it as the cost of the full forecast.
start = timeit.default_timer()
results.show()
stop = timeit.default_timer()



+-----+----+-------------------+----+-------------------+------------------+-------------------+
|store|item|                 ds|   y|               yhat|        yhat_upper|         yhat_lower|
+-----+----+-------------------+----+-------------------+------------------+-------------------+
|    1|   1|2013-01-01 00:00:00|13.0|-1.5951843346568653| 9.896378787292104|-11.820722689033868|
|    1|   1|2013-01-02 00:00:00|11.0|  3.928086711757573|14.454911526656364| -6.770645336481006|
|    1|   1|2013-01-03 00:00:00|14.0|0.17623537812386267|11.245061742012211|-10.695218586073745|
|    1|   1|2013-01-04 00:00:00|13.0| 2.0189878876434495|13.045530688669562| -8.905617669494742|
|    1|   1|2013-01-05 00:00:00|10.0| 7.1759896636448035| 18.09532212101434| -3.210769523043518|
|    1|   1|2013-01-06 00:00:00|12.0|  8.201119988558144| 19.49126087097075|  -3.16863296253029|
|    1|   1|2013-01-07 00:00:00|10.0| 2.2416033619636706| 12.63090337054053| -9.049887532707741|
|    1|   1|2013-01-08 00:00:0

In [27]:
# Report how long the timed show() call took, in seconds
elapsed = stop - start
print('Time: ', elapsed)

Time:  40.49791901000003


In [28]:
# Register the forecasts as a SQL view and display the series for store 1, item 1.
# coalesce() returns a new DataFrame instead of modifying `results`, so calling
# it as a bare statement has no effect; it is therefore not called here.
results.createOrReplaceTempView('forecasted')
spark.sql("SELECT * FROM forecasted WHERE ITEM==1 AND STORE==1").show()

+-----+----+-------------------+----+-------------------+------------------+-------------------+
|store|item|                 ds|   y|               yhat|        yhat_upper|         yhat_lower|
+-----+----+-------------------+----+-------------------+------------------+-------------------+
|    1|   1|2013-01-01 00:00:00|13.0|-1.5951843346568653| 9.896378787292104|-11.820722689033868|
|    1|   1|2013-01-02 00:00:00|11.0|  3.928086711757573|14.454911526656364| -6.770645336481006|
|    1|   1|2013-01-03 00:00:00|14.0|0.17623537812386267|11.245061742012211|-10.695218586073745|
|    1|   1|2013-01-04 00:00:00|13.0| 2.0189878876434495|13.045530688669562| -8.905617669494742|
|    1|   1|2013-01-05 00:00:00|10.0| 7.1759896636448035| 18.09532212101434| -3.210769523043518|
|    1|   1|2013-01-06 00:00:00|12.0|  8.201119988558144| 19.49126087097075|  -3.16863296253029|
|    1|   1|2013-01-07 00:00:00|10.0| 2.2416033619636706| 12.63090337054053| -9.049887532707741|
|    1|   1|2013-01-08 00:00:0