Skip to content

Commit

Permalink
FEAT-modin-project#7202: Use custom resources for Ray
Browse files Browse the repository at this point in the history
Signed-off-by: Igoshev, Iaroslav <iaroslav.igoshev@intel.com>
  • Loading branch information
YarShev committed Apr 19, 2024
1 parent 177c912 commit aa839bd
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 22 deletions.
2 changes: 2 additions & 0 deletions modin/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
ProgressBar,
RangePartitioning,
RangePartitioningGroupby,
RayCustomResources,
RayRedisAddress,
RayRedisPassword,
ReadSqlEngine,
Expand Down Expand Up @@ -75,6 +76,7 @@
"IsRayCluster",
"RayRedisAddress",
"RayRedisPassword",
"RayCustomResources",
"LazyExecution",
# Dask specific
"DaskThreadsPerWorker",
Expand Down
12 changes: 12 additions & 0 deletions modin/config/envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,18 @@ class RayRedisPassword(EnvironmentVariable, type=ExactStr):
default = secrets.token_hex(32)


class RayCustomResources(EnvironmentVariable, type=dict):
"""
Ray node's custom resources to request them in tasks or actors.
Visit Ray documentation for more details:
https://docs.ray.io/en/latest/ray-core/scheduling/resources.html#custom-resources
"""

varname = "MODIN_RAY_CUSTOM_RESOURCES"
default = None


class CpuCount(EnvironmentVariable, type=int):
"""How many CPU cores to use during initialization of the Modin engine."""

Expand Down
17 changes: 10 additions & 7 deletions modin/core/execution/ray/common/deferred_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from ray._private.services import get_node_ip_address
from ray.util.client.common import ClientObjectRef

from modin.config import RayCustomResources
from modin.core.execution.ray.common import MaterializationHook, RayWrapper
from modin.logging import get_logger

Expand Down Expand Up @@ -155,9 +156,9 @@ def exec(
and self.flat_kwargs
and self.num_returns == 1
):
result, length, width, ip = remote_exec_func.remote(
self.func, self.data, *self.args, **self.kwargs
)
result, length, width, ip = remote_exec_func.options(
resources=RayCustomResources.get()
).remote(self.func, self.data, *self.args, **self.kwargs)
meta = MetaList([length, width, ip])
self._set_result(result, meta, 0)
return result, meta, 0
Expand Down Expand Up @@ -435,11 +436,13 @@ def _remote_exec_chain(num_returns: int, *args: Tuple) -> List[Any]:
# Prefer _remote_exec_single_chain(). It has fewer arguments and
# does not require the num_returns to be specified in options.
if num_returns == 2:
return _remote_exec_single_chain.remote(*args)
return _remote_exec_single_chain.options(
resources=RayCustomResources.get()
).remote(*args)
else:
return _remote_exec_multi_chain.options(num_returns=num_returns).remote(
num_returns, *args
)
return _remote_exec_multi_chain.options(
num_returns=num_returns, resources=RayCustomResources.get()
).remote(num_returns, *args)

def _set_result(
self,
Expand Down
7 changes: 4 additions & 3 deletions modin/core/execution/ray/common/engine_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import ray
from ray.util.client.common import ClientObjectRef

from modin.config import RayCustomResources
from modin.error_message import ErrorMessage


Expand Down Expand Up @@ -78,9 +79,9 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1):
"""
args = [] if f_args is None else f_args
kwargs = {} if f_kwargs is None else f_kwargs
return _deploy_ray_func.options(num_returns=num_returns).remote(
func, *args, **kwargs
)
return _deploy_ray_func.options(
num_returns=num_returns, resources=RayCustomResources.get()
).remote(func, *args, **kwargs)

@classmethod
def is_future(cls, item):
Expand Down
2 changes: 2 additions & 0 deletions modin/core/execution/ray/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
IsRayCluster,
Memory,
NPartitions,
RayCustomResources,
RayRedisAddress,
RayRedisPassword,
StorageFormat,
Expand Down Expand Up @@ -126,6 +127,7 @@ def initialize_ray(
"object_store_memory": object_store_memory,
"_redis_password": redis_password,
"_memory": object_store_memory,
"resources": RayCustomResources.get(),
**extra_init_kw,
}
# It should be enough to simply set the required variables for the main process
Expand Down
21 changes: 17 additions & 4 deletions modin/core/execution/ray/implementations/pandas_on_ray/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from pandas.io.common import get_handle, stringify_path
from ray.data import from_pandas_refs

from modin.config import RayCustomResources
from modin.core.execution.ray.common import RayWrapper, SignalActor
from modin.core.execution.ray.generic.io import RayIO
from modin.core.io import (
Expand Down Expand Up @@ -188,7 +189,9 @@ def to_csv(cls, qc, **kwargs):
if not cls._to_csv_check_support(kwargs):
return RayIO.to_csv(qc, **kwargs)

signals = SignalActor.remote(len(qc._modin_frame._partitions) + 1)
signals = SignalActor.options(resources=RayCustomResources.get()).remote(
len(qc._modin_frame._partitions) + 1
)

def func(df, **kw): # pragma: no cover
"""
Expand Down Expand Up @@ -225,7 +228,11 @@ def func(df, **kw): # pragma: no cover
csv_kwargs["path_or_buf"].close()

