From 7aea988722f285e9fc5a67a817f664bd14ef47a5 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 4 Jun 2024 18:02:27 +0200 Subject: [PATCH] Improved errors and reduced logging for P2P RPC calls (#8666) --- distributed/shuffle/_comms.py | 15 +- distributed/shuffle/_core.py | 31 +++-- distributed/shuffle/_exceptions.py | 10 +- distributed/shuffle/_scheduler_plugin.py | 129 +++++++++++------- distributed/shuffle/_worker_plugin.py | 44 +++--- distributed/shuffle/tests/test_comm_buffer.py | 2 + distributed/shuffle/tests/test_shuffle.py | 26 ++-- distributed/shuffle/tests/utils.py | 2 +- 8 files changed, 164 insertions(+), 95 deletions(-) diff --git a/distributed/shuffle/_comms.py b/distributed/shuffle/_comms.py index 50094afddd..8886a1d757 100644 --- a/distributed/shuffle/_comms.py +++ b/distributed/shuffle/_comms.py @@ -5,10 +5,10 @@ from dask.utils import parse_bytes +from distributed.core import ErrorMessage, OKMessage, clean_exception from distributed.metrics import context_meter from distributed.shuffle._disk import ShardsBuffer from distributed.shuffle._limiter import ResourceLimiter -from distributed.utils import log_errors class CommShardsBuffer(ShardsBuffer): @@ -53,7 +53,9 @@ class CommShardsBuffer(ShardsBuffer): def __init__( self, - send: Callable[[str, list[tuple[Any, Any]]], Awaitable[None]], + send: Callable[ + [str, list[tuple[Any, Any]]], Awaitable[OKMessage | ErrorMessage] + ], memory_limiter: ResourceLimiter, concurrency_limit: int = 10, ): @@ -64,9 +66,14 @@ def __init__( ) self.send = send - @log_errors async def _process(self, address: str, shards: list[tuple[Any, Any]]) -> None: """Send one message off to a neighboring worker""" # Consider boosting total_size a bit here to account for duplication with context_meter.meter("send"): - await self.send(address, shards) + response = await self.send(address, shards) + status = response["status"] + if status == "error": + _, exc, tb = clean_exception(**response) + assert exc + raise exc.with_traceback(tb) + assert status == "OK" diff --git a/distributed/shuffle/_core.py b/distributed/shuffle/_core.py index f4f69266cc..43badfbf5e 100644 --- a/distributed/shuffle/_core.py +++ b/distributed/shuffle/_core.py @@ -29,13 +29,14 @@ from dask.typing import Key from dask.utils import parse_timedelta -from distributed.core import PooledRPCCall +from distributed.core import ErrorMessage, OKMessage, PooledRPCCall, error_message from distributed.exceptions import Reschedule from distributed.metrics import context_meter, thread_time from distributed.protocol import to_serialize +from distributed.protocol.serialize import ToPickle from distributed.shuffle._comms import CommShardsBuffer from distributed.shuffle._disk import DiskShardsBuffer -from distributed.shuffle._exceptions import ShuffleClosedError +from distributed.shuffle._exceptions import P2PConsistencyError, ShuffleClosedError from distributed.shuffle._limiter import ResourceLimiter from distributed.shuffle._memory import MemoryShardsBuffer from distributed.utils import run_in_executor_with_context, sync @@ -59,6 +60,10 @@ _T = TypeVar("_T") +class RunSpecMessage(OKMessage): + run_spec: ShuffleRunSpec | ToPickle[ShuffleRunSpec] + + class ShuffleRun(Generic[_T_partition_id, _T_partition_type]): id: ShuffleId run_id: int @@ -199,7 +204,7 @@ async def barrier(self, run_ids: Sequence[int]) -> int: async def _send( self, address: str, shards: list[tuple[_T_partition_id, Any]] | bytes - ) -> None: + ) -> OKMessage | ErrorMessage: self.raise_if_closed() return await 
self.rpc(address).shuffle_receive( data=to_serialize(shards), @@ -209,7 +214,7 @@ async def _send( async def send( self, address: str, shards: list[tuple[_T_partition_id, Any]] - ) -> None: + ) -> OKMessage | ErrorMessage: if _mean_shard_size(shards) < 65536: # Don't send buffers individually over the tcp comms. # Instead, merge everything into an opaque bytes blob, send it all at once, @@ -220,7 +225,7 @@ async def send( else: shards_or_bytes = shards - def _send() -> Coroutine[Any, Any, None]: + def _send() -> Coroutine[Any, Any, OKMessage | ErrorMessage]: return self._send(address, shards_or_bytes) return await retry( @@ -302,11 +307,17 @@ def _read_from_disk(self, id: NDIndex) -> list[Any]: # TODO: Typing self.raise_if_closed() return self._disk_buffer.read("_".join(str(i) for i in id)) - async def receive(self, data: list[tuple[_T_partition_id, Any]] | bytes) -> None: - if isinstance(data, bytes): - # Unpack opaque blob. See send() - data = cast(list[tuple[_T_partition_id, Any]], pickle.loads(data)) - await self._receive(data) + async def receive( + self, data: list[tuple[_T_partition_id, Any]] | bytes + ) -> OKMessage | ErrorMessage: + try: + if isinstance(data, bytes): + # Unpack opaque blob. See send() + data = cast(list[tuple[_T_partition_id, Any]], pickle.loads(data)) + await self._receive(data) + return {"status": "OK"} + except P2PConsistencyError as e: + return error_message(e) async def _ensure_output_worker(self, i: _T_partition_id, key: Key) -> None: assigned_worker = self._get_assigned_worker(i) diff --git a/distributed/shuffle/_exceptions.py b/distributed/shuffle/_exceptions.py index 8031b8f399..27bdb8dd56 100644 --- a/distributed/shuffle/_exceptions.py +++ b/distributed/shuffle/_exceptions.py @@ -1,7 +1,15 @@ from __future__ import annotations -class ShuffleClosedError(RuntimeError): +class P2PIllegalStateError(RuntimeError): + pass + + +class P2PConsistencyError(RuntimeError): + pass + + +class ShuffleClosedError(P2PConsistencyError): pass diff --git a/distributed/shuffle/_scheduler_plugin.py b/distributed/shuffle/_scheduler_plugin.py index ef646bcea0..132f34387c 100644 --- a/distributed/shuffle/_scheduler_plugin.py +++ b/distributed/shuffle/_scheduler_plugin.py @@ -8,11 +8,13 @@ from dask.typing import Key +from distributed.core import ErrorMessage, OKMessage, error_message from distributed.diagnostics.plugin import SchedulerPlugin from distributed.metrics import time from distributed.protocol.pickle import dumps from distributed.protocol.serialize import ToPickle from distributed.shuffle._core import ( + RunSpecMessage, SchedulerShuffleState, ShuffleId, ShuffleRunSpec, @@ -20,6 +22,7 @@ barrier_key, id_from_key, ) +from distributed.shuffle._exceptions import P2PConsistencyError, P2PIllegalStateError from distributed.shuffle._worker_plugin import ShuffleWorkerPlugin from distributed.utils import log_errors @@ -98,77 +101,97 @@ async def barrier(self, id: ShuffleId, run_id: int, consistent: bool) -> None: workers=list(shuffle.participating_workers), ) - def restrict_task(self, id: ShuffleId, run_id: int, key: Key, worker: str) -> dict: - shuffle = self.active_shuffles[id] - if shuffle.run_id > run_id: - return { - "status": "error", - "message": f"Request stale, expected {run_id=} for {shuffle}", - } - elif shuffle.run_id < run_id: - return { - "status": "error", - "message": f"Request invalid, expected {run_id=} for {shuffle}", - } - ts = self.scheduler.tasks[key] - self._set_restriction(ts, worker) - return {"status": "OK"} + def restrict_task( + self, id: ShuffleId, 
run_id: int, key: Key, worker: str + ) -> OKMessage | ErrorMessage: + try: + shuffle = self.active_shuffles[id] + if shuffle.run_id > run_id: + raise P2PConsistencyError( + f"Request stale, expected {run_id=} for {shuffle}" + ) + elif shuffle.run_id < run_id: + raise P2PConsistencyError( + f"Request invalid, expected {run_id=} for {shuffle}" + ) + ts = self.scheduler.tasks[key] + self._set_restriction(ts, worker) + return {"status": "OK"} + except P2PConsistencyError as e: + return error_message(e) def heartbeat(self, ws: WorkerState, data: dict) -> None: for shuffle_id, d in data.items(): if shuffle_id in self.shuffle_ids(): self.heartbeats[shuffle_id][ws.address].update(d) - def get(self, id: ShuffleId, worker: str) -> ToPickle[ShuffleRunSpec]: + def get(self, id: ShuffleId, worker: str) -> RunSpecMessage | ErrorMessage: + try: + try: + run_spec = self._get(id, worker) + return {"status": "OK", "run_spec": ToPickle(run_spec)} + except KeyError as e: + raise P2PConsistencyError( + f"No active shuffle with {id=!r} found" + ) from e + except P2PConsistencyError as e: + return error_message(e) + + def _get(self, id: ShuffleId, worker: str) -> ShuffleRunSpec: if worker not in self.scheduler.workers: # This should never happen - raise RuntimeError( + raise P2PConsistencyError( f"Scheduler is unaware of this worker {worker!r}" ) # pragma: nocover state = self.active_shuffles[id] state.participating_workers.add(worker) - return ToPickle(state.run_spec) + return state.run_spec + + def _create(self, spec: ShuffleSpec, key: Key, worker: str) -> ShuffleRunSpec: + # FIXME: The current implementation relies on the barrier task to be + # known by its name. If the name has been mangled, we cannot guarantee + # that the shuffle works as intended and should fail instead. + self._raise_if_barrier_unknown(spec.id) + self._raise_if_task_not_processing(key) + worker_for = self._calculate_worker_for(spec) + self._ensure_output_tasks_are_non_rootish(spec) + state = spec.create_new_run( + worker_for=worker_for, span_id=self.scheduler.tasks[key].group.span_id + ) + self.active_shuffles[spec.id] = state + self._shuffles[spec.id].add(state) + state.participating_workers.add(worker) + logger.warning( + "Shuffle %s initialized by task %r executed on worker %s", + spec.id, + key, + worker, + ) + return state.run_spec def get_or_create( self, - # FIXME: This should never be ToPickle[ShuffleSpec] - spec: ShuffleSpec | ToPickle[ShuffleSpec], + spec: ShuffleSpec, key: Key, worker: str, - ) -> ToPickle[ShuffleRunSpec]: - # FIXME: Sometimes, this doesn't actually get pickled - if isinstance(spec, ToPickle): - spec = spec.data + ) -> RunSpecMessage | ErrorMessage: try: - return self.get(spec.id, worker) + run_spec = self._get(spec.id, worker) + except P2PConsistencyError as e: + return error_message(e) except KeyError: - # FIXME: The current implementation relies on the barrier task to be - # known by its name. If the name has been mangled, we cannot guarantee - # that the shuffle works as intended and should fail instead. 
- self._raise_if_barrier_unknown(spec.id) - self._raise_if_task_not_processing(key) - worker_for = self._calculate_worker_for(spec) - self._ensure_output_tasks_are_non_rootish(spec) - state = spec.create_new_run( - worker_for=worker_for, span_id=self.scheduler.tasks[key].group.span_id - ) - self.active_shuffles[spec.id] = state - self._shuffles[spec.id].add(state) - state.participating_workers.add(worker) - logger.warning( - "Shuffle %s initialized by task %r executed on worker %s", - spec.id, - key, - worker, - ) - return ToPickle(state.run_spec) + try: + run_spec = self._create(spec, key, worker) + except P2PConsistencyError as e: + return error_message(e) + return {"status": "OK", "run_spec": ToPickle(run_spec)} def _raise_if_barrier_unknown(self, id: ShuffleId) -> None: key = barrier_key(id) try: self.scheduler.tasks[key] except KeyError: - raise RuntimeError( + raise P2PConsistencyError( f"Barrier task with key {key!r} does not exist. This may be caused by " "task fusion during graph generation. Please let us know that you ran " "into this by leaving a comment at distributed#7816." @@ -177,7 +200,9 @@ def _raise_if_barrier_unknown(self, id: ShuffleId) -> None: def _raise_if_task_not_processing(self, key: Key) -> None: task = self.scheduler.tasks[key] if task.state != "processing": - raise RuntimeError(f"Expected {task} to be processing, is {task.state}.") + raise P2PConsistencyError( + f"Expected {task} to be processing, is {task.state}." + ) def _calculate_worker_for(self, spec: ShuffleSpec) -> dict[Any, str]: """Pin the outputs of a P2P shuffle to specific workers. @@ -235,7 +260,7 @@ def _calculate_worker_for(self, spec: ShuffleSpec) -> dict[Any, str]: if existing: # pragma: nocover for shared_key in existing.keys() & current_worker_for.keys(): if existing[shared_key] != current_worker_for[shared_key]: - raise RuntimeError( + raise P2PIllegalStateError( f"Failed to initialize shuffle {spec.id} because " "it cannot align output partition mappings between " f"existing shuffles {seen}. " @@ -316,7 +341,7 @@ def _restart_recommendations(self, id: ShuffleId) -> Recs: if barrier_task.state == "erred": # This should never happen, a dependent of the barrier should already # be `erred` - raise RuntimeError( + raise P2PIllegalStateError( f"Expected dependents of {barrier_task=} to be 'erred' if " "the barrier is." ) # pragma: no cover @@ -326,7 +351,7 @@ def _restart_recommendations(self, id: ShuffleId) -> Recs: if dt.state == "erred": # This should never happen, a dependent of the barrier should already # be `erred` - raise RuntimeError( + raise P2PIllegalStateError( f"Expected barrier and its dependents to be " f"'erred' if the barrier's dependency {dt} is." 
) # pragma: no cover @@ -366,7 +391,9 @@ def remove_worker( shuffle_id, stimulus_id, ) - exception = RuntimeError(f"Worker {worker} left during active {shuffle}") + exception = P2PConsistencyError( + f"Worker {worker} left during active {shuffle}" + ) self._fail_on_workers(shuffle, str(exception)) self._clean_on_scheduler(shuffle_id, stimulus_id) diff --git a/distributed/shuffle/_worker_plugin.py b/distributed/shuffle/_worker_plugin.py index 55414021ff..57d2cfe369 100644 --- a/distributed/shuffle/_worker_plugin.py +++ b/distributed/shuffle/_worker_plugin.py @@ -10,6 +10,7 @@ from dask.typing import Key from dask.utils import parse_bytes +from distributed.core import ErrorMessage, OKMessage, clean_exception, error_message from distributed.diagnostics.plugin import WorkerPlugin from distributed.protocol.serialize import ToPickle from distributed.shuffle._core import ( @@ -19,7 +20,7 @@ ShuffleRunSpec, ShuffleSpec, ) -from distributed.shuffle._exceptions import ShuffleClosedError +from distributed.shuffle._exceptions import P2PConsistencyError, ShuffleClosedError from distributed.shuffle._limiter import ResourceLimiter from distributed.utils import log_errors, sync @@ -67,7 +68,7 @@ def fail(self, shuffle_id: ShuffleId, run_id: int, message: str) -> None: if shuffle_run is None or shuffle_run.run_id != run_id: return self._active_runs.pop(shuffle_id) - exception = RuntimeError(message) + exception = P2PConsistencyError(message) shuffle_run.fail(exception) self._plugin.worker._ongoing_background_tasks.call_soon(self.close, shuffle_run) @@ -110,17 +111,19 @@ async def get_with_run_id(self, shuffle_id: ShuffleId, run_id: int) -> ShuffleRu ------ KeyError If the shuffle does not exist - RuntimeError + P2PConsistencyError If the run_id is stale + ShuffleClosedError + If the run manager has been closed """ shuffle_run = self._active_runs.get(shuffle_id, None) if shuffle_run is None or shuffle_run.run_id < run_id: shuffle_run = await self._refresh(shuffle_id=shuffle_id) if shuffle_run.run_id > run_id: - raise RuntimeError(f"{run_id=} stale, got {shuffle_run}") + raise P2PConsistencyError(f"{run_id=} stale, got {shuffle_run}") elif shuffle_run.run_id < run_id: - raise RuntimeError(f"{run_id=} invalid, got {shuffle_run}") + raise P2PConsistencyError(f"{run_id=} invalid, got {shuffle_run}") if self.closed: raise ShuffleClosedError(f"{self} has already been closed") @@ -172,7 +175,7 @@ async def get_most_recent( ------ KeyError If the shuffle does not exist - RuntimeError + P2PConsistencyError If the most recent run_id is stale """ return await self.get_with_run_id(shuffle_id=shuffle_id, run_id=max(run_ids)) @@ -183,22 +186,25 @@ async def _fetch( spec: ShuffleSpec | None = None, key: Key | None = None, ) -> ShuffleRunSpec: - # FIXME: This should never be ToPickle[ShuffleRunSpec] - result: ShuffleRunSpec | ToPickle[ShuffleRunSpec] if spec is None: - result = await self._plugin.worker.scheduler.shuffle_get( + response = await self._plugin.worker.scheduler.shuffle_get( id=shuffle_id, worker=self._plugin.worker.address, ) else: - result = await self._plugin.worker.scheduler.shuffle_get_or_create( + response = await self._plugin.worker.scheduler.shuffle_get_or_create( spec=ToPickle(spec), key=key, worker=self._plugin.worker.address, ) - if isinstance(result, ToPickle): - result = result.data - return result + + status = response["status"] + if status == "error": + _, exc, tb = clean_exception(**response) + assert exc + raise exc.with_traceback(tb) + assert status == "OK" + return response["run_spec"] 
@overload async def _refresh( @@ -236,7 +242,7 @@ async def _refresh( ) stale_run_id = self._stale_run_ids.get(shuffle_id, None) if stale_run_id is not None and stale_run_id >= result.run_id: - raise RuntimeError( + raise P2PConsistencyError( f"Received stale shuffle run with run_id={result.run_id};" f" expected run_id > {stale_run_id}" ) @@ -306,15 +312,17 @@ async def shuffle_receive( shuffle_id: ShuffleId, run_id: int, data: list[tuple[int, Any]] | bytes, - ) -> None: + ) -> OKMessage | ErrorMessage: """ Handler: Receive an incoming shard of data from a peer worker. Using an unknown ``shuffle_id`` is an error. """ - shuffle_run = await self._get_shuffle_run(shuffle_id, run_id) - await shuffle_run.receive(data) + try: + shuffle_run = await self._get_shuffle_run(shuffle_id, run_id) + return await shuffle_run.receive(data) + except P2PConsistencyError as e: + return error_message(e) - @log_errors async def shuffle_inputs_done(self, shuffle_id: ShuffleId, run_id: int) -> None: """ Handler: Inform the extension that all input partitions have been handed off to extensions. diff --git a/distributed/shuffle/tests/test_comm_buffer.py b/distributed/shuffle/tests/test_comm_buffer.py index 36896c547d..8cf58651af 100644 --- a/distributed/shuffle/tests/test_comm_buffer.py +++ b/distributed/shuffle/tests/test_comm_buffer.py @@ -62,6 +62,7 @@ async def send(address, shards): await block_send.wait() d[address].extend(shards) sending_first.set() + return {"status": "OK"} mc = CommShardsBuffer( send=send, concurrency_limit=1, memory_limiter=ResourceLimiter(None) @@ -131,6 +132,7 @@ async def send(address, shards): if counter == 5: raise OSError("error during send") d[address].extend(shards) + return {"status": "OK"} frac = 0.1 nshards = 10 diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py index 663b559eaf..3b6b75da4e 100644 --- a/distributed/shuffle/tests/test_shuffle.py +++ b/distributed/shuffle/tests/test_shuffle.py @@ -41,7 +41,7 @@ Scheduler, Worker, ) -from distributed.core import ConnectionPool +from distributed.core import ConnectionPool, ErrorMessage, OKMessage from distributed.scheduler import TaskState as SchedulerTaskState from distributed.shuffle._arrow import ( buffers_to_table, @@ -49,6 +49,7 @@ read_from_disk, serialize_table, ) +from distributed.shuffle._exceptions import P2PConsistencyError from distributed.shuffle._limiter import ResourceLimiter from distributed.shuffle._scheduler_plugin import ShuffleSchedulerPlugin from distributed.shuffle._shuffle import ( @@ -1919,7 +1920,7 @@ async def test_error_send(tmp_path, loop_in_thread): partitions_for_worker[w].append(part) class ErrorSend(DataFrameShuffleRun): - async def send(self, *args: Any, **kwargs: Any) -> None: + async def send(self, *args: Any, **kwargs: Any) -> OKMessage | ErrorMessage: raise RuntimeError("Error during send") with DataFrameShuffleTestPool() as local_shuffle_pool: @@ -1951,8 +1952,9 @@ async def send(self, *args: Any, **kwargs: Any) -> None: @pytest.mark.skipif(not pa, reason="Requires PyArrow") +@pytest.mark.parametrize("Error", [P2PConsistencyError, ValueError]) @gen_test() -async def test_error_receive(tmp_path, loop_in_thread): +async def test_error_receive(tmp_path, loop_in_thread, Error): dfs = [] rows_per_df = 10 n_input_partitions = 1 @@ -1976,7 +1978,7 @@ async def test_error_receive(tmp_path, loop_in_thread): class ErrorReceive(DataFrameShuffleRun): async def _receive(self, data: list[tuple[int, bytes]]) -> None: - raise RuntimeError("Error during receive") 
+ raise Error("Error during receive") with DataFrameShuffleTestPool() as local_shuffle_pool: sA = local_shuffle_pool.new_shuffle( @@ -2000,7 +2002,7 @@ async def _receive(self, data: list[tuple[int, bytes]]) -> None: ) try: sB.add_partition(dfs[0], 0) - with pytest.raises(RuntimeError, match="Error during receive"): + with pytest.raises(Error, match="Error during receive"): await sB.barrier(run_ids=[sB.run_id]) finally: await asyncio.gather(*[s.close() for s in [sA, sB]]) @@ -2012,7 +2014,9 @@ def setup(self, worker: Worker) -> None: self.in_shuffle_receive = asyncio.Event() self.block_shuffle_receive = asyncio.Event() - async def shuffle_receive(self, *args: Any, **kwargs: Any) -> None: + async def shuffle_receive( + self, *args: Any, **kwargs: Any + ) -> OKMessage | ErrorMessage: self.in_shuffle_receive.set() await self.block_shuffle_receive.wait() return await super().shuffle_receive(*args, **kwargs) @@ -2144,7 +2148,7 @@ async def test_shuffle_run_consistency(c, s, a): out = out.persist() shuffle_id = await wait_until_new_shuffle_is_initialized(s) - spec = scheduler_ext.get(shuffle_id, a.worker_address).data + spec = scheduler_ext.get(shuffle_id, a.worker_address)["run_spec"].data # Shuffle run manager can fetch the current run assert await run_manager.get_with_run_id(shuffle_id, spec.run_id) @@ -2170,7 +2174,7 @@ async def test_shuffle_run_consistency(c, s, a): new_shuffle_id = await wait_until_new_shuffle_is_initialized(s) assert shuffle_id == new_shuffle_id - new_spec = scheduler_ext.get(shuffle_id, a.worker_address).data + new_spec = scheduler_ext.get(shuffle_id, a.worker_address)["run_spec"].data # Check invariant that the new run ID is larger than the previous assert spec.run_id < new_spec.run_id @@ -2196,7 +2200,9 @@ async def test_shuffle_run_consistency(c, s, a): independent_shuffle_id = await wait_until_new_shuffle_is_initialized(s) assert shuffle_id != independent_shuffle_id - independent_spec = scheduler_ext.get(independent_shuffle_id, a.worker_address).data + independent_spec = scheduler_ext.get(independent_shuffle_id, a.worker_address)[ + "run_spec" + ].data # Check invariant that the new run ID is larger than the previous # for independent shuffles @@ -2236,7 +2242,7 @@ async def test_fail_fetch_race(c, s, a): out = out.persist() shuffle_id = await wait_until_new_shuffle_is_initialized(s) - spec = scheduler_ext.get(shuffle_id, a.worker_address).data + spec = scheduler_ext.get(shuffle_id, a.worker_address)["run_spec"].data await worker_plugin.in_barrier.wait() # Pretend that the fail from the scheduler arrives first run_manager.fail(shuffle_id, spec.run_id, "error") diff --git a/distributed/shuffle/tests/utils.py b/distributed/shuffle/tests/utils.py index 1d4fb7319a..693a044373 100644 --- a/distributed/shuffle/tests/utils.py +++ b/distributed/shuffle/tests/utils.py @@ -32,7 +32,7 @@ async def _(**kwargs): # here. kwargs = _nested_deserialize(kwargs) meth = getattr(self.shuffle, method_name) - return await meth(**kwargs) + return _nested_deserialize(await meth(**kwargs)) return _
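
The pattern this patch applies throughout the P2P RPC handlers is: catch expected consistency errors inside the handler, return them to the caller as an error response instead of letting them propagate (and be logged) on the remote side, and re-raise them locally on the calling side. The sketch below illustrates that round trip in isolation. It is a simplified, self-contained approximation: the helpers `error_payload` and `reraise_from_payload`, and the `shuffle_receive_handler` / `send_and_check` names, are hypothetical stand-ins for `distributed.core.error_message()`, `clean_exception()`, and the real `shuffle_receive` handler and `CommShardsBuffer._process` changed in this patch, not the actual distributed API.

# Simplified sketch of the request/response error handling this patch
# introduces. Helper and handler names are hypothetical stand-ins; only
# P2PConsistencyError mirrors a name from the patch itself.
from __future__ import annotations

import pickle
from typing import Any


class P2PConsistencyError(RuntimeError):
    """Expected error for stale or inconsistent shuffle runs."""


def error_payload(exc: BaseException) -> dict[str, Any]:
    # Serialize the exception so it can travel back over the RPC channel
    # (stand-in for distributed.core.error_message()).
    return {"status": "error", "exception": pickle.dumps(exc)}


def reraise_from_payload(response: dict[str, Any]) -> None:
    # The caller re-raises expected errors locally instead of the handler
    # logging them as unexpected failures on the remote side
    # (stand-in for distributed.core.clean_exception()).
    if response["status"] == "error":
        raise pickle.loads(response["exception"])
    assert response["status"] == "OK"


# --- handler side (analogous to the worker's shuffle_receive handler) -----
def shuffle_receive_handler(run_id: int, current_run_id: int) -> dict[str, Any]:
    try:
        if run_id < current_run_id:
            raise P2PConsistencyError(f"{run_id=} stale, expected {current_run_id}")
        # ... store the received shards ...
        return {"status": "OK"}
    except P2PConsistencyError as e:
        # Expected condition: report it to the caller rather than logging it here.
        return error_payload(e)


# --- caller side (analogous to CommShardsBuffer._process after send()) ----
def send_and_check(run_id: int, current_run_id: int) -> None:
    response = shuffle_receive_handler(run_id, current_run_id)
    reraise_from_payload(response)


if __name__ == "__main__":
    send_and_check(run_id=2, current_run_id=2)  # succeeds
    try:
        send_and_check(run_id=1, current_run_id=2)  # re-raised on the caller side
    except P2PConsistencyError as e:
        print(f"caller saw: {e}")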