diff --git a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py index 96e9f154a..ddaaf9a54 100644 --- a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py @@ -28,7 +28,8 @@ from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( _AsyncAbstractObjectStream, ) -from google.cloud.storage._experimental.asyncio.bidi_async import AsyncBidiRpc + +from google.api_core.bidi_async import AsyncBidiRpc class _AsyncReadObjectStream(_AsyncAbstractObjectStream): diff --git a/google/cloud/storage/_experimental/asyncio/bidi_async.py b/google/cloud/storage/_experimental/asyncio/bidi_async.py deleted file mode 100644 index 8c5e58fd0..000000000 --- a/google/cloud/storage/_experimental/asyncio/bidi_async.py +++ /dev/null @@ -1,230 +0,0 @@ -# Copyright 2025, Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Asynchronous bi-directional streaming RPC helpers.""" - -import asyncio -import logging - -from google.api_core import exceptions -from google.cloud.storage._experimental.asyncio.bidi_base import BidiRpcBase - -_LOGGER = logging.getLogger(__name__) - - -class _AsyncRequestQueueGenerator: - """_AsyncRequestQueueGenerator is a helper class for sending asynchronous - requests to a gRPC stream from a Queue. - - This generator takes asynchronous requests off a given queue and yields them - to gRPC. - - This helper is useful when you have an indeterminate, indefinite, or - otherwise open-ended set of requests to send through a request-streaming - (or bidirectional) RPC. - - The reason this is necessary - - is because it's let's user have control on the when they would want to - send requests proto messages instead of sending all of them initilally. - - This is achieved via asynchronous queue (asyncio.Queue), - gRPC awaits until there's a message in the queue. - - Finally, it allows for retrying without swapping queues because if it does - pull an item off the queue when the RPC is inactive, it'll immediately put - it back and then exit. This is necessary because yielding the item in this - case will cause gRPC to discard it. In practice, this means that the order - of messages is not guaranteed. If such a thing is necessary it would be - easy to use a priority queue. - - Example:: - - requests = _AsyncRequestQueueGenerator(q) - call = await stub.StreamingRequest(requests) - requests.call = call - - async for response in call: - print(response) - await q.put(...) - - Args: - queue (asyncio.Queue): The request queue. - initial_request (Union[protobuf.Message, - Callable[[], protobuf.Message]]): The initial request to - yield. This is done independently of the request queue to allow for - easily restarting streams that require some initial configuration - request. - """ - - def __init__(self, queue: asyncio.Queue, initial_request=None): - self._queue = queue - self._initial_request = initial_request - self.call = None - - def _is_active(self): - """ - Returns true if the call is not set or not completed. - """ - return self.call is None or not self.call.done() - - async def __aiter__(self): - if self._initial_request is not None: - if callable(self._initial_request): - yield self._initial_request() - else: - yield self._initial_request - - while True: - item = await self._queue.get() - - # The consumer explicitly sent "None", indicating that the request - # should end. - if item is None: - _LOGGER.debug("Cleanly exiting request generator.") - return - - if not self._is_active(): - # We have an item, but the call is closed. We should put the - # item back on the queue so that the next call can consume it. - await self._queue.put(item) - _LOGGER.debug( - "Inactive call, replacing item on queue and exiting " - "request generator." - ) - return - - yield item - - -class AsyncBidiRpc(BidiRpcBase): - """A helper for consuming a async bi-directional streaming RPC. - - This maps gRPC's built-in interface which uses a request iterator and a - response iterator into a socket-like :func:`send` and :func:`recv`. This - is a more useful pattern for long-running or asymmetric streams (streams - where there is not a direct correlation between the requests and - responses). - - Example:: - - initial_request = example_pb2.StreamingRpcRequest( - setting='example') - rpc = AsyncBidiRpc( - stub.StreamingRpc, - initial_request=initial_request, - metadata=[('name', 'value')] - ) - - await rpc.open() - - while rpc.is_active: - print(await rpc.recv()) - await rpc.send(example_pb2.StreamingRpcRequest( - data='example')) - - This does *not* retry the stream on errors. See :class:`AsyncResumableBidiRpc`. - - Args: - start_rpc (grpc.aio.StreamStreamMultiCallable): The gRPC method used to - start the RPC. - initial_request (Union[protobuf.Message, - Callable[[], protobuf.Message]]): The initial request to - yield. This is useful if an initial request is needed to start the - stream. - metadata (Sequence[Tuple(str, str)]): RPC metadata to include in - the request. - """ - - def _create_queue(self): - """Create a queue for requests.""" - return asyncio.Queue() - - async def open(self): - """Opens the stream.""" - if self.is_active: - raise ValueError("Can not open an already open stream.") - - request_generator = _AsyncRequestQueueGenerator( - self._request_queue, initial_request=self._initial_request - ) - try: - call = await self._start_rpc(request_generator, metadata=self._rpc_metadata) - except exceptions.GoogleAPICallError as exc: - # The original `grpc.RpcError` (which is usually also a `grpc.Call`) is - # available from the ``response`` property on the mapped exception. - self._on_call_done(exc.response) - raise - - request_generator.call = call - - # TODO: api_core should expose the future interface for wrapped - # callables as well. - if hasattr(call, "_wrapped"): # pragma: NO COVER - call._wrapped.add_done_callback(self._on_call_done) - else: - call.add_done_callback(self._on_call_done) - - self._request_generator = request_generator - self.call = call - - async def close(self): - """Closes the stream.""" - if self.call is None: - return - - await self._request_queue.put(None) - self.call.cancel() - self._request_generator = None - self._initial_request = None - self._callbacks = [] - # Don't set self.call to None. Keep it around so that send/recv can - # raise the error. - - async def send(self, request): - """Queue a message to be sent on the stream. - - If the underlying RPC has been closed, this will raise. - - Args: - request (protobuf.Message): The request to send. - """ - if self.call is None: - raise ValueError("Can not send() on an RPC that has never been opened.") - - # Don't use self.is_active(), as ResumableBidiRpc will overload it - # to mean something semantically different. - if not self.call.done(): - await self._request_queue.put(request) - else: - # calling read should cause the call to raise. - await self.call.read() - - async def recv(self): - """Wait for a message to be returned from the stream. - - If the underlying RPC has been closed, this will raise. - - Returns: - protobuf.Message: The received message. - """ - if self.call is None: - raise ValueError("Can not recv() on an RPC that has never been opened.") - - return await self.call.read() - - @property - def is_active(self): - """bool: True if this stream is currently open and active.""" - return self.call is not None and not self.call.done() diff --git a/google/cloud/storage/_experimental/asyncio/bidi_base.py b/google/cloud/storage/_experimental/asyncio/bidi_base.py deleted file mode 100644 index 195e35750..000000000 --- a/google/cloud/storage/_experimental/asyncio/bidi_base.py +++ /dev/null @@ -1,80 +0,0 @@ -# Copyright 2025, Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# You may obtain a copy of the License at -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Base class for bi-directional streaming RPC helpers.""" - - -class BidiRpcBase: - """A base class for consuming a bi-directional streaming RPC. - - This maps gRPC's built-in interface which uses a request iterator and a - response iterator into a socket-like :func:`send` and :func:`recv`. This - is a more useful pattern for long-running or asymmetric streams (streams - where there is not a direct correlation between the requests and - responses). - - This does *not* retry the stream on errors. - - Args: - start_rpc (Union[grpc.StreamStreamMultiCallable, - grpc.aio.StreamStreamMultiCallable]): The gRPC method used - to start the RPC. - initial_request (Union[protobuf.Message, - Callable[[], protobuf.Message]]): The initial request to - yield. This is useful if an initial request is needed to start the - stream. - metadata (Sequence[Tuple(str, str)]): RPC metadata to include in - the request. - """ - - def __init__(self, start_rpc, initial_request=None, metadata=None): - self._start_rpc = start_rpc - self._initial_request = initial_request - self._rpc_metadata = metadata - self._request_queue = self._create_queue() - self._request_generator = None - self._callbacks = [] - self.call = None - - def _create_queue(self): - """Create a queue for requests.""" - raise NotImplementedError("`_create_queue` is not implemented.") - - def add_done_callback(self, callback): - """Adds a callback that will be called when the RPC terminates. - - This occurs when the RPC errors or is successfully terminated. - - Args: - callback (Callable[[grpc.Future], None]): The callback to execute. - It will be provided with the same gRPC future as the underlying - stream which will also be a :class:`grpc.aio.Call`. - """ - self._callbacks.append(callback) - - def _on_call_done(self, future): - # This occurs when the RPC errors or is successfully terminated. - # Note that grpc's "future" here can also be a grpc.RpcError. - # See note in https://github.com/grpc/grpc/issues/10885#issuecomment-302651331 - # that `grpc.RpcError` is also `grpc.aio.Call`. - for callback in self._callbacks: - callback(future) - - @property - def is_active(self): - """bool: True if this stream is currently open and active.""" - raise NotImplementedError("`is_active` is not implemented.") - - @property - def pending_requests(self): - """int: Returns an estimate of the number of queued requests.""" - return self._request_queue.qsize() diff --git a/setup.py b/setup.py index 43e3404f6..2c4504749 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ release_status = "Development Status :: 5 - Production/Stable" dependencies = [ "google-auth >= 2.26.1, < 3.0.0", - "google-api-core >= 2.15.0, < 3.0.0", + "google-api-core >= 2.27.0, < 3.0.0", "google-cloud-core >= 2.4.2, < 3.0.0", # The dependency "google-resumable-media" is no longer used. However, the # dependency is still included here to accommodate users who may be diff --git a/testing/constraints-3.9.txt b/testing/constraints-3.9.txt index 251ae699b..ccf6c1493 100644 --- a/testing/constraints-3.9.txt +++ b/testing/constraints-3.9.txt @@ -5,7 +5,7 @@ # e.g., if setup.py has "google-cloud-foo >= 1.14.0, < 2.0.0", # Then this file should have google-cloud-foo==1.14.0 google-auth==2.26.1 -google-api-core==2.15.0 +google-api-core>=2.15.0 google-cloud-core==2.4.2 google-resumable-media==2.7.2 requests==2.22.0 diff --git a/tests/unit/asyncio/test_bidi_async.py b/tests/unit/asyncio/test_bidi_async.py index 313748af8..9625c806e 100644 --- a/tests/unit/asyncio/test_bidi_async.py +++ b/tests/unit/asyncio/test_bidi_async.py @@ -30,7 +30,7 @@ except ImportError: # pragma: NO COVER pytest.skip("No GRPC", allow_module_level=True) -from google.cloud.storage._experimental.asyncio import bidi_async +from google.api_core import bidi_async from google.api_core import exceptions