diff --git a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py new file mode 100644 index 000000000..bedfbf7ba --- /dev/null +++ b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py @@ -0,0 +1,92 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +NOTE: +This is _experimental module for upcoming support for Rapid Storage. +(https://cloud.google.com/blog/products/storage-data-transfer/high-performance-storage-innovations-for-ai-hpc#:~:text=your%20AI%20workloads%3A-,Rapid%20Storage,-%3A%20A%20new) + +APIs may not work as intended and are not stable yet. Feature is not +GA(Generally Available) yet, please contact your TAM(Technical Account Manager) +if you want to use these APIs. + +""" + +from typing import Any, Optional +from google.cloud import _storage_v2 +from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient +from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( + _AsyncAbstractObjectStream, +) + + +class _AsyncReadObjectStream(_AsyncAbstractObjectStream): + """Class representing a gRPC bidi-stream for reading data from a GCS ``Object``. + + This class provides a unix socket-like interface to a GCS ``Object``, with + methods like ``open``, ``close``, ``send``, and ``recv``. + + :type client: :class:`~google.cloud.storage.asyncio.AsyncGrpcClient.grpc_client` + :param client: async grpc client to use for making API requests. + + :type bucket_name: str + :param bucket_name: The name of the GCS ``bucket`` containing the object. + + :type object_name: str + :param object_name: The name of the GCS ``object`` to be read. + + :type generation_number: int + :param generation_number: (Optional) If present, selects a specific revision of + this object. + + :type read_handle: bytes + :param read_handle: (Optional) An existing handle for reading the object. + If provided, opening the bidi-gRPC connection will be faster. + """ + + def __init__( + self, + client: AsyncGrpcClient.grpc_client, + bucket_name: str, + object_name: str, + generation_number: Optional[int] = None, + read_handle: Optional[bytes] = None, + ) -> None: + if client is None: + raise ValueError("client must be provided") + if bucket_name is None: + raise ValueError("bucket_name must be provided") + if object_name is None: + raise ValueError("object_name must be provided") + + super().__init__( + bucket_name=bucket_name, + object_name=object_name, + generation_number=generation_number, + ) + self.client: AsyncGrpcClient.grpc_client = client + self.read_handle: Optional[bytes] = read_handle + + async def open(self) -> None: + pass + + async def close(self) -> None: + pass + + async def send( + self, bidi_read_object_request: _storage_v2.BidiReadObjectRequest + ) -> None: + pass + + async def recv(self) -> Any: + pass diff --git a/tests/unit/asyncio/test_async_read_object_stream.py b/tests/unit/asyncio/test_async_read_object_stream.py new file mode 100644 index 000000000..89d0571b0 --- /dev/null +++ b/tests/unit/asyncio/test_async_read_object_stream.py @@ -0,0 +1,86 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from unittest import mock + +from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( + _AsyncAbstractObjectStream, +) +from google.cloud.storage._experimental.asyncio.async_read_object_stream import ( + _AsyncReadObjectStream, +) + + +def test_inheritance(): + """Test that _AsyncReadObjectStream inherits from _AsyncAbstractObjectStream.""" + assert issubclass(_AsyncReadObjectStream, _AsyncAbstractObjectStream) + + +def test_init(): + """Test the constructor of _AsyncReadObjectStream.""" + mock_client = mock.Mock(name="client") + bucket_name = "test-bucket" + object_name = "test-object" + generation = 12345 + read_handle = "some-handle" + + # Test with all parameters + stream = _AsyncReadObjectStream( + mock_client, + bucket_name=bucket_name, + object_name=object_name, + generation_number=generation, + read_handle=read_handle, + ) + + assert stream.client is mock_client + assert stream.bucket_name == bucket_name + assert stream.object_name == object_name + assert stream.generation_number == generation + assert stream.read_handle == read_handle + + # Test with default parameters + stream_defaults = _AsyncReadObjectStream( + mock_client, bucket_name=bucket_name, object_name=object_name + ) + assert stream_defaults.client is mock_client + assert stream_defaults.bucket_name is bucket_name + assert stream_defaults.object_name is object_name + assert stream_defaults.generation_number is None + assert stream_defaults.read_handle is None + + +def test_init_with_invalid_parameters(): + """Test the constructor of _AsyncReadObjectStream with invalid params.""" + + with pytest.raises(ValueError): + _AsyncReadObjectStream(None, bucket_name=None, object_name=None) + + +@pytest.mark.asyncio +async def test_async_methods_are_awaitable(): + """Test that the async methods exist and are awaitable.""" + mock_client = mock.Mock(name="client") + stream = _AsyncReadObjectStream(mock_client, "bucket", "object") + + # These methods are currently empty, but we can test they are awaitable + # and don't raise exceptions. + try: + await stream.open() + await stream.close() + await stream.send(mock.Mock()) + await stream.recv() + except Exception as e: + pytest.fail(f"Async methods should be awaitable without errors. Raised: {e}")