
Parallel cross validation (python) #1428

Closed
TomAugspurger opened this issue Apr 10, 2020 · 4 comments
TomAugspurger commented Apr 10, 2020

Hi there 👋

I'm opening this issue mainly for discussion. I've written an example (rendered, pull request) that shows how to parallelize prophet.diagnostics.cross_validation on a Dask cluster. Things work really well, but there's a bit that could be improved. The full details are available in that example, but the basic issue is that you need a bit of specialized Prophet knowledge and a bit of specialized Dask knowledge to get them working well together.

Currently, cross_validation accepts a multiprocess=True/False flag (#1250). Rather than a flag, it'd be nice if users could provide an object implementing some standard API. Python's concurrent.futures API is the obvious one. Then users could come along with an object implementing that API:

from concurrent.futures import ThreadPoolExecutor


# Parallelize with Threads
cross_validation(model, executor=ThreadPoolExecutor())

# Parallelize with Dask
cross_validation(model, executor=dask.distributed.Client())

and things would just work. Unfortunately that doesn't quite work today, because Python's concurrent.futures.as_completed and friends only work well with concurrent.futures.Future objects. They don't really work well with Dask's (or any other library's) Futures. See dask/distributed#3695. There's a chance this will be improved in Python 3.9, but that's unclear.
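To make the executor idea concrete, here is a minimal, hypothetical sketch (`run_cutoffs` is not part of Prophet) that submits one task per cutoff and gathers results with the stdlib `as_completed`. This loop works with any `concurrent.futures` executor, but the same code would fail if handed a `dask.distributed.Client`, because the stdlib `as_completed` only accepts `concurrent.futures.Future` objects:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_cutoffs(func, cutoffs, executor):
    # Hypothetical helper: submit one forecast task per cutoff and
    # collect results as they finish. Works with stdlib executors, but
    # the stdlib as_completed rejects Dask futures (dask/distributed#3695),
    # which is why an executor= keyword alone is not enough today.
    futures = {executor.submit(func, cutoff): cutoff for cutoff in cutoffs}
    results = {}
    for future in as_completed(futures):
        results[futures[future]] = future.result()
    # Restore the original cutoff order before returning.
    return [results[cutoff] for cutoff in cutoffs]


with ThreadPoolExecutor() as ex:
    out = run_cutoffs(lambda c: c * 2, [1, 2, 3], ex)
# out == [2, 4, 6]
```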

So my question for now is: is there any appetite for opt-in Dask support? Meaning something like a use_dask=True keyword, so that if the user provides it we can safely import Dask's as_completed? Things would look roughly like

diff --git a/python/fbprophet/diagnostics.py b/python/fbprophet/diagnostics.py
index 313d086..67d6ba7 100644
--- a/python/fbprophet/diagnostics.py
+++ b/python/fbprophet/diagnostics.py
@@ -59,7 +59,8 @@ def generate_cutoffs(df, horizon, initial, period):
     return list(reversed(result))
 
 
-def cross_validation(model, horizon, period=None, initial=None, multiprocess=False, cutoffs=None):
+def cross_validation(model, horizon, period=None, initial=None, multiprocess=False, cutoffs=None,
+                     executor=None, use_dask=False):
     """Cross-Validation for time series.
 
     Computes forecasts from historical cutoff points, which user can input.
@@ -132,6 +133,13 @@ def cross_validation(model, horizon, period=None, initial=None, multiprocess=Fal
             logger.info('Running cross validation in multiprocessing mode')
             input_df = ((df, model, cutoff, horizon, predict_columns) for cutoff in cutoffs)
             predicts = pool.starmap(single_cutoff_forecast, input_df)
+    elif use_dask:
+        from dask.distributed import as_completed
+        futures = [executor.submit(single_cutoff_forecast, df, model, cutoff, horizon, predict_columns)
+                   for cutoff in cutoffs]
+        # TODO: probably need to reorder..., but ignore for now
+        predicts = [future.result() for future in as_completed(futures)]
+
     else:
         predicts = [
             single_cutoff_forecast(df, model, cutoff, horizon, predict_columns)

If there's some appetite for that I'd be happy to work on it.


bletham commented Apr 13, 2020

Support for dask parallelization seems reasonable to me, though seeing three blocks of code here that all call the same function with the same data in slightly different ways, I wonder if it might make sense to abstract things one step further. Something like this:

def cross_validation(
    model, horizon, period=None, initial=None, multiprocess=False, cutoffs=None, 
    forecast_cutoffs_fn=None
):
    ...
    if forecast_cutoffs_fn is None:
        forecast_cutoffs_fn = multiprocessing_forecasts if multiprocess else serial_forecasts
    predicts = forecast_cutoffs_fn(cutoffs, df, model, horizon, predict_columns)

    # Combine all predicted pd.DataFrame into one pd.DataFrame
    return pd.concat(predicts, axis=0).reset_index(drop=True)


def serial_forecasts(cutoffs, df, model, horizon, predict_columns):
    return [
        single_cutoff_forecast(df, model, cutoff, horizon, predict_columns)
        for cutoff in tqdm(cutoffs)
    ]


def multiprocessing_forecasts(cutoffs, df, model, horizon, predict_columns):
    with Pool() as pool:
        logger.info('Running cross validation in multiprocessing mode')
        input_df = ((df, model, cutoff, horizon, predict_columns) for cutoff in cutoffs)
        predicts = pool.starmap(single_cutoff_forecast, input_df)
    return predicts

That seems like it would enable any parallelization method desired without adding much additional complexity to the code, and the dask function would easily fit this API. What do you think?
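A quick sketch of how a user-supplied forecast_cutoffs_fn could plug into that signature, using a thread pool; here `single_cutoff_forecast` is replaced with a trivial stand-in so the snippet is self-contained, and the real Prophet function would of course return DataFrames:

```python
from concurrent.futures import ThreadPoolExecutor


# Stand-in for fbprophet's single_cutoff_forecast, just for illustration.
def single_cutoff_forecast(df, model, cutoff, horizon, predict_columns):
    return (cutoff, horizon)


def threaded_forecasts(cutoffs, df, model, horizon, predict_columns):
    # A user-supplied forecast_cutoffs_fn matching the proposed signature:
    # it receives the cutoffs plus the shared arguments and returns one
    # prediction per cutoff, in the original order (pool.map preserves it).
    with ThreadPoolExecutor() as pool:
        return list(pool.map(
            lambda c: single_cutoff_forecast(df, model, c, horizon, predict_columns),
            cutoffs,
        ))


preds = threaded_forecasts([1, 2, 3], df=None, model=None, horizon="7 days",
                           predict_columns=["yhat"])
# preds == [(1, "7 days"), (2, "7 days"), (3, "7 days")]
```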

@TomAugspurger
Contributor Author

Agreed. I'll try to make a pull request later in the week along those lines.

Has there been a Prophet release with the multiprocess keyword? It seems like it would be redundant with specifying forecast_cutoffs_fn. Do you have a preference for users selecting the parallelism with a callable forecast_cutoffs_fn, or would accepting a string like "processes", "threads", "dask" (perhaps as a shorthand for a callable) also be OK?


bletham commented Apr 23, 2020

No, there hasn't been a release with the multiprocess kwarg, so we could refactor it without having to worry about backwards compatibility.

I do think it'd be good to allow for callables so that people in the future could try new functions without having to push them into the package first, but for the built-in ones having a string alias seems good.
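A minimal sketch of how string aliases and callables could coexist, assuming a hypothetical `resolve_forecast_fn` helper (not part of Prophet) that maps built-in names to functions and passes user callables through untouched:

```python
def serial_forecasts(cutoffs, *args):
    # Trivial built-in stand-in: one "prediction" per cutoff, in order.
    return [cutoff for cutoff in cutoffs]


# Registry of built-in strategies addressable by string alias.
REGISTRY = {"serial": serial_forecasts}


def resolve_forecast_fn(parallel, registry):
    # Hypothetical resolver: accept either a string alias for a built-in
    # strategy or a user-supplied callable implementing the same signature.
    if callable(parallel):
        return parallel
    try:
        return registry[parallel]
    except KeyError:
        raise ValueError(f"Unknown parallel option: {parallel!r}")


fn = resolve_forecast_fn("serial", REGISTRY)
# fn is serial_forecasts; a callable would be returned as-is.
```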

bletham added the ready label Apr 28, 2020
@TomAugspurger
Contributor Author

Closed by #1434