# each process waits for its turn to write to a file
RayWrapper.materialize(signals.wait.remote(partition_idx))
RayWrapper.materialize(
signals.wait.options(resources=RayCustomResources.get()).remote(
partition_idx
)
)

# preparing to write data from the buffer to a file
with get_handle(
Expand All @@ -242,12 +249,18 @@ def func(df, **kw): # pragma: no cover
handles.handle.write(content)

# signal that the next process can start writing to the file
RayWrapper.materialize(signals.send.remote(partition_idx + 1))
RayWrapper.materialize(
signals.send.options(resources=RayCustomResources.get()).remote(
partition_idx + 1
)
)
# used for synchronization purposes
return pandas.DataFrame()

# signaling that the partition with id==0 can be written to the file
RayWrapper.materialize(signals.send.remote(0))
RayWrapper.materialize(
signals.send.options(resources=RayCustomResources.get()).remote(0)
)
# Ensure that the metadata is syncrhonized
qc._modin_frame._propagate_index_objs(axis=None)
result = qc._modin_frame._partition_mgr_cls.map_axis_partitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
if TYPE_CHECKING:
from ray.util.client.common import ClientObjectRef

from modin.config import LazyExecution
from modin.config import LazyExecution, RayCustomResources
from modin.core.dataframe.pandas.partitioning.partition import PandasDataframePartition
from modin.core.execution.ray.common import MaterializationHook, RayWrapper
from modin.core.execution.ray.common.deferred_execution import (
Expand Down Expand Up @@ -270,9 +270,9 @@ def length(self, materialize=True):
if (length := self._length_cache) is None:
self.drain_call_queue()
if (length := self._length_cache) is None:
length, self._width_cache = _get_index_and_columns.remote(
self._data_ref
)
length, self._width_cache = _get_index_and_columns.options(
resources=RayCustomResources.get()
).remote(self._data_ref)
self._length_cache = length
if materialize and isinstance(length, ObjectIDType):
self._length_cache = length = RayWrapper.materialize(length)
Expand All @@ -297,9 +297,9 @@ def width(self, materialize=True):
if (width := self._width_cache) is None:
self.drain_call_queue()
if (width := self._width_cache) is None:
self._length_cache, width = _get_index_and_columns.remote(
self._data_ref
)
self._length_cache, width = _get_index_and_columns.options(
resources=RayCustomResources.get()
).remote(self._data_ref)
self._width_cache = width
if materialize and isinstance(width, ObjectIDType):
self._width_cache = width = RayWrapper.materialize(width)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import ray
from ray.util import get_node_ip_address

from modin.config import RayCustomResources
from modin.core.dataframe.pandas.partitioning.axis_partition import (
PandasDataframeAxisPartition,
)
Expand Down Expand Up @@ -115,6 +116,7 @@ def deploy_splitting_func(
if extract_metadata
else num_splits
),
resources=RayCustomResources.get(),
).remote(
cls._get_deploy_split_func(),
*f_args,
Expand Down Expand Up @@ -180,6 +182,7 @@ def deploy_axis_func(
num_returns=(num_splits if lengths is None else len(lengths))
* (1 + cls._PARTITIONS_METADATA_LEN),
**({"max_retries": max_retries} if max_retries is not None else {}),
resources=RayCustomResources.get(),
).remote(
cls._get_deploy_axis_func(),
*f_args,
Expand Down Expand Up @@ -240,7 +243,8 @@ def deploy_func_between_two_axis_partitions(
A list of ``ray.ObjectRef``-s.
"""
return _deploy_ray_func.options(
num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN)
num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN),
resources=RayCustomResources.get(),
).remote(
PandasDataframeAxisPartition.deploy_func_between_two_axis_partitions,
*f_args,
Expand Down

0 comments on commit aa839bd

Please sign in to comment.