diff --git a/Makefile b/Makefile index 861c77ca..3801fe58 100644 --- a/Makefile +++ b/Makefile @@ -90,7 +90,7 @@ install-no-extras-for-test: python -m pip install .[test] install-all-extras-for-test: - python -m pip install .[all_extras,sktime-integration,test] + python -m pip install .[all_extras,test,test_parallel_backends,sktime-integration] install-editable: pip install -e . diff --git a/pyproject.toml b/pyproject.toml index 4da54201..573f718a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -65,6 +65,11 @@ test = [ "torch", "tf_keras", ] +test_parallel_backends = [ + "dask", + "joblib", + 'ray >=2.40.0; python_version < "3.13"', +] all_extras = [ "hyperactive[integrations]", "optuna<5", diff --git a/src/hyperactive/opt/_common.py b/src/hyperactive/opt/_common.py new file mode 100644 index 00000000..37c01e3a --- /dev/null +++ b/src/hyperactive/opt/_common.py @@ -0,0 +1,16 @@ +"""Common functions used by multiple optimizers.""" + +__all__ = ["_score_params"] + + +def _score_params(params, meta): + """Score parameters, used in parallelization.""" + meta = meta.copy() + experiment = meta["experiment"] + error_score = meta["error_score"] + + try: + return experiment(**params) + except Exception: # noqa: B904 + # Catch all exceptions and assign error_score + return error_score diff --git a/src/hyperactive/opt/gridsearch/_sk.py b/src/hyperactive/opt/gridsearch/_sk.py index d9d6fd3c..d5060d65 100644 --- a/src/hyperactive/opt/gridsearch/_sk.py +++ b/src/hyperactive/opt/gridsearch/_sk.py @@ -7,6 +7,8 @@ from sklearn.model_selection import ParameterGrid from hyperactive.base import BaseOptimizer +from hyperactive.opt._common import _score_params +from hyperactive.utils.parallel import parallelize class GridSearchSk(BaseOptimizer): @@ -17,8 +19,45 @@ class GridSearchSk(BaseOptimizer): param_grid : dict[str, list] The search space to explore. A dictionary with parameter names as keys and a numpy array as values. + error_score : float, default=np.nan The score to assign if an error occurs during the evaluation of a parameter set. + + backend : {"dask", "loky", "multiprocessing", "threading", "ray"}, default = "None". + Parallelization backend to use in the search process. + + - "None": executes loop sequentally, simple list comprehension + - "loky", "multiprocessing" and "threading": uses ``joblib.Parallel`` loops + - "joblib": custom and 3rd party ``joblib`` backends, e.g., ``spark`` + - "dask": uses ``dask``, requires ``dask`` package in environment + - "ray": uses ``ray``, requires ``ray`` package in environment + + backend_params : dict, optional + additional parameters passed to the backend as config. + Directly passed to ``utils.parallel.parallelize``. + Valid keys depend on the value of ``backend``: + + - "None": no additional parameters, ``backend_params`` is ignored + - "loky", "multiprocessing" and "threading": default ``joblib`` backends + any valid keys for ``joblib.Parallel`` can be passed here, e.g., ``n_jobs``, + with the exception of ``backend`` which is directly controlled by ``backend``. + If ``n_jobs`` is not passed, it will default to ``-1``, other parameters + will default to ``joblib`` defaults. + - "joblib": custom and 3rd party ``joblib`` backends, e.g., ``spark``. + any valid keys for ``joblib.Parallel`` can be passed here, e.g., ``n_jobs``, + ``backend`` must be passed as a key of ``backend_params`` in this case. + If ``n_jobs`` is not passed, it will default to ``-1``, other parameters + will default to ``joblib`` defaults. 
+ - "dask": any valid keys for ``dask.compute`` can be passed, e.g., ``scheduler`` + + - "ray": The following keys can be passed: + + - "ray_remote_args": dictionary of valid keys for ``ray.init`` + - "shutdown_ray": bool, default=True; False prevents ``ray`` from shutting + down after parallelization. + - "logger_name": str, default="ray"; name of the logger to use. + - "mute_warnings": bool, default=False; if True, suppresses warnings + experiment : BaseExperiment, optional The experiment to optimize parameters for. Optional, can be passed later via ``set_params``. @@ -53,17 +92,29 @@ class GridSearchSk(BaseOptimizer): Best parameters can also be accessed via the attributes: >>> best_params = grid_search.best_params_ + + To parallelize the search, set the ``backend`` and ``backend_params``: + >>> grid_search = GridSearch( + ... param_grid, + ... backend="joblib", + ... backend_params={"n_jobs": -1}, + ... experiment=sklearn_exp, + ... ) """ def __init__( self, param_grid=None, error_score=np.nan, + backend="None", + backend_params=None, experiment=None, ): self.experiment = experiment self.param_grid = param_grid self.error_score = error_score + self.backend = backend + self.backend_params = backend_params super().__init__() @@ -91,19 +142,23 @@ def _check_param_grid(self, param_grid): "to be a non-empty sequence." ) - def _solve(self, experiment, param_grid, error_score): + def _solve(self, experiment, param_grid, error_score, backend, backend_params): """Run the optimization search process.""" self._check_param_grid(param_grid) candidate_params = list(ParameterGrid(param_grid)) - scores = [] - for candidate_param in candidate_params: - try: - score = experiment(**candidate_param) - except Exception: # noqa: B904 - # Catch all exceptions and assign error_score - score = error_score - scores.append(score) + meta = { + "experiment": experiment, + "error_score": error_score, + } + + scores = parallelize( + fun=_score_params, + iter=candidate_params, + meta=meta, + backend=backend, + backend_params=backend_params, + ) best_index = np.argmin(scores) best_params = candidate_params[best_index] @@ -170,4 +225,15 @@ def get_test_params(cls, parameter_set="default"): "param_grid": param_grid, } - return [params_sklearn, params_ackley] + params = [params_sklearn, params_ackley] + + from hyperactive.utils.parallel import _get_parallel_test_fixtures + + parallel_fixtures = _get_parallel_test_fixtures() + + for x in parallel_fixtures: + new_ackley = params_ackley.copy() + new_ackley.update(x) + params.append(new_ackley) + + return params diff --git a/src/hyperactive/opt/random_search.py b/src/hyperactive/opt/random_search.py index 4f59236b..18a7fb01 100644 --- a/src/hyperactive/opt/random_search.py +++ b/src/hyperactive/opt/random_search.py @@ -8,6 +8,8 @@ from sklearn.model_selection import ParameterSampler from hyperactive.base import BaseOptimizer +from hyperactive.opt._common import _score_params +from hyperactive.utils.parallel import parallelize class RandomSearchSk(BaseOptimizer): @@ -18,12 +20,51 @@ class RandomSearchSk(BaseOptimizer): param_distributions : dict[str, list | scipy.stats.rv_frozen] Search space specification. Discrete lists are sampled uniformly; scipy distribution objects are sampled via their ``rvs`` method. + n_iter : int, default=10 Number of parameter sets to evaluate. + random_state : int | np.random.RandomState | None, default=None Controls the pseudo-random generator for reproducibility. 
+
     error_score : float, default=np.nan
         Score assigned when the experiment raises an exception.
+
+    backend : {"dask", "loky", "multiprocessing", "threading", "ray"}, default = "None".
+        Parallelization backend to use in the search process.
+
+        - "None": executes loop sequentially, simple list comprehension
+        - "loky", "multiprocessing" and "threading": uses ``joblib.Parallel`` loops
+        - "joblib": custom and 3rd party ``joblib`` backends, e.g., ``spark``
+        - "dask": uses ``dask``, requires ``dask`` package in environment
+        - "ray": uses ``ray``, requires ``ray`` package in environment
+
+    backend_params : dict, optional
+        additional parameters passed to the backend as config.
+        Directly passed to ``utils.parallel.parallelize``.
+        Valid keys depend on the value of ``backend``:
+
+        - "None": no additional parameters, ``backend_params`` is ignored
+        - "loky", "multiprocessing" and "threading": default ``joblib`` backends
+          any valid keys for ``joblib.Parallel`` can be passed here, e.g., ``n_jobs``,
+          with the exception of ``backend`` which is directly controlled by ``backend``.
+          If ``n_jobs`` is not passed, it will default to ``-1``, other parameters
+          will default to ``joblib`` defaults.
+        - "joblib": custom and 3rd party ``joblib`` backends, e.g., ``spark``.
+          any valid keys for ``joblib.Parallel`` can be passed here, e.g., ``n_jobs``,
+          ``backend`` must be passed as a key of ``backend_params`` in this case.
+          If ``n_jobs`` is not passed, it will default to ``-1``, other parameters
+          will default to ``joblib`` defaults.
+        - "dask": any valid keys for ``dask.compute`` can be passed, e.g., ``scheduler``
+
+        - "ray": The following keys can be passed:
+
+            - "ray_remote_args": dictionary of valid keys for ``ray.init``
+            - "shutdown_ray": bool, default=True; False prevents ``ray`` from shutting
+              down after parallelization.
+            - "logger_name": str, default="ray"; name of the logger to use.
+            - "mute_warnings": bool, default=False; if True, suppresses warnings
+
     experiment : BaseExperiment, optional
         Callable returning a scalar score when invoked with keyword arguments
         matching a parameter set.
@@ -44,6 +85,8 @@ def __init__(
         n_iter=10,
         random_state=None,
         error_score=np.nan,
+        backend="None",
+        backend_params=None,
         experiment=None,
     ):
         self.experiment = experiment
@@ -51,6 +94,8 @@ def __init__(
         self.n_iter = n_iter
         self.random_state = random_state
         self.error_score = error_score
+        self.backend = backend
+        self.backend_params = backend_params
 
         super().__init__()
 
@@ -67,7 +112,7 @@ def _check_param_distributions(self, param_distributions):
         for p in param_distributions:
             for name, v in p.items():
                 if self._is_distribution(v):
-                    # Assume scipy frozen distribution - nothing to check
+                    # Assume scipy frozen distribution: nothing to check
                     continue
 
                 if isinstance(v, np.ndarray) and v.ndim > 1:
@@ -93,6 +138,8 @@ def _solve(
         n_iter,
         random_state,
         error_score,
+        backend,
+        backend_params,
     ):
         """Sample ``n_iter`` points and return the best parameter set."""
         self._check_param_distributions(param_distributions)
@@ -104,13 +151,18 @@ def _solve(
         )
         candidate_params = list(sampler)
 
-        scores: list[float] = []
-        for candidate_param in candidate_params:
-            try:
-                score = experiment(**candidate_param)
-            except Exception:  # noqa: B904
-                score = error_score
-            scores.append(score)
+        meta = {
+            "experiment": experiment,
+            "error_score": error_score,
+        }
+
+        scores = parallelize(
+            fun=_score_params,
+            iter=candidate_params,
+            meta=meta,
+            backend=backend,
+            backend_params=backend_params,
+        )
 
         best_index = int(np.argmin(scores))  # lower-is-better convention
         best_params = candidate_params[best_index]
@@ -154,4 +206,15 @@ def get_test_params(cls, parameter_set: str = "default"):
             "random_state": 0,
         }
 
-        return [params_sklearn, params_ackley]
+        params = [params_sklearn, params_ackley]
+
+        from hyperactive.utils.parallel import _get_parallel_test_fixtures
+
+        parallel_fixtures = _get_parallel_test_fixtures()
+
+        for x in parallel_fixtures:
+            new_ackley = params_ackley.copy()
+            new_ackley.update(x)
+            params.append(new_ackley)
+
+        return params
diff --git a/src/hyperactive/utils/parallel.py b/src/hyperactive/utils/parallel.py
new file mode 100644
index 00000000..3591c2a9
--- /dev/null
+++ b/src/hyperactive/utils/parallel.py
@@ -0,0 +1,289 @@
+# copied from sktime, BSD-3-Clause License (see LICENSE file)
+# to be moved to scikit-base in the future
+"""Common abstraction utilities for parallelization backends.
+
+New parallelization or iteration backends can be added easily as follows:
+
+* Add a new backend name to ``backend_dict``, syntax is
+  backend_name: backend_type, where backend_type collects backend options,
+  e.g., multiple options for a single parallelization backend.
+* Add a new function to ``para_dict``, should have name
+  ``_parallelize_<backend_type>`` and take the same arguments as
+  ``_parallelize_none``. Ensure that ``backend`` and ``backend_params`` are arguments,
+  even if there is only one backend option, or no additional parameters.
+* Add the backend string in the docstring of ``parallelize``, and in any downstream
+  functions that use ``parallelize`` and expose the backend parameter as an argument.
+"""
+
+
+def parallelize(fun, iter, meta=None, backend=None, backend_params=None):
+    """Parallelize loop over iter via backend.
+
+    Executes ``fun(x, meta=meta)`` in parallel for ``x`` in ``iter``,
+    and returns the results as a list in the same order as ``iter``.
+
+    Uses the iteration or parallelization backend specified by ``backend``.
+ + Parameters + ---------- + fun : callable, must have exactly two arguments, second argument of name "meta" + function to be executed in parallel + + iter : iterable + iterable over which to parallelize, elements are passed to fun in order, + to the first argument + + meta : dict, optional + variables to be passed to fun, as the second argument, under the key ``meta`` + + backend : str, optional + backend to use for parallelization, one of + + - "None": executes loop sequentially, simple list comprehension + - "loky", "multiprocessing" and "threading": uses ``joblib`` ``Parallel`` loops + - "joblib": custom and 3rd party ``joblib`` backends, e.g., ``spark`` + - "dask": uses ``dask``, requires ``dask`` package in environment + - "dask_lazy": same as ``"dask"``, but returns delayed object instead of list + - "ray": uses a ray remote to execute jobs in parallel + + backend_params : dict, optional + additional parameters passed to the backend as config. + Valid keys depend on the value of ``backend``: + + - "None": no additional parameters, ``backend_params`` is ignored + - "loky", "multiprocessing" and "threading": default ``joblib`` backends + any valid keys for ``joblib.Parallel`` can be passed here, e.g., ``n_jobs``, + with the exception of ``backend`` which is directly controlled by ``backend``. + If ``n_jobs`` is not passed, it will default to ``-1``, other parameters + will default to ``joblib`` defaults. + - "joblib": custom and 3rd party ``joblib`` backends, e.g., ``spark``. + any valid keys for ``joblib.Parallel`` can be passed here, e.g., ``n_jobs``, + ``backend`` must be passed as a key of ``backend_params`` in this case. + If ``n_jobs`` is not passed, it will default to ``-1``, other parameters + will default to ``joblib`` defaults. + - "dask": any valid keys for ``dask.compute`` can be passed, e.g., ``scheduler`` + + - "ray": The following keys can be passed: + + - "ray_remote_args": dictionary of valid keys for ``ray.init`` + - "shutdown_ray": bool, default=True; False prevents ``ray`` from shutting + down after parallelization. + - "logger_name": str, default="ray"; name of the logger to use. + - "mute_warnings": bool, default=False; if True, suppresses warnings + + """ + if meta is None: + meta = {} + if backend is None: + backend = "None" + if backend_params is None: + backend_params = {} + + backend_name = backend_dict[backend] + para_fun = para_dict[backend_name] + + ret = para_fun( + fun=fun, iter=iter, meta=meta, backend=backend, backend_params=backend_params + ) + return ret + + +backend_dict = { + "None": "none", + "loky": "joblib", + "multiprocessing": "joblib", + "threading": "joblib", + "joblib": "joblib", + "dask": "dask", + "dask_lazy": "dask", + "ray": "ray", +} +para_dict = {} + + +def _parallelize_none(fun, iter, meta, backend, backend_params): + """Execute loop via simple sequential list comprehension.""" + ret = [fun(x, meta=meta) for x in iter] + return ret + + +para_dict["none"] = _parallelize_none + + +def _parallelize_joblib(fun, iter, meta, backend, backend_params): + """Parallelize loop via joblib Parallel.""" + from joblib import Parallel, delayed + + par_params = backend_params.copy() + if "backend" not in par_params: + # if user selects custom joblib backend but does not specify backend explicitly, + # raise a ValueError + if backend == "joblib": + raise ValueError( + '"joblib" was selected as first layer parallelization backend, ' + "but no backend string was " + 'passed in the backend parameters dict, e.g., "spark". 
' + "Please specify a backend to joblib as a key-value pair " + "in the backend_params arg or the backend:parallel:params config " + 'when using "joblib". ' + 'For clarity, "joblib" should only be used for two-layer ' + "backend dispatch, where the first layer is joblib, " + "and the second layer is a custom backend of joblib, e.g., spark. " + "For first-party joblib backends, please use the backend string " + 'of sktime directly, e.g., by specifying "multiprocessing" or "loky".' + ) + # in all other cases, we ensure the backend parameter is one of + # "loky", "multiprocessing" or "threading", as passed via backend + else: + par_params["backend"] = backend + elif backend != "joblib": + par_params["backend"] = backend + + if "n_jobs" not in par_params: + par_params["n_jobs"] = -1 + + ret = Parallel(**par_params)(delayed(fun)(x, meta=meta) for x in iter) + return ret + + +para_dict["joblib"] = _parallelize_joblib + + +def _parallelize_dask(fun, iter, meta, backend, backend_params): + """Parallelize loop via dask.""" + from dask import compute, delayed + + lazy = [delayed(fun)(x, meta=meta) for x in iter] + if backend == "dask": + return compute(*lazy, **backend_params) + else: + return lazy + + +para_dict["dask"] = _parallelize_dask + + +def _parallelize_ray(fun, iter, meta, backend, backend_params): + """Parallelize loop via ray.""" + import logging + import warnings + + import ray + + par_params = backend_params.copy() + + # read the possible additional keys + logger = logging.getLogger(par_params.get("logger_name", None)) + mute_warnings = par_params.get("mute_warnings", False) + shutdown_ray = par_params.get("shutdown_ray", True) + + if "ray_remote_args" not in par_params.keys(): + par_params["ray_remote_args"] = {} + + @ray.remote # pragma: no cover + def _ray_execute_function( + fun, params: dict, meta: dict, mute_warnings: bool = False + ): + if mute_warnings: + warnings.filterwarnings("ignore") # silence sktime warnings + assert ray.is_initialized() + result = fun(params, meta) + return result + + if not ray.is_initialized(): + logger.info("Starting Ray Parallel") + context = ray.init(**par_params["ray_remote_args"]) + logger.info( + f"Ray initialized. Open dashboard at http://{context.dashboard_url}" + ) + + # this is to keep the order of results while still using wait to optimize runtime + refs = [ + _ray_execute_function.remote(fun, x, meta, mute_warnings=mute_warnings) + for x in iter + ] + res_dict = dict.fromkeys(refs) + + unfinished = refs + while unfinished: + finished, unfinished = ray.wait(unfinished, num_returns=1) + res_dict[finished[0]] = ray.get(finished[0]) + + if shutdown_ray: + ray.shutdown() + + res = [res_dict[ref] for ref in refs] + return res + + +para_dict["ray"] = _parallelize_ray + + +# list of backends where we skip tests during CI +SKIP_FIXTURES = [ + "ray", # unstable, sporadic crashes in CI, see bug 8149 +] + + +def _get_parallel_test_fixtures(naming="estimator"): + """Return fixtures for parallelization tests. + + Returns a list of parameter fixtures, where each fixture + is a dict with keys "backend" and "backend_params". 
+ + Parameters + ---------- + naming : str, optional + naming convention for the parameters, one of + + "estimator": for use in estimator constructors, + ``backend`` and ``backend_params`` + "config": for use in ``set_config``, + ``backend:parallel`` and ``backend:parallel:params`` + + Returns + ------- + fixtures : list of dict + list of backend parameter fixtures + keys depend on ``naming`` parameter, see above + either ``backend`` and ``backend_params`` (``naming="estimator"``), + or ``backend:parallel`` and ``backend:parallel:params`` (``naming="config"``) + values are backend strings and backend parameter dicts + only backends that are available in the environment are included + """ + from skbase.utils.dependencies import _check_soft_dependencies + + fixtures = [] + + # test no parallelization + fixtures.append({"backend": "None", "backend_params": {}}) + + # test joblib backends + for backend in ["loky", "multiprocessing", "threading"]: + fixtures.append({"backend": backend, "backend_params": {}}) + fixtures.append({"backend": backend, "backend_params": {"n_jobs": 2}}) + fixtures.append({"backend": backend, "backend_params": {"n_jobs": -1}}) + + # test dask backends + if _check_soft_dependencies("dask", severity="none"): + fixtures.append({"backend": "dask", "backend_params": {}}) + fixtures.append({"backend": "dask", "backend_params": {"scheduler": "sync"}}) + + # test ray backend + if _check_soft_dependencies("ray", severity="none"): + import os + + fixtures.append( + { + "backend": "ray", + "backend_params": { + "mute_warnings": True, + "ray_remote_args": {"num_cpus": os.cpu_count() - 1}, + }, + } + ) + + fixtures = [x for x in fixtures if x["backend"] not in SKIP_FIXTURES] + # remove backends in SKIP_FIXTURES from fixtures + + return fixtures diff --git a/src/hyperactive/utils/tests/__init__.py b/src/hyperactive/utils/tests/__init__.py new file mode 100644 index 00000000..874535a0 --- /dev/null +++ b/src/hyperactive/utils/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for utilities.""" diff --git a/src/hyperactive/utils/tests/test_parallel.py b/src/hyperactive/utils/tests/test_parallel.py new file mode 100644 index 00000000..618945d8 --- /dev/null +++ b/src/hyperactive/utils/tests/test_parallel.py @@ -0,0 +1,63 @@ +# copied from sktime, BSD-3-Clause License (see LICENSE file) +# to be moved to scikit-base in the future +"""Tests for parallelization utilities.""" + +import copy +import os + +import pytest +from skbase.utils.dependencies import _check_soft_dependencies + +from hyperactive.utils.parallel import _get_parallel_test_fixtures, parallelize + + +@pytest.mark.skipif( + not _check_soft_dependencies("ray", severity="none"), + reason="Execute tests for iff anything in the module has changed", +) +def test_ray_leaves_params_invariant(): + """Test that the parallelize function leaves backend_params invariant.""" + + def trial_function(params, meta): + return params + + backend = "ray" + backend_params = { + "mute_warnings": True, + "ray_remote_args": {"num_cpus": os.cpu_count() - 1}, + } + # copy for later comparison + backup = backend_params.copy() + + params = [1, 2, 3] + meta = {} + + parallelize(trial_function, params, meta, backend, backend_params) + + assert backup == backend_params + + +def square(x, **kwargs): + """Square function, for testing.""" + return x**2 + + +@pytest.mark.parametrize("fixture", _get_parallel_test_fixtures()) +def test_parallelize_simple_loop(fixture): + """Test that parallelize works with a simple function and fixture.""" + backend = 
fixture["backend"] + backend_params = copy.deepcopy(fixture["backend_params"]) + params_before = copy.deepcopy(fixture["backend_params"]) + + nums = range(8) + expected = [x**2 for x in nums] + + result = parallelize( + square, + nums, + backend=backend, + backend_params=backend_params, + ) + + assert list(result) == expected + assert backend_params == params_before
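A minimal usage sketch of the new ``backend`` and ``backend_params`` arguments on an optimizer, to accompany the diff. The public import path and the ``sklearn_exp`` experiment follow the ``GridSearchSk`` docstring example above and are assumptions, and the grid values are illustrative:

    # Parallelized grid search via the new backend / backend_params arguments.
    # Assumes GridSearchSk is exposed under hyperactive.opt (it is defined in
    # src/hyperactive/opt/gridsearch/_sk.py) and that `sklearn_exp` is an
    # experiment instance as in the GridSearchSk docstring example.
    from hyperactive.opt import GridSearchSk

    param_grid = {"C": [0.01, 0.1, 1, 10], "gamma": [0.001, 0.01, 0.1]}

    grid_search = GridSearchSk(
        param_grid=param_grid,
        backend="loky",                # process-based joblib backend
        backend_params={"n_jobs": 2},  # any joblib.Parallel kwargs except `backend`
        experiment=sklearn_exp,
    )
    # the search then runs exactly as in the sequential docstring example;
    # candidate evaluation is dispatched through utils.parallel.parallelize

``RandomSearchSk`` accepts the same two arguments with identical semantics.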
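A usage sketch of the ``parallelize`` utility itself, following the documented signature in ``src/hyperactive/utils/parallel.py``; the worker function and offset value are illustrative, and the ray variant assumes ``ray`` is installed:

    # Direct use of parallelize: the worker receives one element of `iter`
    # plus the shared `meta` dict, and results preserve the order of `iter`.
    from hyperactive.utils.parallel import parallelize

    def add_offset(x, meta):
        # illustrative worker; `meta` carries shared read-only variables
        return x + meta["offset"]

    results = parallelize(
        fun=add_offset,
        iter=range(5),
        meta={"offset": 10},
        backend="threading",           # joblib threading backend
        backend_params={"n_jobs": 2},
    )
    # results == [10, 11, 12, 13, 14]

    # Same loop on the ray backend, using the keys documented in the docstring.
    results_ray = parallelize(
        fun=add_offset,
        iter=range(5),
        meta={"offset": 10},
        backend="ray",
        backend_params={
            "ray_remote_args": {"num_cpus": 2},  # forwarded to ray.init
            "shutdown_ray": True,                # shut ray down after the loop
            "mute_warnings": True,
        },
    )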
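A sketch of the extension recipe described in the module docstring of ``parallel.py``, registering a hypothetical ``verbose_sequential`` backend; the backend name and executor below are illustrative, not part of the diff:

    from hyperactive.utils.parallel import backend_dict, para_dict

    def _parallelize_verbose_sequential(fun, iter, meta, backend, backend_params):
        """Run sequentially with progress output; backend_params is accepted but unused."""
        results = []
        for i, x in enumerate(iter):
            print(f"evaluating candidate {i}")
            results.append(fun(x, meta=meta))
        return results

    # step 1: map the user-facing backend name to its backend type in backend_dict
    backend_dict["verbose_sequential"] = "verbose_sequential"
    # step 2: register the executor for that backend type in para_dict
    para_dict["verbose_sequential"] = _parallelize_verbose_sequential
    # step 3: document the new backend string in parallelize and in the optimizers
    # that expose backend / backend_params (GridSearchSk, RandomSearchSk)

After registration, calling ``parallelize`` with ``backend="verbose_sequential"`` resolves the executor via ``backend_dict`` and ``para_dict`` exactly as for the built-in backends.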