From a38c3965487da32903870b48a6dfc92eebc85a1f Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 13 Nov 2025 18:47:05 +0000 Subject: [PATCH 1/4] feat(zb-experimental): add async write object stream --- .../asyncio/async_write_object_stream.py | 113 +++++++++++++++++ .../asyncio/test_async_write_object_stream.py | 117 ++++++++++++++++++ 2 files changed, 230 insertions(+) create mode 100644 google/cloud/storage/_experimental/asyncio/async_write_object_stream.py create mode 100644 tests/unit/asyncio/test_async_write_object_stream.py diff --git a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py new file mode 100644 index 000000000..ef19463e1 --- /dev/null +++ b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py @@ -0,0 +1,113 @@ +# experimental poc - chandrasiri +import asyncio +from typing import Optional +from google.cloud import _storage_v2 +from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient +from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( + _AsyncAbstractObjectStream, +) +from google.api_core.bidi_async import AsyncBidiRpc + + +class _AsyncWriteObjectStream(_AsyncAbstractObjectStream): + """Class representing a gRPC bidi-stream for writing data from a GCS + ``Appendable Object``. + + This class provides a unix socket-like interface to a GCS ``Object``, with + methods like ``open``, ``close``, ``send``, and ``recv``. + + :type client: :class:`~google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client` + :param client: async grpc client to use for making API requests. + + :type bucket_name: str + :param bucket_name: The name of the GCS ``bucket`` containing the object. + + :type object_name: str + :param object_name: The name of the GCS ``Appendable Object`` to be write. + + :type generation_number: int + :param generation_number: (Optional) If present, selects a specific revision of + this object. If None, a new object is created. + + :type write_handle: bytes + :param write_handle: (Optional) An existing handle for writing the object. + If provided, opening the bidi-gRPC connection will be faster. + """ + + def __init__( + self, + client: AsyncGrpcClient.grpc_client, + bucket_name: str, + object_name: str, + generation_number: Optional[int] = None, # None means new object + write_handle: Optional[bytes] = None, + ) -> None: + if client is None: + raise ValueError("client must be provided") + if bucket_name is None: + raise ValueError("bucket_name must be provided") + if object_name is None: + raise ValueError("object_name must be provided") + + super().__init__( + bucket_name=bucket_name, + object_name=object_name, + generation_number=generation_number, + ) + self.client: AsyncGrpcClient.grpc_client = client + self.write_handle: Optional[bytes] = write_handle + + self._full_bucket_name = f"projects/_/buckets/{self.bucket_name}" + + self.rpc = self.client._client._transport._wrapped_methods[ + self.client._client._transport.bidi_write_object + ] + + self.metadata = (("x-goog-request-params", f"bucket={self._full_bucket_name}"),) + self.socket_like_rpc: Optional[AsyncBidiRpc] = None + self._is_stream_open: bool = False + self.first_bidi_write_req = None + self.persisted_size = 0 + self.object_resource: Optional[_storage_v2.Object] = None + + async def open(self) -> None: + """Opening an object for write , should do it's state lookup + to know what's the persisted size is. + """ + raise NotImplementedError( + "open() is not implemented yet in _AsyncWriteObjectStream" + ) + + async def close(self) -> None: + """Closes the bidi-gRPC connection.""" + raise NotImplementedError( + "close() is not implemented yet in _AsyncWriteObjectStream" + ) + + async def send( + self, bidi_write_object_request: _storage_v2.BidiWriteObjectRequest + ) -> None: + """Sends a request message on the stream. + + Args: + bidi_write_object_request (:class:`~google.cloud._storage_v2.types.BidiReadObjectRequest`): + The request message to send. This is typically used to specify + the read offset and limit. + """ + raise NotImplementedError( + "send() is not implemented yet in _AsyncWriteObjectStream" + ) + + async def recv(self) -> _storage_v2.BidiWriteObjectResponse: + """Receives a response from the stream. + + This method waits for the next message from the server, which could + contain object data or metadata. + + Returns: + :class:`~google.cloud._storage_v2.types.BidiWriteObjectResponse`: + The response message from the server. + """ + raise NotImplementedError( + "recv() is not implemented yet in _AsyncWriteObjectStream" + ) diff --git a/tests/unit/asyncio/test_async_write_object_stream.py b/tests/unit/asyncio/test_async_write_object_stream.py new file mode 100644 index 000000000..3e7c25d36 --- /dev/null +++ b/tests/unit/asyncio/test_async_write_object_stream.py @@ -0,0 +1,117 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from unittest import mock + +from google.cloud.storage._experimental.asyncio.async_write_object_stream import ( + _AsyncWriteObjectStream, +) +from google.cloud import _storage_v2 + +BUCKET = "my-bucket" +OBJECT = "my-object" + + +@pytest.fixture +def mock_client(): + """Mock the async gRPC client.""" + mock_transport = mock.AsyncMock() + mock_transport.bidi_write_object = mock.sentinel.bidi_write_object + mock_transport._wrapped_methods = { + mock.sentinel.bidi_write_object: mock.sentinel.wrapped_bidi_write_object + } + + mock_gapic_client = mock.AsyncMock() + mock_gapic_client._transport = mock_transport + + client = mock.AsyncMock() + client._client = mock_gapic_client + return client + + +def test_async_write_object_stream_init(mock_client): + """Test the constructor of _AsyncWriteObjectStream.""" + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + + assert stream.client == mock_client + assert stream.bucket_name == BUCKET + assert stream.object_name == OBJECT + assert stream.generation_number is None + assert stream.write_handle is None + assert stream._full_bucket_name == f"projects/_/buckets/{BUCKET}" + assert stream.rpc == mock.sentinel.wrapped_bidi_write_object + assert stream.metadata == ( + ("x-goog-request-params", f"bucket=projects/_/buckets/{BUCKET}"), + ) + assert stream.socket_like_rpc is None + assert not stream._is_stream_open + assert stream.first_bidi_write_req is None + assert stream.persisted_size == 0 + assert stream.object_resource is None + + +def test_async_write_object_stream_init_with_generation_and_handle(mock_client): + """Test the constructor with optional arguments.""" + generation = 12345 + write_handle = b"test-handle" + stream = _AsyncWriteObjectStream( + mock_client, + BUCKET, + OBJECT, + generation_number=generation, + write_handle=write_handle, + ) + + assert stream.generation_number == generation + assert stream.write_handle == write_handle + + +def test_async_write_object_stream_init_raises_value_error(): + """Test that the constructor raises ValueError for missing arguments.""" + with pytest.raises(ValueError, match="client must be provided"): + _AsyncWriteObjectStream(None, BUCKET, OBJECT) + + with pytest.raises(ValueError, match="bucket_name must be provided"): + _AsyncWriteObjectStream(mock.Mock(), None, OBJECT) + + with pytest.raises(ValueError, match="object_name must be provided"): + _AsyncWriteObjectStream(mock.Mock(), BUCKET, None) + + +@pytest.mark.asyncio +async def test_unimplemented_methods_raise_error(mock_client): + """Test that unimplemented methods raise NotImplementedError.""" + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + + with pytest.raises(NotImplementedError): + await stream.open() + + with pytest.raises(NotImplementedError): + await stream.close() + + with pytest.raises(NotImplementedError): + await stream.send(_storage_v2.BidiWriteObjectRequest()) + + with pytest.raises(NotImplementedError): + await stream.recv() + + +def test_add_to_sources_txt(): + """ + This is a dummy test to ensure that the new test file is included in the + package's source file list. This is necessary for some build and packaging + tools to recognize the file. + """ + pass From aaee2f331f051e6577a4240d1bc9662874a7689b Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 13 Nov 2025 19:15:26 +0000 Subject: [PATCH 2/4] remove unused import and add license info --- .../asyncio/async_write_object_stream.py | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py index ef19463e1..07263ddd8 100644 --- a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py @@ -1,5 +1,26 @@ -# experimental poc - chandrasiri -import asyncio +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +NOTE: +This is _experimental module for upcoming support for Rapid Storage. +(https://cloud.google.com/blog/products/storage-data-transfer/high-performance-storage-innovations-for-ai-hpc#:~:text=your%20AI%20workloads%3A-,Rapid%20Storage,-%3A%20A%20new) + +APIs may not work as intended and are not stable yet. Feature is not +GA(Generally Available) yet, please contact your TAM(Technical Account Manager) +if you want to use these Rapid Storage APIs. + +""" from typing import Optional from google.cloud import _storage_v2 from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient From 4db8bf1e0b47f763ad5e95234d7f4735c775dc84 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 13 Nov 2025 19:16:55 +0000 Subject: [PATCH 3/4] remove unwated test --- tests/unit/asyncio/test_async_write_object_stream.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/tests/unit/asyncio/test_async_write_object_stream.py b/tests/unit/asyncio/test_async_write_object_stream.py index 3e7c25d36..9834b79c9 100644 --- a/tests/unit/asyncio/test_async_write_object_stream.py +++ b/tests/unit/asyncio/test_async_write_object_stream.py @@ -106,12 +106,3 @@ async def test_unimplemented_methods_raise_error(mock_client): with pytest.raises(NotImplementedError): await stream.recv() - - -def test_add_to_sources_txt(): - """ - This is a dummy test to ensure that the new test file is included in the - package's source file list. This is necessary for some build and packaging - tools to recognize the file. - """ - pass From 1936d051e2fd60ab2edba4a113a2ee3c629fc75b Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Mon, 17 Nov 2025 07:05:27 +0000 Subject: [PATCH 4/4] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- google/cloud/storage/client.py | 1 + tests/unit/test_client.py | 1 + 2 files changed, 2 insertions(+) diff --git a/google/cloud/storage/client.py b/google/cloud/storage/client.py index f03807ca7..3764c7a53 100644 --- a/google/cloud/storage/client.py +++ b/google/cloud/storage/client.py @@ -73,6 +73,7 @@ def _buckets_page_start(iterator, page, response): ) page.unreachable = unreachable + class Client(ClientWithProject): """Client to bundle configuration needed for API requests. diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 331f40d66..0bda14974 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -3145,6 +3145,7 @@ def test_list_buckets_w_partial_success(self): page_start=_buckets_page_start, ) + class Test__item_to_bucket(unittest.TestCase): def _call_fut(self, iterator, item): from google.cloud.storage.client import _item_to_bucket