Skip to content

Commit

Permalink
FEAT-modin-project#5809: New implementation of the Ray lazy execution…
Browse files Browse the repository at this point in the history
… queue

Signed-off-by: Andrey Pavlenko <andrey.a.pavlenko@gmail.com>
  • Loading branch information
AndreyPavlenko committed Dec 6, 2023
1 parent 6843954 commit acb2dc3
Show file tree
Hide file tree
Showing 4 changed files with 331 additions and 394 deletions.
108 changes: 0 additions & 108 deletions modin/core/execution/ray/common/utils.py
Expand Up @@ -286,111 +286,3 @@ def deserialize(obj): # pragma: no cover
return dict(zip(obj.keys(), RayWrapper.materialize(list(obj.values()))))
else:
return obj


def deconstruct_call_queue(call_queue):
"""
Deconstruct the passed call queue into a 1D list.
This is required, so the call queue can be then passed to a Ray's kernel
as a variable-length argument ``kernel(*queue)`` so the Ray engine
automatically materialize all the futures that the queue might have contained.
Parameters
----------
call_queue : list[list[func, args, kwargs], ...]
Returns
-------
num_funcs : int
The number of functions in the call queue.
arg_lengths : list of ints
The number of positional arguments for each function in the call queue.
kw_key_lengths : list of ints
The number of key-word arguments for each function in the call queue.
kw_value_lengths : 2D list of dict{"len": int, "was_iterable": bool}
Description of keyword arguments for each function. For example, `kw_value_lengths[i][j]`
describes the j-th keyword argument of the i-th function in the call queue.
The describtion contains of the lengths of the argument and whether it's a list at all
(for example, {"len": 1, "was_iterable": False} describes a non-list argument).
unfolded_queue : list
A 1D call queue that can be reconstructed using ``reconstruct_call_queue`` function.
"""
num_funcs = len(call_queue)
arg_lengths = []
kw_key_lengths = []
kw_value_lengths = []
unfolded_queue = []
for call in call_queue:
unfolded_queue.append(call[0])
unfolded_queue.extend(call[1])
arg_lengths.append(len(call[1]))
# unfold keyword dict
## unfold keys
unfolded_queue.extend(call[2].keys())
kw_key_lengths.append(len(call[2]))
## unfold values
value_lengths = []
for value in call[2].values():
was_iterable = True
if not isinstance(value, (list, tuple)):
was_iterable = False
value = (value,)
unfolded_queue.extend(value)
value_lengths.append({"len": len(value), "was_iterable": was_iterable})
kw_value_lengths.append(value_lengths)

return num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, *unfolded_queue


def reconstruct_call_queue(
num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, unfolded_queue
):
"""
Reconstruct original call queue from the result of the ``deconstruct_call_queue()``.
Parameters
----------
num_funcs : int
The number of functions in the call queue.
arg_lengths : list of ints
The number of positional arguments for each function in the call queue.
kw_key_lengths : list of ints
The number of key-word arguments for each function in the call queue.
kw_value_lengths : 2D list of dict{"len": int, "was_iterable": bool}
Description of keyword arguments for each function. For example, `kw_value_lengths[i][j]`
describes the j-th keyword argument of the i-th function in the call queue.
The describtion contains of the lengths of the argument and whether it's a list at all
(for example, {"len": 1, "was_iterable": False} describes a non-list argument).
unfolded_queue : list
A 1D call queue that is result of the ``deconstruct_call_queue()`` function.
Returns
-------
list[list[func, args, kwargs], ...]
Original call queue.
"""
items_took = 0

def take_n_items(n):
nonlocal items_took
res = unfolded_queue[items_took : items_took + n]
items_took += n
return res

call_queue = []
for i in range(num_funcs):
func = take_n_items(1)[0]
args = take_n_items(arg_lengths[i])
kw_keys = take_n_items(kw_key_lengths[i])
kwargs = {}
value_lengths = kw_value_lengths[i]
for key, value_length in zip(kw_keys, value_lengths):
vals = take_n_items(value_length["len"])
if value_length["len"] == 1 and not value_length["was_iterable"]:
vals = vals[0]
kwargs[key] = vals

call_queue.append((func, args, kwargs))

return call_queue

0 comments on commit acb2dc3

Please sign in to comment.