Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
# limitations under the License.

from __future__ import annotations
import google_crc32c
from google.api_core import exceptions
from google_crc32c import Checksum

from typing import List, Optional, Tuple

Expand All @@ -25,6 +28,7 @@

from io import BytesIO
from google.cloud import _storage_v2
from google.cloud.storage.exceptions import DataCorruption


_MAX_READ_RANGES_PER_BIDI_READ_REQUEST = 100
Expand Down Expand Up @@ -153,6 +157,16 @@ def __init__(
:type read_handle: bytes
:param read_handle: (Optional) An existing read handle.
"""

# Verify that the fast, C-accelerated version of crc32c is available.
# If not, raise an error to prevent silent performance degradation.
# NOTE: the message is built from adjacent string literals, which Python
# concatenates verbatim, so every piece except the last must end with a
# space to keep the sentences from running together.
if google_crc32c.implementation != "c":
    raise exceptions.NotFound(
        "The google-crc32c package is not installed with C support. "
        "Bidi reads require the C extension for data integrity checks. "
        "For more information, see https://github.com/googleapis/python-crc32c."
    )

self.client = client
self.bucket_name = bucket_name
self.object_name = object_name
Expand Down Expand Up @@ -248,7 +262,19 @@ async def download_ranges(
# Each received range must carry the read_range it answers; its read_id is
# used below to look up the destination buffer.
if object_data_range.read_range is None:
raise Exception("Invalid response, read_range is None")

# NOTE(review): `data` is assigned again two lines below from the same
# field; one of the two assignments is redundant.
data = object_data_range.checksummed_data.content
checksummed_data = object_data_range.checksummed_data
data = checksummed_data.content
# crc32c value supplied in the response alongside the content.
server_checksum = checksummed_data.crc32c

# Recompute the CRC32C over the received bytes. digest() is fed to
# int.from_bytes, read big-endian, so the result can be compared with
# the crc32c value taken from the response.
client_crc32c = Checksum(data).digest()
client_checksum = int.from_bytes(client_crc32c, "big")

# Refuse to write content whose checksum does not match the one received.
if server_checksum != client_checksum:
raise DataCorruption(response,
f"Checksum mismatch for read_id {object_data_range.read_range.read_id}. "
f"Server sent {server_checksum}, client calculated {client_checksum}."
)

# Checksum matched: write the content into the buffer registered for this
# read_id.
read_id = object_data_range.read_range.read_id
buffer = read_id_to_writable_buffer_dict[read_id]
buffer.write(data)
Expand Down
64 changes: 62 additions & 2 deletions tests/unit/asyncio/test_async_multi_range_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
from unittest import mock
from unittest.mock import AsyncMock
from google.cloud import _storage_v2
from google.api_core import exceptions
from google_crc32c import Checksum

from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
AsyncMultiRangeDownloader,
)
from google.cloud.storage._experimental.asyncio import async_read_object_stream
from io import BytesIO
from google.cloud.storage.exceptions import DataCorruption


_TEST_BUCKET_NAME = "test-bucket"
Expand Down Expand Up @@ -114,6 +118,10 @@ async def test_download_ranges(
self, mock_grpc_client, mock_cls_async_read_object_stream
):
# Arrange
data = b"these_are_18_chars"
crc32c = Checksum(data).digest()
crc32c_int = int.from_bytes(crc32c, "big")

mock_mrd = await self._make_mock_mrd(
mock_grpc_client, mock_cls_async_read_object_stream
)
Expand All @@ -123,7 +131,7 @@ async def test_download_ranges(
object_data_ranges=[
_storage_v2.ObjectRangeData(
checksummed_data=_storage_v2.ChecksummedData(
content=b"these_are_18_chars", crc32c=123
content=data, crc32c=crc32c_int
),
range_end=True,
read_range=_storage_v2.ReadRange(
Expand All @@ -148,7 +156,7 @@ async def test_download_ranges(
assert len(results) == 1
assert results[0].bytes_requested == 18
assert results[0].bytes_written == 18
assert buffer.getvalue() == b"these_are_18_chars"
assert buffer.getvalue() == data

@mock.patch(
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
Expand Down Expand Up @@ -251,3 +259,55 @@ async def test_downloading_without_opening_should_throw_error(
# Assert
assert str(exc.value) == "Underlying bidi-gRPC stream is not open"
assert not mrd.is_stream_open

@mock.patch(
    "google.cloud.storage._experimental.asyncio.async_multi_range_downloader.google_crc32c"
)
@mock.patch(
    "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
def test_init_raises_if_crc32c_c_extension_is_missing(
    self, mock_grpc_client, mock_google_crc32c
):
    """The constructor raises NotFound when crc32c is not the C build."""
    # Arrange: make the patched module report a non-"c" implementation.
    mock_google_crc32c.implementation = "python"
    expected_fragment = "The google-crc32c package is not installed with C support"

    # Act
    with pytest.raises(exceptions.NotFound) as raised:
        AsyncMultiRangeDownloader(mock_grpc_client, "bucket", "object")

    # Assert
    assert expected_fragment in str(raised.value)

@pytest.mark.asyncio
@mock.patch(
    "google.cloud.storage._experimental.asyncio.async_multi_range_downloader.Checksum"
)
@mock.patch(
    "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
async def test_download_ranges_raises_on_checksum_mismatch(
    self, mock_client, mock_checksum_class
):
    """download_ranges raises DataCorruption when the two checksums differ."""
    # Arrange: the response claims one crc32c while the patched Checksum
    # class reports a different digest for the same content.
    payload = b"some-data"
    claimed_crc32c = 12345
    computed_crc32c = 54321
    mock_checksum_class.return_value.digest.return_value = computed_crc32c.to_bytes(
        4, "big"
    )

    mismatching_range = _storage_v2.ObjectRangeData(
        checksummed_data=_storage_v2.ChecksummedData(
            content=payload, crc32c=claimed_crc32c
        ),
        read_range=_storage_v2.ReadRange(read_id=0),
        range_end=True,
    )
    response = _storage_v2.BidiReadObjectResponse(
        object_data_ranges=[mismatching_range]
    )

    # The stream mock returns the response on the first recv(), then None.
    stream = mock.AsyncMock(spec=async_read_object_stream._AsyncReadObjectStream)
    stream.recv.side_effect = [response, None]

    # Attach the mocked stream and mark it open directly on the instance.
    mrd = AsyncMultiRangeDownloader(mock_client, "bucket", "object")
    mrd.read_obj_str = stream
    mrd._is_stream_open = True

    # Act
    with pytest.raises(DataCorruption) as raised:
        await mrd.download_ranges([(0, len(payload), BytesIO())])

    # Assert
    assert "Checksum mismatch" in str(raised.value)
    mock_checksum_class.assert_called_once_with(payload)