From a94b3e7d642834e02802630b5e3b997ba15141a6 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Tue, 16 Sep 2025 17:00:04 +0000 Subject: [PATCH 1/3] chore: move async files into async folder --- .../_experimental/{ => asyncio}/async_grpc_client.py | 0 tests/unit/{ => asyncio}/test_async_grpc_client.py | 6 +++--- 2 files changed, 3 insertions(+), 3 deletions(-) rename google/cloud/storage/_experimental/{ => asyncio}/async_grpc_client.py (100%) rename tests/unit/{ => asyncio}/test_async_grpc_client.py (92%) diff --git a/google/cloud/storage/_experimental/async_grpc_client.py b/google/cloud/storage/_experimental/asyncio/async_grpc_client.py similarity index 100% rename from google/cloud/storage/_experimental/async_grpc_client.py rename to google/cloud/storage/_experimental/asyncio/async_grpc_client.py diff --git a/tests/unit/test_async_grpc_client.py b/tests/unit/asyncio/test_async_grpc_client.py similarity index 92% rename from tests/unit/test_async_grpc_client.py rename to tests/unit/asyncio/test_async_grpc_client.py index 322772f8d..250d85522 100644 --- a/tests/unit/test_async_grpc_client.py +++ b/tests/unit/asyncio/test_async_grpc_client.py @@ -26,7 +26,7 @@ def _make_credentials(spec=None): class TestAsyncGrpcClient(unittest.TestCase): @mock.patch("google.cloud._storage_v2.StorageAsyncClient") def test_constructor_default_options(self, mock_async_storage_client): - from google.cloud.storage._experimental import async_grpc_client + from google.cloud.storage._experimental.asyncio import async_grpc_client mock_transport_cls = mock.MagicMock() mock_async_storage_client.get_transport_class.return_value = mock_transport_cls @@ -54,7 +54,7 @@ def test_constructor_default_options(self, mock_async_storage_client): @mock.patch("google.cloud._storage_v2.StorageAsyncClient") def test_constructor_disables_directpath(self, mock_async_storage_client): - from google.cloud.storage._experimental import async_grpc_client + from google.cloud.storage._experimental.asyncio import async_grpc_client mock_transport_cls = mock.MagicMock() mock_async_storage_client.get_transport_class.return_value = mock_transport_cls @@ -74,7 +74,7 @@ def test_constructor_disables_directpath(self, mock_async_storage_client): @mock.patch("google.cloud._storage_v2.StorageAsyncClient") def test_grpc_client_property(self, mock_async_storage_client): - from google.cloud.storage._experimental import async_grpc_client + from google.cloud.storage._experimental.asyncio import async_grpc_client mock_creds = _make_credentials() From 32168a6895bccdaf8b29aa4ee6e51e8e929f51ab Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 17 Sep 2025 04:42:07 +0000 Subject: [PATCH 2/3] Add async bidiRpc files in python-storage these files will be removed once https://github.com/googleapis/python-api-core/pull/836 gets submitted --- .../_experimental/asyncio/bidi_async.py | 230 +++++++++++++ .../_experimental/asyncio/bidi_base.py | 80 +++++ tests/unit/asyncio/test_bidi_async.py | 310 ++++++++++++++++++ 3 files changed, 620 insertions(+) create mode 100644 google/cloud/storage/_experimental/asyncio/bidi_async.py create mode 100644 google/cloud/storage/_experimental/asyncio/bidi_base.py create mode 100644 tests/unit/asyncio/test_bidi_async.py diff --git a/google/cloud/storage/_experimental/asyncio/bidi_async.py b/google/cloud/storage/_experimental/asyncio/bidi_async.py new file mode 100644 index 000000000..f9b80599a --- /dev/null +++ b/google/cloud/storage/_experimental/asyncio/bidi_async.py @@ -0,0 +1,230 @@ +# Copyright 2025, Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Asynchronous bi-directional streaming RPC helpers.""" + +import asyncio +import logging + +from google.api_core import exceptions +from bidi_base import BidiRpcBase + +_LOGGER = logging.getLogger(__name__) + + +class _AsyncRequestQueueGenerator: + """_AsyncRequestQueueGenerator is a helper class for sending asynchronous + requests to a gRPC stream from a Queue. + + This generator takes asynchronous requests off a given queue and yields them + to gRPC. + + This helper is useful when you have an indeterminate, indefinite, or + otherwise open-ended set of requests to send through a request-streaming + (or bidirectional) RPC. + + The reason this is necessary + + is because it's let's user have control on the when they would want to + send requests proto messages instead of sending all of them initilally. + + This is achieved via asynchronous queue (asyncio.Queue), + gRPC awaits until there's a message in the queue. + + Finally, it allows for retrying without swapping queues because if it does + pull an item off the queue when the RPC is inactive, it'll immediately put + it back and then exit. This is necessary because yielding the item in this + case will cause gRPC to discard it. In practice, this means that the order + of messages is not guaranteed. If such a thing is necessary it would be + easy to use a priority queue. + + Example:: + + requests = _AsyncRequestQueueGenerator(q) + call = await stub.StreamingRequest(requests) + requests.call = call + + async for response in call: + print(response) + await q.put(...) + + Args: + queue (asyncio.Queue): The request queue. + initial_request (Union[protobuf.Message, + Callable[[], protobuf.Message]]): The initial request to + yield. This is done independently of the request queue to allow for + easily restarting streams that require some initial configuration + request. + """ + + def __init__(self, queue: asyncio.Queue, initial_request=None): + self._queue = queue + self._initial_request = initial_request + self.call = None + + def _is_active(self): + """ + Returns true if the call is not set or not completed. + """ + return self.call is None or not self.call.done() + + async def __aiter__(self): + if self._initial_request is not None: + if callable(self._initial_request): + yield self._initial_request() + else: + yield self._initial_request + + while True: + item = await self._queue.get() + + # The consumer explicitly sent "None", indicating that the request + # should end. + if item is None: + _LOGGER.debug("Cleanly exiting request generator.") + return + + if not self._is_active(): + # We have an item, but the call is closed. We should put the + # item back on the queue so that the next call can consume it. + await self._queue.put(item) + _LOGGER.debug( + "Inactive call, replacing item on queue and exiting " + "request generator." + ) + return + + yield item + + +class AsyncBidiRpc(BidiRpcBase): + """A helper for consuming a async bi-directional streaming RPC. + + This maps gRPC's built-in interface which uses a request iterator and a + response iterator into a socket-like :func:`send` and :func:`recv`. This + is a more useful pattern for long-running or asymmetric streams (streams + where there is not a direct correlation between the requests and + responses). + + Example:: + + initial_request = example_pb2.StreamingRpcRequest( + setting='example') + rpc = AsyncBidiRpc( + stub.StreamingRpc, + initial_request=initial_request, + metadata=[('name', 'value')] + ) + + await rpc.open() + + while rpc.is_active: + print(await rpc.recv()) + await rpc.send(example_pb2.StreamingRpcRequest( + data='example')) + + This does *not* retry the stream on errors. See :class:`AsyncResumableBidiRpc`. + + Args: + start_rpc (grpc.aio.StreamStreamMultiCallable): The gRPC method used to + start the RPC. + initial_request (Union[protobuf.Message, + Callable[[], protobuf.Message]]): The initial request to + yield. This is useful if an initial request is needed to start the + stream. + metadata (Sequence[Tuple(str, str)]): RPC metadata to include in + the request. + """ + + def _create_queue(self): + """Create a queue for requests.""" + return asyncio.Queue() + + async def open(self): + """Opens the stream.""" + if self.is_active: + raise ValueError("Can not open an already open stream.") + + request_generator = _AsyncRequestQueueGenerator( + self._request_queue, initial_request=self._initial_request + ) + try: + call = await self._start_rpc(request_generator, metadata=self._rpc_metadata) + except exceptions.GoogleAPICallError as exc: + # The original `grpc.RpcError` (which is usually also a `grpc.Call`) is + # available from the ``response`` property on the mapped exception. + self._on_call_done(exc.response) + raise + + request_generator.call = call + + # TODO: api_core should expose the future interface for wrapped + # callables as well. + if hasattr(call, "_wrapped"): # pragma: NO COVER + call._wrapped.add_done_callback(self._on_call_done) + else: + call.add_done_callback(self._on_call_done) + + self._request_generator = request_generator + self.call = call + + async def close(self): + """Closes the stream.""" + if self.call is None: + return + + await self._request_queue.put(None) + self.call.cancel() + self._request_generator = None + self._initial_request = None + self._callbacks = [] + # Don't set self.call to None. Keep it around so that send/recv can + # raise the error. + + async def send(self, request): + """Queue a message to be sent on the stream. + + If the underlying RPC has been closed, this will raise. + + Args: + request (protobuf.Message): The request to send. + """ + if self.call is None: + raise ValueError("Can not send() on an RPC that has never been opened.") + + # Don't use self.is_active(), as ResumableBidiRpc will overload it + # to mean something semantically different. + if not self.call.done(): + await self._request_queue.put(request) + else: + # calling read should cause the call to raise. + await self.call.read() + + async def recv(self): + """Wait for a message to be returned from the stream. + + If the underlying RPC has been closed, this will raise. + + Returns: + protobuf.Message: The received message. + """ + if self.call is None: + raise ValueError("Can not recv() on an RPC that has never been opened.") + + return await self.call.read() + + @property + def is_active(self): + """bool: True if this stream is currently open and active.""" + return self.call is not None and not self.call.done() diff --git a/google/cloud/storage/_experimental/asyncio/bidi_base.py b/google/cloud/storage/_experimental/asyncio/bidi_base.py new file mode 100644 index 000000000..195e35750 --- /dev/null +++ b/google/cloud/storage/_experimental/asyncio/bidi_base.py @@ -0,0 +1,80 @@ +# Copyright 2025, Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may obtain a copy of the License at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Base class for bi-directional streaming RPC helpers.""" + + +class BidiRpcBase: + """A base class for consuming a bi-directional streaming RPC. + + This maps gRPC's built-in interface which uses a request iterator and a + response iterator into a socket-like :func:`send` and :func:`recv`. This + is a more useful pattern for long-running or asymmetric streams (streams + where there is not a direct correlation between the requests and + responses). + + This does *not* retry the stream on errors. + + Args: + start_rpc (Union[grpc.StreamStreamMultiCallable, + grpc.aio.StreamStreamMultiCallable]): The gRPC method used + to start the RPC. + initial_request (Union[protobuf.Message, + Callable[[], protobuf.Message]]): The initial request to + yield. This is useful if an initial request is needed to start the + stream. + metadata (Sequence[Tuple(str, str)]): RPC metadata to include in + the request. + """ + + def __init__(self, start_rpc, initial_request=None, metadata=None): + self._start_rpc = start_rpc + self._initial_request = initial_request + self._rpc_metadata = metadata + self._request_queue = self._create_queue() + self._request_generator = None + self._callbacks = [] + self.call = None + + def _create_queue(self): + """Create a queue for requests.""" + raise NotImplementedError("`_create_queue` is not implemented.") + + def add_done_callback(self, callback): + """Adds a callback that will be called when the RPC terminates. + + This occurs when the RPC errors or is successfully terminated. + + Args: + callback (Callable[[grpc.Future], None]): The callback to execute. + It will be provided with the same gRPC future as the underlying + stream which will also be a :class:`grpc.aio.Call`. + """ + self._callbacks.append(callback) + + def _on_call_done(self, future): + # This occurs when the RPC errors or is successfully terminated. + # Note that grpc's "future" here can also be a grpc.RpcError. + # See note in https://github.com/grpc/grpc/issues/10885#issuecomment-302651331 + # that `grpc.RpcError` is also `grpc.aio.Call`. + for callback in self._callbacks: + callback(future) + + @property + def is_active(self): + """bool: True if this stream is currently open and active.""" + raise NotImplementedError("`is_active` is not implemented.") + + @property + def pending_requests(self): + """int: Returns an estimate of the number of queued requests.""" + return self._request_queue.qsize() diff --git a/tests/unit/asyncio/test_bidi_async.py b/tests/unit/asyncio/test_bidi_async.py new file mode 100644 index 000000000..313748af8 --- /dev/null +++ b/tests/unit/asyncio/test_bidi_async.py @@ -0,0 +1,310 @@ +# Copyright 2025, Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import asyncio + +from unittest import mock + +try: + from unittest.mock import AsyncMock +except ImportError: # pragma: NO COVER + from mock import AsyncMock # type: ignore + + +import pytest + +try: + from grpc import aio +except ImportError: # pragma: NO COVER + pytest.skip("No GRPC", allow_module_level=True) + +from google.cloud.storage._experimental.asyncio import bidi_async +from google.api_core import exceptions + + +if sys.version_info < (3, 10): # type: ignore[operator] + + def aiter(obj): + return obj.__aiter__() + + async def anext(obj): + return await obj.__anext__() + + +@pytest.mark.asyncio +class Test_AsyncRequestQueueGenerator: + async def test_bounded_consume(self): + call = mock.create_autospec(aio.Call, instance=True) + call.done.return_value = False + + q = asyncio.Queue() + await q.put(mock.sentinel.A) + await q.put(mock.sentinel.B) + + generator = bidi_async._AsyncRequestQueueGenerator(q) + generator.call = call + + items = [] + gen_aiter = aiter(generator) + + items.append(await anext(gen_aiter)) + items.append(await anext(gen_aiter)) + + # At this point, the queue is empty. The next call to anext will sleep. + # We make the call inactive. + call.done.return_value = True + + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(anext(gen_aiter), timeout=1) + + assert items == [mock.sentinel.A, mock.sentinel.B] + + async def test_yield_initial_and_exit(self): + q = asyncio.Queue() + call = mock.create_autospec(aio.Call, instance=True) + call.done.return_value = True + + generator = bidi_async._AsyncRequestQueueGenerator( + q, initial_request=mock.sentinel.A + ) + generator.call = call + + assert await anext(aiter(generator)) == mock.sentinel.A + + async def test_yield_initial_callable_and_exit(self): + q = asyncio.Queue() + call = mock.create_autospec(aio.Call, instance=True) + call.done.return_value = True + + generator = bidi_async._AsyncRequestQueueGenerator( + q, initial_request=lambda: mock.sentinel.A + ) + generator.call = call + + assert await anext(aiter(generator)) == mock.sentinel.A + + async def test_exit_when_inactive_with_item(self): + q = asyncio.Queue() + await q.put(mock.sentinel.A) + + call = mock.create_autospec(aio.Call, instance=True) + call.done.return_value = True + + generator = bidi_async._AsyncRequestQueueGenerator(q) + generator.call = call + + with pytest.raises(StopAsyncIteration) as exc_info: + assert await anext(aiter(generator)) + assert ( + exc_info.value.args[0] + == "Inactive call, replacing item on queue and exiting request generator." + ) + + # Make sure it put the item back. + assert not q.empty() + assert await q.get() == mock.sentinel.A + + async def test_exit_when_inactive_empty(self): + q = asyncio.Queue() + call = mock.create_autospec(aio.Call, instance=True) + call.done.return_value = True + + generator = bidi_async._AsyncRequestQueueGenerator(q) + generator.call = call + + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(anext(aiter(generator)), timeout=1) + + async def test_exit_with_stop(self): + q = asyncio.Queue() + await q.put(None) + call = mock.create_autospec(aio.Call, instance=True) + call.done.return_value = False + + generator = bidi_async._AsyncRequestQueueGenerator(q) + generator.call = call + + with pytest.raises(StopAsyncIteration) as exc_info: + assert await anext(aiter(generator)) + assert exc_info.value.args[0] == "Cleanly exiting request generator." + + +def make_async_rpc(): + """Makes a mock async RPC used to test Bidi classes.""" + call = mock.create_autospec(aio.StreamStreamCall, instance=True) + rpc = AsyncMock() + + def rpc_side_effect(request, metadata=None): + call.done.return_value = False + return call + + rpc.side_effect = rpc_side_effect + + def cancel_side_effect(): + call.done.return_value = True + return True + + call.cancel.side_effect = cancel_side_effect + call.read = AsyncMock() + + return rpc, call + + +class AsyncClosedCall: + def __init__(self, exception): + self.exception = exception + + def done(self): + return True + + async def read(self): + raise self.exception + + +class TestAsyncBidiRpc: + def test_initial_state(self): + bidi_rpc = bidi_async.AsyncBidiRpc(None) + assert bidi_rpc.is_active is False + + def test_done_callbacks(self): + bidi_rpc = bidi_async.AsyncBidiRpc(None) + callback = mock.Mock(spec=["__call__"]) + + bidi_rpc.add_done_callback(callback) + bidi_rpc._on_call_done(mock.sentinel.future) + + callback.assert_called_once_with(mock.sentinel.future) + + @pytest.mark.asyncio + @pytest.mark.skipif( + sys.version_info < (3, 8), # type: ignore[operator] + reason="Python 3.8 below doesnt provide support for assert_awaited_once", + ) + async def test_metadata(self): + rpc, call = make_async_rpc() + bidi_rpc = bidi_async.AsyncBidiRpc(rpc, metadata=mock.sentinel.A) + assert bidi_rpc._rpc_metadata == mock.sentinel.A + + await bidi_rpc.open() + assert bidi_rpc.call == call + rpc.assert_awaited_once() + assert rpc.call_args.kwargs["metadata"] == mock.sentinel.A + + @pytest.mark.asyncio + async def test_open(self): + rpc, call = make_async_rpc() + bidi_rpc = bidi_async.AsyncBidiRpc(rpc) + + await bidi_rpc.open() + + assert bidi_rpc.call == call + assert bidi_rpc.is_active + call.add_done_callback.assert_called_once_with(bidi_rpc._on_call_done) + + @pytest.mark.asyncio + async def test_open_error_already_open(self): + rpc, _ = make_async_rpc() + bidi_rpc = bidi_async.AsyncBidiRpc(rpc) + + await bidi_rpc.open() + + with pytest.raises(ValueError): + await bidi_rpc.open() + + @pytest.mark.asyncio + async def test_open_error_call_error(self): + rpc, _ = make_async_rpc() + expected_exception = exceptions.GoogleAPICallError( + "test", response=mock.sentinel.response + ) + rpc.side_effect = expected_exception + bidi_rpc = bidi_async.AsyncBidiRpc(rpc) + callback = mock.Mock(spec=["__call__"]) + bidi_rpc.add_done_callback(callback) + + with pytest.raises(exceptions.GoogleAPICallError) as exc_info: + await bidi_rpc.open() + + assert exc_info.value == expected_exception + callback.assert_called_once_with(mock.sentinel.response) + + @pytest.mark.asyncio + async def test_close(self): + rpc, call = make_async_rpc() + bidi_rpc = bidi_async.AsyncBidiRpc(rpc) + await bidi_rpc.open() + + await bidi_rpc.close() + + call.cancel.assert_called_once() + assert bidi_rpc.call is call + assert bidi_rpc.is_active is False + # ensure the request queue was signaled to stop. + assert bidi_rpc.pending_requests == 1 + assert await bidi_rpc._request_queue.get() is None + # ensure request and callbacks are cleaned up + assert bidi_rpc._initial_request is None + assert not bidi_rpc._callbacks + + @pytest.mark.asyncio + async def test_close_no_rpc(self): + bidi_rpc = bidi_async.AsyncBidiRpc(None) + await bidi_rpc.close() + + @pytest.mark.asyncio + async def test_send(self): + rpc, call = make_async_rpc() + bidi_rpc = bidi_async.AsyncBidiRpc(rpc) + await bidi_rpc.open() + + await bidi_rpc.send(mock.sentinel.request) + + assert bidi_rpc.pending_requests == 1 + assert await bidi_rpc._request_queue.get() is mock.sentinel.request + + @pytest.mark.asyncio + async def test_send_not_open(self): + bidi_rpc = bidi_async.AsyncBidiRpc(None) + + with pytest.raises(ValueError): + await bidi_rpc.send(mock.sentinel.request) + + @pytest.mark.asyncio + async def test_send_dead_rpc(self): + error = ValueError() + bidi_rpc = bidi_async.AsyncBidiRpc(None) + bidi_rpc.call = AsyncClosedCall(error) + + with pytest.raises(ValueError) as exc_info: + await bidi_rpc.send(mock.sentinel.request) + + assert exc_info.value == error + + @pytest.mark.asyncio + async def test_recv(self): + bidi_rpc = bidi_async.AsyncBidiRpc(None) + bidi_rpc.call = mock.create_autospec(aio.Call, instance=True) + bidi_rpc.call.read = AsyncMock(return_value=mock.sentinel.response) + + response = await bidi_rpc.recv() + + assert response == mock.sentinel.response + + @pytest.mark.asyncio + async def test_recv_not_open(self): + bidi_rpc = bidi_async.AsyncBidiRpc(None) + + with pytest.raises(ValueError): + await bidi_rpc.recv() From 8f547c01e69bc8cd3cc8eb5ac4f41211e94f201e Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 17 Sep 2025 07:08:07 +0000 Subject: [PATCH 3/3] fix import path for bidi_base --- google/cloud/storage/_experimental/asyncio/bidi_async.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/storage/_experimental/asyncio/bidi_async.py b/google/cloud/storage/_experimental/asyncio/bidi_async.py index f9b80599a..8c5e58fd0 100644 --- a/google/cloud/storage/_experimental/asyncio/bidi_async.py +++ b/google/cloud/storage/_experimental/asyncio/bidi_async.py @@ -18,7 +18,7 @@ import logging from google.api_core import exceptions -from bidi_base import BidiRpcBase +from google.cloud.storage._experimental.asyncio.bidi_base import BidiRpcBase _LOGGER = logging.getLogger(__name__)