Skip to content

Commit

Permalink
FEAT-modin-project#7001: Do not force materialization in MetaList.__getitem__()
Browse files Browse the repository at this point in the history

Signed-off-by: Andrey Pavlenko <andrey.a.pavlenko@gmail.com>
  • Loading branch information
AndreyPavlenko committed Mar 4, 2024
1 parent cd3d0c6 commit 4828c74
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 13 deletions.
3 changes: 2 additions & 1 deletion modin/core/execution/ray/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@

"""Common utilities for Ray execution engine."""

from .engine_wrapper import RayWrapper, SignalActor
from .engine_wrapper import ObjectRefMapper, RayWrapper, SignalActor
from .utils import initialize_ray

__all__ = [
"initialize_ray",
"RayWrapper",
"ObjectRefMapper",
"SignalActor",
]
47 changes: 43 additions & 4 deletions modin/core/execution/ray/common/deferred_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from ray._private.services import get_node_ip_address
from ray.util.client.common import ClientObjectRef

from modin.core.execution.ray.common import RayWrapper
from modin.core.execution.ray.common import ObjectRefMapper, RayWrapper
from modin.logging import get_logger

ObjectRefType = Union[ray.ObjectRef, ClientObjectRef, None]
Expand Down Expand Up @@ -491,9 +491,7 @@ def __getitem__(self, index):
Any
"""
obj = self._obj
if not isinstance(obj, list):
self._obj = obj = RayWrapper.materialize(obj)
return obj[index]
return obj[index] if isinstance(obj, list) else _ListRefMapper(self, index)

def __setitem__(self, index, value):
"""
Expand All @@ -510,6 +508,47 @@ def __setitem__(self, index, value):
obj[index] = value


class _ListRefMapper(ObjectRefMapper):
    """
    Lazy accessor for a single item of a ``MetaList``.

    ``MetaList.__getitem__()`` returns an instance of this class instead of
    materializing the list when the wrapped object is not a ``list`` yet.

    Parameters
    ----------
    meta : MetaList
        The wrapper whose ``_obj`` attribute is read and updated.
    idx : int
        Index of the requested item.
    """

    def __init__(self, meta: MetaList, idx: int):
        self.meta, self.idx = meta, idx

    def get(self):
        """
        Return the item at ``self.idx`` or the not yet materialized object.

        Returns
        -------
        object
            ``meta._obj[idx]`` if ``meta._obj`` is already a ``list``,
            otherwise ``meta._obj`` itself.
        """
        current = self.meta._obj
        if isinstance(current, list):
            return current[self.idx]
        return current

    def map(self, materialized):
        """
        Store the materialized list in ``self.meta`` and pick the item.

        Parameters
        ----------
        materialized : list
            The materialized value to be saved as ``meta._obj``.

        Returns
        -------
        object
            The element of `materialized` at ``self.idx``.
        """
        # Cache the list first so later accesses through the meta see it,
        # even if the indexing below fails.
        meta = self.meta
        meta._obj = materialized
        return materialized[self.idx]


class _Tag(Enum): # noqa: PR01
"""
A set of special values used for the method arguments de/construction.
Expand Down
102 changes: 94 additions & 8 deletions modin/core/execution/ray/common/engine_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
import asyncio
import os
from types import FunctionType
from typing import Sequence

import ray
from ray.util.client.common import ClientObjectRef

from modin.error_message import ErrorMessage

RayObjectRefTypes = (ray.ObjectRef, ClientObjectRef)


@ray.remote
def _deploy_ray_func(func, *args, **kwargs): # pragma: no cover
Expand Down Expand Up @@ -96,8 +99,7 @@ def is_future(cls, item):
boolean
If the value is a future.
"""
ObjectIDType = (ray.ObjectRef, ClientObjectRef)
return isinstance(item, ObjectIDType)
return isinstance(item, RayObjectRefTypes)

@classmethod
def materialize(cls, obj_id):
Expand All @@ -114,7 +116,56 @@ def materialize(cls, obj_id):
object
Whatever was identified by `obj_id`.
"""
return ray.get(obj_id)
if isinstance(obj_id, ObjectRefMapper):
obj = obj_id.get()
return (
obj_id.map(ray.get(obj)) if isinstance(obj, RayObjectRefTypes) else obj
)

if not isinstance(obj_id, Sequence):
return ray.get(obj_id) if isinstance(obj_id, RayObjectRefTypes) else obj_id

if all(type(obj) in RayObjectRefTypes for obj in obj_id):
return ray.get(obj_id)

ids = {}
result = []
for obj in obj_id:
if isinstance(obj, RayObjectRefTypes):
if isinstance(obj, ObjectRefMapper):
oid = obj.get()
if isinstance(oid, RayObjectRefTypes):
mapper = obj
obj = oid
else:
result.append(oid)
continue
else:
mapper = None
else:
result.append(obj)
continue

idx = ids.get(obj, None)
if idx is None:
ids[obj] = idx = len(ids)
if mapper is None:
result.append(obj)
else:
mapper._materialized_idx = idx
result.append(mapper)

if len(ids) == 0:
return result

materialized = ray.get(list(ids.keys()))
for i in range(len(result)):
if isinstance((obj := result[i]), RayObjectRefTypes):
if isinstance(obj, ObjectRefMapper):
result[i] = obj.map(materialized[obj._materialized_idx])
else:
result[i] = materialized[ids[obj]]
return result

@classmethod
def put(cls, data, **kwargs):
Expand Down Expand Up @@ -161,12 +212,47 @@ def wait(cls, obj_ids, num_returns=None):
obj_ids : list, scalar
num_returns : int, optional
"""
if not isinstance(obj_ids, list):
ids = set()
if not isinstance(obj_ids, Sequence):
obj_ids = [obj_ids]
unique_ids = list(set(obj_ids))
if num_returns is None:
num_returns = len(unique_ids)
ray.wait(unique_ids, num_returns=num_returns)

ids = set()
for obj in obj_ids:
if isinstance(obj, ObjectRefMapper):
obj = obj.get()
if isinstance(obj, RayObjectRefTypes):
ids.add(obj)

if num_ids := len(ids):
ray.wait(list(ids), num_returns=num_returns or num_ids)


class ObjectRefMapper(ray.ObjectRef):
    """
    Map the materialized object to a different value.

    This base class only declares the interface: both methods raise
    ``NotImplementedError`` and are expected to be overridden.
    """

    def get(self):
        """
        Get an object reference or the cached, previously mapped value.

        Returns
        -------
        ray.ObjectRef or object

        Raises
        ------
        NotImplementedError
            Always, in this base class.
        """
        raise NotImplementedError

    def map(self, materialized):
        """
        Map the materialized object.

        Parameters
        ----------
        materialized : object
            The value obtained for the reference returned by ``get()``.

        Returns
        -------
        object

        Raises
        ------
        NotImplementedError
            Always, in this base class.
        """
        raise NotImplementedError


@ray.remote
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class PandasOnRayDataframePartitionManager(GenericRayDataframePartitionManager):
_column_partitions_class = PandasOnRayDataframeColumnPartition
_row_partition_class = PandasOnRayDataframeRowPartition
_execution_wrapper = RayWrapper
materialize_futures = RayWrapper.materialize

@classmethod
def wait_partitions(cls, partitions):
Expand Down

0 comments on commit 4828c74

Please sign in to comment.