From e2ab53b4e48055ee0b7a4835d33dd90eae4bd7cd Mon Sep 17 00:00:00 2001 From: zhixiangli Date: Mon, 20 Apr 2026 08:21:10 +0000 Subject: [PATCH] perf(storage): implement fast-path for queue delivery in _StreamMultiplexer --- .../storage/asyncio/_stream_multiplexer.py | 53 ++++++++++++------- 1 file changed, 34 insertions(+), 19 deletions(-) diff --git a/packages/google-cloud-storage/google/cloud/storage/asyncio/_stream_multiplexer.py b/packages/google-cloud-storage/google/cloud/storage/asyncio/_stream_multiplexer.py index 3bcac2ab9cfb..6251266b7240 100644 --- a/packages/google-cloud-storage/google/cloud/storage/asyncio/_stream_multiplexer.py +++ b/packages/google-cloud-storage/google/cloud/storage/asyncio/_stream_multiplexer.py @@ -88,6 +88,11 @@ def _get_unique_queues(self) -> Set[asyncio.Queue]: return set(self._queues.values()) async def _put_with_timeout(self, queue: asyncio.Queue, item) -> None: + """Slow-path put: wait up to _DEFAULT_PUT_TIMEOUT_SECONDS, else drop. + + Callers should attempt ``queue.put_nowait(item)`` first and only call + this when it raises :class:`asyncio.QueueFull`. + """ try: await asyncio.wait_for( queue.put(item), timeout=_DEFAULT_PUT_TIMEOUT_SECONDS @@ -100,6 +105,32 @@ async def _put_with_timeout(self, queue: asyncio.Queue, item) -> None: "Queue full for too long. Dropping item to prevent multiplexer hang." ) + async def _put_to_queues(self, queues, item) -> None: + """Deliver ``item`` to each queue. + + Fast path: ``put_nowait`` for queues with capacity (no Task, no + timer handle, no coroutine yield). Slow path: ``_put_with_timeout`` + only for queues that were full, and a single direct ``await`` when + exactly one queue needs the slow path (skips ``asyncio.gather``). + """ + slow_queues = None + for q in queues: + try: + q.put_nowait(item) + except asyncio.QueueFull: + if slow_queues is None: + slow_queues = [q] + else: + slow_queues.append(q) + if slow_queues is None: + return + if len(slow_queues) == 1: + await self._put_with_timeout(slow_queues[0], item) + else: + await asyncio.gather( + *(self._put_with_timeout(q, item) for q in slow_queues) + ) + def _ensure_recv_loop(self) -> None: if self._recv_task is None or self._recv_task.done(): self._recv_task = asyncio.create_task(self._recv_loop()) @@ -124,13 +155,7 @@ async def _recv_loop(self) -> None: while True: response = await self._stream.recv() if response == grpc.aio.EOF: - sentinel = _StreamEnd() - await asyncio.gather( - *( - self._put_with_timeout(queue, sentinel) - for queue in self._get_unique_queues() - ) - ) + await self._put_to_queues(self._get_unique_queues(), _StreamEnd()) return if response.object_data_ranges: @@ -144,19 +169,9 @@ async def _recv_loop(self) -> None: logger.warning( f"Received data for unregistered read_id: {read_id}" ) - await asyncio.gather( - *( - self._put_with_timeout(queue, response) - for queue in queues_to_notify - ) - ) + await self._put_to_queues(queues_to_notify, response) else: - await asyncio.gather( - *( - self._put_with_timeout(queue, response) - for queue in self._get_unique_queues() - ) - ) + await self._put_to_queues(self._get_unique_queues(), response) except asyncio.CancelledError: raise except Exception as e: