From 7aeff367ed79e927164aee90bb2b3f767807fa40 Mon Sep 17 00:00:00 2001 From: Pulkit Aggarwal Date: Tue, 14 Oct 2025 09:38:30 +0000 Subject: [PATCH 1/2] feat(experimental): add checksum for bidi reads operation --- .../asyncio/async_multi_range_downloader.py | 26 +++++++- .../test_async_multi_range_downloader.py | 63 ++++++++++++++++++- 2 files changed, 86 insertions(+), 3 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py b/google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py index b9ac0f88d..a4fb4d3ed 100644 --- a/google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py +++ b/google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py @@ -13,6 +13,9 @@ # limitations under the License. from __future__ import annotations +import google_crc32c +from google.api_core import exceptions +from google_crc32c import Checksum from typing import List, Optional, Tuple @@ -153,6 +156,15 @@ def __init__( :type read_handle: bytes :param read_handle: (Optional) An existing read handle. """ + + # Verify that the fast, C-accelerated version of crc32c is available. + # If not, raise an error to prevent silent performance degradation. + if google_crc32c.implementation != "c": + raise exceptions.GoogleAPICallError( + "The google-crc32c package is not installed with C support. " + "Bidi reads require the C extension for data integrity checks." 
+ ) + self.client = client self.bucket_name = bucket_name self.object_name = object_name @@ -248,7 +260,19 @@ async def download_ranges( if object_data_range.read_range is None: raise Exception("Invalid response, read_range is None") - data = object_data_range.checksummed_data.content + checksummed_data = object_data_range.checksummed_data + data = checksummed_data.content + server_checksum = checksummed_data.crc32c + + client_crc32c = Checksum(data).digest() + client_checksum = int.from_bytes(client_crc32c, "big") + + if server_checksum != client_checksum: + raise Exception( + f"Checksum mismatch for read_id {object_data_range.read_range.read_id}. " + f"Server sent {server_checksum}, client calculated {client_checksum}." + ) + read_id = object_data_range.read_range.read_id buffer = read_id_to_writable_buffer_dict[read_id] buffer.write(data) diff --git a/tests/unit/asyncio/test_async_multi_range_downloader.py b/tests/unit/asyncio/test_async_multi_range_downloader.py index 27d1ed6dd..d920bc260 100644 --- a/tests/unit/asyncio/test_async_multi_range_downloader.py +++ b/tests/unit/asyncio/test_async_multi_range_downloader.py @@ -16,10 +16,13 @@ from unittest import mock from unittest.mock import AsyncMock from google.cloud import _storage_v2 +from google.api_core import exceptions +from google_crc32c import Checksum from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( AsyncMultiRangeDownloader, ) +from google.cloud.storage._experimental.asyncio import async_read_object_stream from io import BytesIO @@ -114,6 +117,10 @@ async def test_download_ranges( self, mock_grpc_client, mock_cls_async_read_object_stream ): # Arrange + data = b"these_are_18_chars" + crc32c = Checksum(data).digest() + crc32c_int = int.from_bytes(crc32c, "big") + mock_mrd = await self._make_mock_mrd( mock_grpc_client, mock_cls_async_read_object_stream ) @@ -123,7 +130,7 @@ async def test_download_ranges( object_data_ranges=[ _storage_v2.ObjectRangeData( 
checksummed_data=_storage_v2.ChecksummedData( - content=b"these_are_18_chars", crc32c=123 + content=data, crc32c=crc32c_int ), range_end=True, read_range=_storage_v2.ReadRange( @@ -148,7 +155,7 @@ async def test_download_ranges( assert len(results) == 1 assert results[0].bytes_requested == 18 assert results[0].bytes_written == 18 - assert buffer.getvalue() == b"these_are_18_chars" + assert buffer.getvalue() == data @mock.patch( "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client" @@ -251,3 +258,55 @@ async def test_downloading_without_opening_should_throw_error( # Assert assert str(exc.value) == "Underlying bidi-gRPC stream is not open" assert not mrd.is_stream_open + + @mock.patch("google.cloud.storage._experimental.asyncio.async_multi_range_downloader.google_crc32c") + @mock.patch("google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client") + def test_init_raises_if_crc32c_c_extension_is_missing( + self, mock_grpc_client, mock_google_crc32c + ): + mock_google_crc32c.implementation = "python" + + with pytest.raises(exceptions.GoogleAPICallError) as exc_info: + AsyncMultiRangeDownloader( + mock_grpc_client, "bucket", "object" + ) + + assert "The google-crc32c package is not installed with C support" in str(exc_info.value) + + @pytest.mark.asyncio + @mock.patch("google.cloud.storage._experimental.asyncio.async_multi_range_downloader.Checksum") + @mock.patch("google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client") + async def test_download_ranges_raises_on_checksum_mismatch(self, mock_client, mock_checksum_class): + mock_stream = mock.AsyncMock(spec=async_read_object_stream._AsyncReadObjectStream) + + test_data = b"some-data" + server_checksum = 12345 + mock_checksum_instance = mock_checksum_class.return_value + mock_checksum_instance.digest.return_value = (54321).to_bytes(4, "big") + + mock_response = _storage_v2.BidiReadObjectResponse( + object_data_ranges=[ + 
_storage_v2.ObjectRangeData( + checksummed_data=_storage_v2.ChecksummedData( + content=test_data, crc32c=server_checksum + ), + read_range=_storage_v2.ReadRange(read_id=0), + range_end=True, + ) + ] + ) + + mock_stream.recv.side_effect = [mock_response, None] + + mrd = AsyncMultiRangeDownloader( + mock_client, "bucket", "object" + ) + mrd.read_obj_str = mock_stream + mrd._is_stream_open = True + + with pytest.raises(Exception) as exc_info: + await mrd.download_ranges([(0, len(test_data), BytesIO())]) + + assert "Checksum mismatch" in str(exc_info.value) + mock_checksum_class.assert_called_once_with(test_data) + From 016086967985e862996982848f3c3089b7328b46 Mon Sep 17 00:00:00 2001 From: Pulkit Aggarwal Date: Thu, 16 Oct 2025 05:50:37 +0000 Subject: [PATCH 2/2] resolving comments --- .../_experimental/asyncio/async_multi_range_downloader.py | 6 ++++-- tests/unit/asyncio/test_async_multi_range_downloader.py | 5 +++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py b/google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py index a4fb4d3ed..d8b8eeb1c 100644 --- a/google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py +++ b/google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py @@ -28,6 +28,7 @@ from io import BytesIO from google.cloud import _storage_v2 +from google.cloud.storage.exceptions import DataCorruption _MAX_READ_RANGES_PER_BIDI_READ_REQUEST = 100 @@ -160,9 +161,10 @@ def __init__( # Verify that the fast, C-accelerated version of crc32c is available. # If not, raise an error to prevent silent performance degradation. if google_crc32c.implementation != "c": - raise exceptions.GoogleAPICallError( + raise exceptions.NotFound( "The google-crc32c package is not installed with C support. " "Bidi reads require the C extension for data integrity checks." 
+ " For more information, see https://github.com/googleapis/python-crc32c." ) self.client = client @@ -268,7 +270,7 @@ async def download_ranges( client_checksum = int.from_bytes(client_crc32c, "big") if server_checksum != client_checksum: - raise Exception( + raise DataCorruption(response, f"Checksum mismatch for read_id {object_data_range.read_range.read_id}. " f"Server sent {server_checksum}, client calculated {client_checksum}." ) diff --git a/tests/unit/asyncio/test_async_multi_range_downloader.py b/tests/unit/asyncio/test_async_multi_range_downloader.py index d920bc260..529aa4559 100644 --- a/tests/unit/asyncio/test_async_multi_range_downloader.py +++ b/tests/unit/asyncio/test_async_multi_range_downloader.py @@ -24,6 +24,7 @@ ) from google.cloud.storage._experimental.asyncio import async_read_object_stream from io import BytesIO +from google.cloud.storage.exceptions import DataCorruption _TEST_BUCKET_NAME = "test-bucket" @@ -266,7 +267,7 @@ def test_init_raises_if_crc32c_c_extension_is_missing( ): mock_google_crc32c.implementation = "python" - with pytest.raises(exceptions.GoogleAPICallError) as exc_info: + with pytest.raises(exceptions.NotFound) as exc_info: AsyncMultiRangeDownloader( mock_grpc_client, "bucket", "object" ) @@ -304,7 +305,7 @@ async def test_download_ranges_raises_on_checksum_mismatch(self, mock_client, mo mrd.read_obj_str = mock_stream mrd._is_stream_open = True - with pytest.raises(Exception) as exc_info: + with pytest.raises(DataCorruption) as exc_info: await mrd.download_ranges([(0, len(test_data), BytesIO())]) assert "Checksum mismatch" in str(exc_info.value)