- pip install pystan==2.19.1.1
- pip install fbprophet

In [20]:
import pandas as pd
import numpy as np
import time
from tqdm import tqdm

from fbprophet import Prophet
from multiprocessing import Pool, cpu_count

In [21]:
pd.read_csv('train.csv')

Unnamed: 0,date,store,item,sales
0,2013-01-01,1,1,13
1,2013-01-02,1,1,11
2,2013-01-03,1,1,14
3,2013-01-04,1,1,13
4,2013-01-05,1,1,10
...,...,...,...,...
912995,2017-12-27,10,50,63
912996,2017-12-28,10,50,59
912997,2017-12-29,10,50,74
912998,2017-12-30,10,50,62


In [None]:
df_train = (pd.read_csv('train.csv')
              .rename(columns={'date':'ds', 'sales':'y'}))

In [None]:
df_train.columns

In [None]:
def rnd_timeseries(store, item):
    return df_train[(df_train.store==store)&(df_train.item==item)][['ds', 'y']]

In [None]:
ls_store_item = list(set(zip(df_train.store, df_train.item)))

In [None]:
%%time
series = [rnd_timeseries(store, item) for store, item in ls_store_item]

In [None]:
def run_prophet(history_pd):
    model = Prophet(
        interval_width=0.95,
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=True,
        yearly_seasonality=True,
        seasonality_mode='multiplicative'
    )

    # fit the model
    model.fit(history_pd)

    # configure predictions
    future_pd = model.make_future_dataframe(
        periods=90,
        freq='d',
        include_history=True
    )

    # make predictions
    results_pd = model.predict(future_pd)
    return results_pd

In [None]:
%%timeit
f = run_prophet(series[0])

## 1 hr 20 Mins : MyCom= 11.82s, 12m 23.92s

In [None]:
start_time = time.time()
result = list(map(lambda history_pd: run_prophet(history_pd), tqdm(series)))
print("--- %s seconds ---" % (time.time() - start_time))

## 44 Mins using multiprocessing : 12m 44.41s

In [None]:
start_time = time.time()
p = Pool(cpu_count())
predictions = list(tqdm(p.imap(run_prophet, series), total=len(series)))
p.close()
p.join()
print("--- %s seconds ---" % (time.time() - start_time))

## 35 mins: With concurrent futures : 12m 45.42s

In [None]:
import concurrent.futures

In [None]:
%%time
with concurrent.futures.ProcessPoolExecutor() as executor:
    output = list(tqdm(executor.map(run_prophet, series), total=len(series)))

In [None]:
len(output)

In [None]:
output

## 15 Mins: Using Ray : 39s

In [None]:
import ray
import logging


ray.init(log_to_driver=False)

@ray.remote
def run_prophet(history_pd):
    model = Prophet(
        interval_width=0.95,
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=True,
        yearly_seasonality=True,
        seasonality_mode='multiplicative'
    )

    # fit the model
    model.fit(history_pd)

    # configure predictions
    future_pd = model.make_future_dataframe(
        periods=90,
        freq='d',
        include_history=True
    )

    # make predictions
    results_pd = model.predict(future_pd)
    return results_pd

In [None]:
%%time
futures = ray.get([run_prophet.remote(i) for i in series])

In [None]:
len(futures[498])

In [None]:
futures