diff --git a/google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py b/google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py index b9ac0f88d..d8b8eeb1c 100644 --- a/google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py +++ b/google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py @@ -13,6 +13,9 @@ # limitations under the License. from __future__ import annotations +import google_crc32c +from google.api_core import exceptions +from google_crc32c import Checksum from typing import List, Optional, Tuple @@ -25,6 +28,7 @@ from io import BytesIO from google.cloud import _storage_v2 +from google.cloud.storage.exceptions import DataCorruption _MAX_READ_RANGES_PER_BIDI_READ_REQUEST = 100 @@ -153,6 +157,16 @@ def __init__( :type read_handle: bytes :param read_handle: (Optional) An existing read handle. """ + + # Verify that the fast, C-accelerated version of crc32c is available. + # If not, raise an error to prevent silent performance degradation. + if google_crc32c.implementation != "c": + raise exceptions.NotFound( + "The google-crc32c package is not installed with C support. " + "Bidi reads require the C extension for data integrity checks." + "For more information, see https://github.com/googleapis/python-crc32c." + ) + self.client = client self.bucket_name = bucket_name self.object_name = object_name @@ -248,7 +262,19 @@ async def download_ranges( if object_data_range.read_range is None: raise Exception("Invalid response, read_range is None") - data = object_data_range.checksummed_data.content + checksummed_data = object_data_range.checksummed_data + data = checksummed_data.content + server_checksum = checksummed_data.crc32c + + client_crc32c = Checksum(data).digest() + client_checksum = int.from_bytes(client_crc32c, "big") + + if server_checksum != client_checksum: + raise DataCorruption(response, + f"Checksum mismatch for read_id {object_data_range.read_range.read_id}. " + f"Server sent {server_checksum}, client calculated {client_checksum}." + ) + read_id = object_data_range.read_range.read_id buffer = read_id_to_writable_buffer_dict[read_id] buffer.write(data) diff --git a/tests/unit/asyncio/test_async_multi_range_downloader.py b/tests/unit/asyncio/test_async_multi_range_downloader.py index 27d1ed6dd..529aa4559 100644 --- a/tests/unit/asyncio/test_async_multi_range_downloader.py +++ b/tests/unit/asyncio/test_async_multi_range_downloader.py @@ -16,11 +16,15 @@ from unittest import mock from unittest.mock import AsyncMock from google.cloud import _storage_v2 +from google.api_core import exceptions +from google_crc32c import Checksum from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( AsyncMultiRangeDownloader, ) +from google.cloud.storage._experimental.asyncio import async_read_object_stream from io import BytesIO +from google.cloud.storage.exceptions import DataCorruption _TEST_BUCKET_NAME = "test-bucket" @@ -114,6 +118,10 @@ async def test_download_ranges( self, mock_grpc_client, mock_cls_async_read_object_stream ): # Arrange + data = b"these_are_18_chars" + crc32c = Checksum(data).digest() + crc32c_int = int.from_bytes(crc32c, "big") + mock_mrd = await self._make_mock_mrd( mock_grpc_client, mock_cls_async_read_object_stream ) @@ -123,7 +131,7 @@ async def test_download_ranges( object_data_ranges=[ _storage_v2.ObjectRangeData( checksummed_data=_storage_v2.ChecksummedData( - content=b"these_are_18_chars", crc32c=123 + content=data, crc32c=crc32c_int ), range_end=True, read_range=_storage_v2.ReadRange( @@ -148,7 +156,7 @@ async def test_download_ranges( assert len(results) == 1 assert results[0].bytes_requested == 18 assert results[0].bytes_written == 18 - assert buffer.getvalue() == b"these_are_18_chars" + assert buffer.getvalue() == data @mock.patch( "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client" @@ -251,3 +259,55 @@ async def test_downloading_without_opening_should_throw_error( # Assert assert str(exc.value) == "Underlying bidi-gRPC stream is not open" assert not mrd.is_stream_open + + @mock.patch("google.cloud.storage._experimental.asyncio.async_multi_range_downloader.google_crc32c") + @mock.patch("google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client") + def test_init_raises_if_crc32c_c_extension_is_missing( + self, mock_grpc_client, mock_google_crc32c + ): + mock_google_crc32c.implementation = "python" + + with pytest.raises(exceptions.NotFound) as exc_info: + AsyncMultiRangeDownloader( + mock_grpc_client, "bucket", "object" + ) + + assert "The google-crc32c package is not installed with C support" in str(exc_info.value) + + @pytest.mark.asyncio + @mock.patch("google.cloud.storage._experimental.asyncio.async_multi_range_downloader.Checksum") + @mock.patch("google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client") + async def test_download_ranges_raises_on_checksum_mismatch(self, mock_client, mock_checksum_class): + mock_stream = mock.AsyncMock(spec=async_read_object_stream._AsyncReadObjectStream) + + test_data = b"some-data" + server_checksum = 12345 + mock_checksum_instance = mock_checksum_class.return_value + mock_checksum_instance.digest.return_value = (54321).to_bytes(4, "big") + + mock_response = _storage_v2.BidiReadObjectResponse( + object_data_ranges=[ + _storage_v2.ObjectRangeData( + checksummed_data=_storage_v2.ChecksummedData( + content=test_data, crc32c=server_checksum + ), + read_range=_storage_v2.ReadRange(read_id=0), + range_end=True, + ) + ] + ) + + mock_stream.recv.side_effect = [mock_response, None] + + mrd = AsyncMultiRangeDownloader( + mock_client, "bucket", "object" + ) + mrd.read_obj_str = mock_stream + mrd._is_stream_open = True + + with pytest.raises(DataCorruption) as exc_info: + await mrd.download_ranges([(0, len(test_data), BytesIO())]) + + assert "Checksum mismatch" in str(exc_info.value) + mock_checksum_class.assert_called_once_with(test_data) +