Skip to content

Commit

Permalink
FEAT-modin-project#6838: Prefer lazy execution for binary operations.
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey Pavlenko <andrey.a.pavlenko@gmail.com>
  • Loading branch information
AndreyPavlenko committed Jan 31, 2024
1 parent c130e13 commit 2c11fe9
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 1 deletion.
3 changes: 3 additions & 0 deletions modin/core/dataframe/algebra/binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ def try_compute_new_dtypes(
class Binary(Operator):
"""Builder class for Binary operator."""

_binary_ops = set()

@classmethod
def register(
cls,
Expand Down Expand Up @@ -424,4 +426,5 @@ def caller(
new_modin_frame, shape_hint=shape_hint
)

cls._binary_ops.add(func)
return caller
21 changes: 20 additions & 1 deletion modin/core/execution/ray/common/engine_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import ray
from ray.util.client.common import ClientObjectRef

from modin.core.dataframe.algebra import Binary
from modin.error_message import ErrorMessage


Expand Down Expand Up @@ -53,6 +54,7 @@ class RayWrapper:
"""Mixin that provides means of running functions remotely and getting local results."""

_func_cache = {}
_lazy_op_refs = set()

@classmethod
def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1):
Expand Down Expand Up @@ -138,9 +140,11 @@ def put(cls, data, **kwargs):
if "<locals>" not in qname and "<lambda>" not in qname:
ref = cls._func_cache.get(data, None)
if ref is None:
ref = ray.put(data)
if len(cls._func_cache) < 1024:
ref = ray.put(data)
cls._func_cache[data] = ref
if data in Binary._binary_ops:
cls._lazy_op_refs.add(ref)
else:
msg = "To many functions in the RayWrapper cache!"
assert "MODIN_GITHUB_CI" not in os.environ, msg
Expand Down Expand Up @@ -168,6 +172,21 @@ def wait(cls, obj_ids, num_returns=None):
num_returns = len(unique_ids)
ray.wait(unique_ids, num_returns=num_returns)

@classmethod
def prefer_lazy(cls, op_ref):
"""
Return True if lazy execution is preferable for the specified operation.
Parameters
----------
op_ref : ray.ObjectID
Returns
-------
bool
"""
return op_ref in cls._lazy_op_refs


@ray.remote
class SignalActor: # pragma: no cover
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ def apply(self, func, *args, **kwargs):
It does not matter if `func` is callable or an ``ray.ObjectRef``. Ray will
handle it correctly either way. The keyword arguments are sent as a dictionary.
"""
if self.execution_wrapper.prefer_lazy(func):
return self.add_to_apply_calls(func, *args, **kwargs)

log = get_logger()
self._is_debug(log) and log.debug(f"ENTER::Partition.apply::{self._identity}")
data = self._data
Expand Down

0 comments on commit 2c11fe9

Please sign in to comment.