In [1]:
import pandas as pd
import numpy as np
import time
from tqdm import tqdm

from fbprophet import Prophet
from multiprocessing import Pool, cpu_count

Importing plotly failed. Interactive plots will not work.


In [2]:
# Prophet requires the date column to be named 'ds' and the target 'y'.
df_train = pd.read_csv('train.csv').rename(
    columns={'date': 'ds', 'sales': 'y'}
)

In [3]:
# Prophet.fit needs a 'ds' (date) and a 'y' (value) column; fail loudly if the
# rename above did not produce them.
assert {'ds', 'y'}.issubset(df_train.columns), "expected Prophet's 'ds'/'y' columns"
df_train.columns

Index(['ds', 'store', 'item', 'y'], dtype='object')

In [4]:
def rnd_timeseries(store, item, df=None):
    """Return the ('ds', 'y') history for a single store/item combination.

    Args:
        store: value matched against the 'store' column.
        item: value matched against the 'item' column.
        df: frame to filter. Defaults to the notebook-level ``df_train``,
            which is looked up at call time, exactly as before.

    Returns:
        DataFrame holding only the 'ds' and 'y' columns of the matching rows,
        with the original row order and index preserved.
    """
    if df is None:
        df = df_train
    mask = (df['store'] == store) & (df['item'] == item)
    # One .loc selection instead of chained indexing (df[mask][cols]).
    return df.loc[mask, ['ds', 'y']]

In [5]:
# Unique (store, item) pairs, sorted so the order of everything indexed by
# position later (e.g. series[0]) is deterministic and meaningful rather than
# an artifact of set iteration order.
ls_store_item = sorted(set(zip(df_train.store, df_train.item)))

In [6]:
%%time
series = [rnd_timeseries(store, item) for store, item in ls_store_item]

CPU times: user 3.14 s, sys: 141 ms, total: 3.28 s
Wall time: 3.3 s


In [7]:
def run_prophet(history_pd, periods=90):
    """Fit a Prophet model to one series and forecast ``periods`` days ahead.

    The model uses linear growth, multiplicative weekly + yearly seasonality,
    no daily seasonality, and a 95% uncertainty interval.

    Args:
        history_pd: DataFrame with Prophet's required 'ds' (date) and 'y'
            (value) columns.
        periods: number of daily steps to forecast past the last observed
            date. Defaults to 90, the previously hard-coded horizon.

    Returns:
        DataFrame produced by ``Prophet.predict`` covering the fitted history
        followed by the forecast horizon.
    """
    model = Prophet(
        interval_width=0.95,
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=True,
        yearly_seasonality=True,
        seasonality_mode='multiplicative'
    )

    # fit the model
    model.fit(history_pd)

    # configure predictions: keep the history rows so fitted values are
    # returned alongside the forecast
    future_pd = model.make_future_dataframe(
        periods=periods,
        freq='d',
        include_history=True
    )

    # make predictions
    return model.predict(future_pd)

In [9]:
%%timeit
f = run_prophet(series[0])

INFO:numexpr.utils:NumExpr defaulting to 8 threads.


6.95 s ± 510 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


## Sequential baseline (no parallelism): 1 hr 20 min

In [55]:
# perf_counter is monotonic, so the elapsed time is unaffected by system
# clock adjustments during the ~80-minute run (unlike time.time()).
start_time = time.perf_counter()
# Sequential baseline: fit and forecast each series one after another.
result = [run_prophet(history_pd) for history_pd in tqdm(series)]
print("--- %s seconds ---" % (time.perf_counter() - start_time))

100%|██████████| 500/500 [1:20:37<00:00,  9.68s/it]  

--- 4837.546568393707 seconds ---





## multiprocessing.Pool: 44 min

In [51]:
start_time = time.perf_counter()
# Pool's context manager calls terminate() on exit. That is safe here because
# list() has already drained imap. It also guarantees the worker processes are
# shut down if the loop raises or is interrupted, which the previous bare
# close()/join() sequence skipped.
with Pool(cpu_count()) as p:
    predictions = list(tqdm(p.imap(run_prophet, series), total=len(series)))
print("--- %s seconds ---" % (time.perf_counter() - start_time))

100%|██████████| 500/500 [44:30<00:00,  5.34s/it]  

--- 2673.760531425476 seconds ---





## concurrent.futures.ProcessPoolExecutor: 35 min

In [10]:
import concurrent.futures

In [11]:
%%time
with concurrent.futures.ProcessPoolExecutor() as executor:
    output = list(tqdm(executor.map(run_prophet, series), total=len(series)))

100%|██████████| 500/500 [35:23<00:00,  4.25s/it] 

CPU times: user 4.67 s, sys: 2.75 s, total: 7.42 s
Wall time: 35min 25s





## Ray: ~12 min (measured wall time 11 min 39 s)

In [8]:
import ray
import logging


# ignore_reinit_error makes this cell safe to re-run in the same kernel:
# without it, a second ray.init() call raises an error instead of reusing
# the already-running Ray instance.
ray.init(log_to_driver=False, ignore_reinit_error=True)

# NOTE(review): this rebinds `run_prophet` to a Ray remote function, shadowing
# the plain definition earlier in the notebook. Ray remote functions cannot be
# called directly, so the Pool / ProcessPoolExecutor cells will fail if run
# after this one. Consider a distinct name (e.g. run_prophet_remote) together
# with the dispatch cell below.
@ray.remote
def run_prophet(history_pd, periods=90):
    """Ray task: fit Prophet to one series and forecast ``periods`` days ahead.

    Mirrors the plain ``run_prophet``: linear growth, multiplicative weekly +
    yearly seasonality, no daily seasonality, 95% uncertainty interval.

    Args:
        history_pd: DataFrame with Prophet's required 'ds' and 'y' columns.
        periods: number of daily steps to forecast past the last observed
            date. Defaults to 90, the previously hard-coded horizon.

    Returns:
        DataFrame from ``Prophet.predict`` covering the history plus the
        forecast horizon. Retrieve it with ``ray.get`` on the object ref
        returned by ``run_prophet.remote(...)``.
    """
    model = Prophet(
        interval_width=0.95,
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=True,
        yearly_seasonality=True,
        seasonality_mode='multiplicative'
    )

    # fit the model
    model.fit(history_pd)

    # configure predictions: include the history rows in the output
    future_pd = model.make_future_dataframe(
        periods=periods,
        freq='d',
        include_history=True
    )

    # make predictions
    return model.predict(future_pd)

2020-08-25 09:59:26,255	INFO resource_spec.py:223 -- Starting Ray with 2.69 GiB memory available for workers and up to 1.35 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
2020-08-25 09:59:27,112	INFO services.py:1191 -- View the Ray dashboard at [1m[32mlocalhost:8265[39m[22m


In [9]:
%%time
futures = ray.get([run_prophet.remote(i) for i in series])

CPU times: user 8.97 s, sys: 4.33 s, total: 13.3 s
Wall time: 11min 39s


In [13]:
# Row count of one forecast: its history rows plus the 90 forecast days.
futures[498].shape[0]

1916