Adding Ray Backend (#1049)
* Initial commit: adding a working Ray backend, but still not passing every test

* adding runtime

* Fixing delete bug and cleaning tests

* add working_dir

* adding ray to setup.py

* fixing import, config and some code

* adding docs and applying changes from review

* fixing pretest errors

* removing unused line

---------

Co-authored-by: Setepenre <pierre.delaunay.tr@gmail.com>
Simnol22 and Delaunay committed Aug 17, 2023
1 parent c7a38ed commit 083dc38
Showing 5 changed files with 159 additions and 2 deletions.
5 changes: 5 additions & 0 deletions docs/src/code/executor/ray.rst
@@ -0,0 +1,5 @@
Ray Executor
=============

.. automodule:: orion.executor.ray_backend
:members:
7 changes: 7 additions & 0 deletions docs/src/user/parallel.rst
@@ -74,3 +74,10 @@ For more control over Dask, you should prefer using Dask executor backend direct
The executor configuration is used to create the Dask Client. See Dask's documentation
`here <https://distributed.dask.org/en/latest/api.html#distributed.Client>`__ for
more information on possible arguments.

Ray
----

We can also use the Ray executor backend. For more control over Ray, see Ray's
documentation `here <https://docs.ray.io/en/latest/ray-core/package-ref.html#python-api>`__ for
more information on Ray and its Python API.
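
As a rough illustration (not part of the committed docs): keyword arguments other than
``n_workers`` are forwarded to ``ray.init``, so the backend could be instantiated directly as
sketched below, where the ``runtime_env`` value is only an example.

.. code-block:: python

   from orion.executor.ray_backend import Ray

   # Extra keyword arguments are passed straight through to ray.init().
   executor = Ray(n_workers=4, runtime_env={"working_dir": "."})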
2 changes: 2 additions & 0 deletions setup.py
@@ -56,6 +56,7 @@
"sphinx_gallery",
],
"dask": ["dask[complete]"],
"ray": ["ray"],
"track": ["track @ git+https://github.com/Delaunay/track@master#egg=track"],
"profet": ["emukit", "GPy", "torch", "pybnn"],
"configspace": ["ConfigSpace"],
@@ -145,6 +146,7 @@
"joblib = orion.executor.joblib_backend:Joblib",
"poolexecutor = orion.executor.multiprocess_backend:PoolExecutor",
"dask = orion.executor.dask_backend:Dask",
"ray = orion.executor.ray_backend:Ray",
],
},
install_requires=[
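
The "ray" entry point above registers the backend with Orion's executor factory, so it can be
resolved by name. A minimal sketch, assuming the factory's create(name, ...) helper behaves as
it does in the test suite:

from orion.executor.base import executor_factory

# Build the executor registered under the "ray" entry point.
executor = executor_factory.create("ray", n_workers=2)
executor.close()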
113 changes: 113 additions & 0 deletions src/orion/executor/ray_backend.py
@@ -0,0 +1,113 @@
import logging
import traceback

from orion.core.utils.module_import import ImportOptional
from orion.executor.base import (
AsyncException,
AsyncResult,
BaseExecutor,
ExecutorClosed,
Future,
)

with ImportOptional("ray") as import_optional:
import ray

HAS_RAY = not import_optional.failed

logger = logging.getLogger(__name__)


class _Future(Future):
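"""Wrap a Ray ObjectRef so it exposes Orion's Future interface."""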
def __init__(self, future):
self.future = future
self.exception = None

def get(self, timeout=None):
if self.exception:
raise self.exception
try:
return ray.get(self.future, timeout=timeout)
except ray.exceptions.GetTimeoutError as e:
raise TimeoutError() from e

def wait(self, timeout=None):
try:
ray.get(self.future, timeout=timeout)
except ray.exceptions.GetTimeoutError:
pass
except Exception as e:
self.exception = e

def ready(self):
# Use timeout=0 so the readiness check does not block on the task.
obj_ready, _ = ray.wait([self.future], timeout=0)
return len(obj_ready) == 1

def successful(self):
# Mirror multiprocessing's AsyncResult: raise if the result is not ready.
if not self.ready():
raise ValueError("Future is not ready")

# Ray ObjectRef has no successful(); report whether the task raised.
try:
ray.get(self.future)
return True
except Exception:
return False


class Ray(BaseExecutor):
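"""Executor that runs each submitted function as a Ray remote task.

Keyword arguments other than n_workers are forwarded to ray.init.
"""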
def __init__(
self,
n_workers=-1,
**config,
):
super().__init__(n_workers=n_workers)
self.initialized = False
if not HAS_RAY:
raise ImportError("Ray must be installed to use Ray executor.")
self.config = config

if not ray.is_initialized():
ray.init(**self.config)
self.initialized = True
logger.debug("Ray was initialized with config: %s", self.config)

def close(self):
if self.initialized:
self.initialized = False
ray.shutdown()

def __del__(self):
self.close()

def __enter__(self):
return self

def submit(self, function, *args, **kwargs):
if not ray.is_initialized():
raise ExecutorClosed()

remote_g = ray.remote(function)
return _Future(remote_g.remote(*args, **kwargs))

def wait(self, futures):
return [future.get() for future in futures]

def async_get(self, futures, timeout=None):
results = []
tobe_deleted = []
for i, future in enumerate(futures):
if timeout and i == 0:
future.wait(timeout)

if future.ready():
try:
results.append(AsyncResult(future, future.get()))
except Exception as err:
results.append(AsyncException(future, err, traceback.format_exc()))

tobe_deleted.append(future)
for future in tobe_deleted:
futures.remove(future)

return results

def __exit__(self, exc_type, exc_value, traceback):
self.close()
super().__exit__(exc_type, exc_value, traceback)
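
A minimal usage sketch of the executor defined above (not part of the commit); add() is a
stand-in function and the worker count is arbitrary:

from orion.executor.ray_backend import Ray


def add(a, b):
    return a + b


if __name__ == "__main__":
    # Leaving the context manager calls close(), which shuts Ray down.
    with Ray(n_workers=2) as executor:
        futures = [executor.submit(add, i, i + 1) for i in range(4)]
        print(executor.wait(futures))  # [1, 3, 5, 7]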
34 changes: 32 additions & 2 deletions tests/unittests/executor/test_executor.py
@@ -1,10 +1,12 @@
import os
import time

import pytest

from orion.executor.base import AsyncException, ExecutorClosed, executor_factory
from orion.executor.dask_backend import HAS_DASK, Dask
from orion.executor.multiprocess_backend import PoolExecutor
from orion.executor.ray_backend import HAS_RAY, Ray
from orion.executor.single_backend import SingleExecutor


@@ -16,6 +18,11 @@ def thread(n):
return PoolExecutor(n, "threading")


def ray(n):
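# Ship this test directory to the Ray workers via runtime_env so the task
# functions defined in this module can be imported remotely.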
test_working_dir = os.path.dirname(os.path.abspath(__file__))
return Ray(n, runtime_env={"working_dir": test_working_dir})


def skip_dask_if_not_installed(
value, reason="Dask dependency is required for these tests."
):
@@ -39,18 +46,43 @@ def xfail_dask_if_not_installed(
)


def skip_ray_if_not_installed(
value, reason="Ray dependency is required for these tests."
):
return pytest.param(
value,
marks=pytest.mark.skipif(
not HAS_RAY,
reason=reason,
),
)


def xfail_ray_if_not_installed(
value, reason="Ray dependency is required for these tests."
):
return pytest.param(
value,
marks=pytest.mark.xfail(
condition=not HAS_RAY, reason=reason, raises=ImportError
),
)


executors = [
"joblib",
"poolexecutor",
"singleexecutor",
skip_dask_if_not_installed("dask"),
skip_ray_if_not_installed("ray"),
]

backends = [
thread,
multiprocess,
SingleExecutor,
skip_dask_if_not_installed(Dask),
skip_ray_if_not_installed(ray),
]


@@ -191,9 +223,7 @@ def test_execute_async_bad(backend):

def nested_jobs(executor):
with executor:
print("nested_jobs sub")
futures = [executor.submit(function, 1, 2, i) for i in range(10)]
print("nested_jobs wait")
all_results = executor.wait(futures)
return sum(all_results)

