Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
87 commits
Select commit Hold shift + click to select a range
21e66a3
add AsyncAbstractObjectStream
chandra-siri Sep 17, 2025
39503f4
keep _AsyncAbstractObjectStream private
chandra-siri Sep 17, 2025
a161fd0
Add _AsyncReadObjectStream and it's stubs
chandra-siri Sep 17, 2025
dd862a2
complete __init__ for read_obj_str
chandra-siri Sep 17, 2025
aaabfd7
remove unuseful comments
chandra-siri Sep 17, 2025
23eea96
add methods open close send recv
chandra-siri Sep 17, 2025
71a7a79
change read_handle type from 'str' to 'bytes'
chandra-siri Sep 18, 2025
827aec0
feat: add async_multi_range_downloader
chandra-siri Sep 18, 2025
5be7469
fix: read_ranges should have buffer as well
chandra-siri Sep 18, 2025
b3ad551
rename MultiRangeDownloader to AsyncMultiRangeDownloader
chandra-siri Sep 18, 2025
a87f2be
feat: implement download_ranges method
chandra-siri Sep 18, 2025
c2e3c7b
add BytesIO in doc string
chandra-siri Sep 18, 2025
3c0fd66
Merge branch 'main' of github.com:googleapis/python-storage into bidi…
chandra-siri Sep 18, 2025
0810afc
fix doc strings, add licence and type hints
chandra-siri Sep 18, 2025
d43d889
Merge branch 'bidi_reads_1_abs_obj_stream' of github.com:googleapis/p…
chandra-siri Sep 18, 2025
a14bc68
pass abstract methods
chandra-siri Sep 18, 2025
fbacbb4
Merge branch 'bidi_reads_1_abs_obj_stream' of github.com:googleapis/p…
chandra-siri Sep 18, 2025
fd37489
Merge branch 'bidi_reads_2_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 18, 2025
635ad07
add handle param
chandra-siri Sep 18, 2025
ba453d4
include handle in tests
chandra-siri Sep 18, 2025
800c6df
remove unit tests for abstract class
chandra-siri Sep 18, 2025
8b40812
Merge branch 'bidi_reads_1_abs_obj_stream' of github.com:googleapis/p…
chandra-siri Sep 18, 2025
18529ad
edit doc string for _AsyncReadObjectStream
chandra-siri Sep 18, 2025
df2532e
Merge branch 'bidi_reads_2_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 18, 2025
b4da1ac
refactor unit tests for async_read_object_stream
chandra-siri Sep 18, 2025
6dec6c6
bucket_name and object_name cannot be NONE
chandra-siri Sep 18, 2025
90a65f6
Merge branch 'bidi_reads_1_abs_obj_stream' of github.com:googleapis/p…
chandra-siri Sep 18, 2025
a154905
bucket_name and object_name cannot be None
chandra-siri Sep 18, 2025
d69cd63
Merge branch 'bidi_reads_2_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 18, 2025
20afca4
Merge branch 'bidi_reads_3_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 19, 2025
14a2aba
simplyfy tests for open
chandra-siri Sep 19, 2025
078afca
simply tests for send recv and close
chandra-siri Sep 19, 2025
2054989
minor edit - add bidi-stream in doc string
chandra-siri Sep 19, 2025
2b9ae2e
Merge branch 'bidi_reads_1_abs_obj_stream' of github.com:googleapis/p…
chandra-siri Sep 19, 2025
c06896c
Merge branch 'bidi_reads_2_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 19, 2025
916e7f1
Merge branch 'bidi_reads_3_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 19, 2025
3658502
Merge branch 'bidi_reads_4_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 19, 2025
8366f0b
simplify unit tests
chandra-siri Sep 19, 2025
4a774ec
Merge branch 'bidi_reads_5_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 19, 2025
0991383
Merge branch 'main' of github.com:googleapis/python-storage into bidi…
chandra-siri Sep 19, 2025
f589b89
Merge branch 'bidi_reads_2_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 19, 2025
06b102c
improve doc string
chandra-siri Sep 19, 2025
52494b4
fix unit tess in MRD
chandra-siri Sep 19, 2025
8e00701
Merge branch 'bidi_reads_5_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 19, 2025
0e60694
add checks for invalid inputs
chandra-siri Sep 19, 2025
961def8
Merge branch 'bidi_reads_2_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 19, 2025
e9d0f9e
Merge branch 'main' of github.com:googleapis/python-storage into bidi…
chandra-siri Sep 22, 2025
c4f61ea
Merge branch 'bidi_reads_3_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 22, 2025
ef5d917
Merge branch 'bidi_reads_4_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 22, 2025
7fbf28c
Merge branch 'bidi_reads_5_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 22, 2025
dcb6a55
remove duplicated import
chandra-siri Sep 23, 2025
90d8597
remove unused import
chandra-siri Sep 23, 2025
229887b
Merge branch 'bidi_reads_3_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 23, 2025
5a6ffc7
Merge branch 'bidi_reads_4_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 23, 2025
9f15551
Merge branch 'bidi_reads_5_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 23, 2025
234336f
Merge branch 'main' of github.com:googleapis/python-storage into bidi…
chandra-siri Sep 23, 2025
521154c
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 23, 2025
8338ab2
fix unit test
chandra-siri Sep 23, 2025
c3cb076
Merge branch 'bidi_reads_5_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 23, 2025
ac8e651
remove unused import
chandra-siri Sep 23, 2025
c6b1098
Merge branch 'bidi_reads_5_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 23, 2025
6469063
implement basic functionality for download_ranges
chandra-siri Sep 24, 2025
17404af
Merge branch 'main' of github.com:googleapis/python-storage into bidi…
chandra-siri Sep 24, 2025
130046f
doc string for further testcase
chandra-siri Sep 24, 2025
0c61e87
remove unwanted comments
chandra-siri Sep 24, 2025
027758c
remove testing code
chandra-siri Sep 24, 2025
570983f
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 24, 2025
99ffe2d
fix doc strings
chandra-siri Sep 24, 2025
e571dd1
Merge branch 'bidi_reads_6_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 24, 2025
962b7b9
instantiate `socket_like_rpc` while opening
chandra-siri Sep 24, 2025
fdaf1ae
update doc string to describe `read_ranges` format
chandra-siri Sep 25, 2025
cf6e004
add is_stream_open and check this before "open" "close" "send" "recv"
chandra-siri Sep 25, 2025
153007d
Merge branch 'bidi_reads_6_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 25, 2025
f1874ed
assert stream is indeed closed/open on close/open
chandra-siri Sep 25, 2025
ce7a73b
feat(zb-experimental): Implement `close` functionality for MRD
chandra-siri Sep 25, 2025
64de09f
fix linting issues for assert statements
chandra-siri Sep 25, 2025
573e548
Merge branch 'bidi_reads_7' of github.com:googleapis/python-storage i…
chandra-siri Sep 25, 2025
23c279c
fix liniting issues for assert statement
chandra-siri Sep 25, 2025
112cb68
add test case for ranges > 1000
chandra-siri Sep 26, 2025
b56efd9
don't return exception object , raise instead
chandra-siri Sep 29, 2025
890eac1
don't return exception object , raise instead
chandra-siri Sep 29, 2025
84b63f1
correct doc string rtype
chandra-siri Sep 29, 2025
f93b25e
Merge branch 'bidi_reads_6_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 29, 2025
8432f17
Merge branch 'bidi_reads_7' of github.com:googleapis/python-storage i…
chandra-siri Sep 29, 2025
1aa5693
remove unnecessary return statement
chandra-siri Sep 30, 2025
a39dd99
Merge branch 'main' of github.com:googleapis/python-storage into bidi…
chandra-siri Sep 30, 2025
f05fb2f
fix linting issues - duplicate imports and defs
chandra-siri Sep 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,23 @@ class AsyncMultiRangeDownloader:
client, bucket_name="chandrasiri-rs", object_name="test_open9"
)
my_buff1 = open('my_fav_file.txt', 'wb')
my_buff1 = open('my_fav_file.txt', 'wb')
my_buff2 = BytesIO()
my_buff3 = BytesIO()
my_buff4 = any_object_which_provides_BytesIO_like_interface()
results_arr, error_obj = await mrd.download_ranges(
my_buff4 = any_object_which_provides_BytesIO_like_interface()
results_arr, error_obj = await mrd.download_ranges(
[
# (start_byte, bytes_to_read, writeable_buffer)
# (start_byte, bytes_to_read, writeable_buffer)
(0, 100, my_buff1),
(100, 20, my_buff2),
(200, 123, my_buff3),
(300, 789, my_buff4),
(100, 20, my_buff2),
(200, 123, my_buff3),
(300, 789, my_buff4),
]
)
if error_obj:
Expand All @@ -94,6 +101,17 @@ class AsyncMultiRangeDownloader:
for result in results_arr:
print("downloaded bytes", result)

if error_obj:
print("Error occurred: ")
print(error_obj)
print(
"please issue call to `download_ranges` with updated"
"`read_ranges` based on diff of (bytes_requested - bytes_written)"
)

for result in results_arr:
print("downloaded bytes", result)


"""

Expand Down Expand Up @@ -165,7 +183,8 @@ def __init__(
self.object_name = object_name
self.generation_number = generation_number
self.read_handle = read_handle
self.read_obj_str: _AsyncReadObjectStream = None
self.read_obj_str: Optional[_AsyncReadObjectStream] = None
self._is_stream_open: bool = False

async def open(self) -> None:
"""Opens the bidi-gRPC connection to read from the object.
Expand All @@ -176,14 +195,19 @@ async def open(self) -> None:
"Opening" constitutes fetching object metadata such as generation number
and read handle and sets them as attributes if not already set.
"""
self.read_obj_str = _AsyncReadObjectStream(
client=self.client,
bucket_name=self.bucket_name,
object_name=self.object_name,
generation_number=self.generation_number,
read_handle=self.read_handle,
)
if self._is_stream_open:
raise ValueError("Underlying bidi-gRPC stream is already open")

if self.read_obj_str is None:
self.read_obj_str = _AsyncReadObjectStream(
client=self.client,
bucket_name=self.bucket_name,
object_name=self.object_name,
generation_number=self.generation_number,
read_handle=self.read_handle,
)
await self.read_obj_str.open()
self._is_stream_open = True
if self.generation_number is None:
self.generation_number = self.read_obj_str.generation_number
self.read_handle = self.read_obj_str.read_handle
Expand All @@ -206,11 +230,15 @@ async def download_ranges(
to a requested range.

"""

if len(read_ranges) > 1000:
raise ValueError(
"Invalid input - length of read_ranges cannot be more than 1000"
)

if not self._is_stream_open:
raise ValueError("Underlying bidi-gRPC stream is not open")

read_id_to_writable_buffer_dict = {}
results = []
for i in range(0, len(read_ranges), _MAX_READ_RANGES_PER_BIDI_READ_REQUEST):
Expand Down Expand Up @@ -255,4 +283,18 @@ async def download_ranges(
del read_id_to_writable_buffer_dict[
object_data_range.read_range.read_id
]

return results

async def close(self):
"""
Closes the underlying bidi-gRPC connection.
"""
if not self._is_stream_open:
raise ValueError("Underlying bidi-gRPC stream is not open")
await self.read_obj_str.close()
self._is_stream_open = False

@property
def is_stream_open(self) -> bool:
return self._is_stream_open
107 changes: 99 additions & 8 deletions tests/unit/asyncio/test_async_multi_range_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@


class TestAsyncMultiRangeDownloader:
def create_read_ranges(self, num_ranges):
ranges = []
for i in range(num_ranges):
ranges.append(
_storage_v2.ReadRange(read_offset=i, read_length=1, read_id=i)
)
return ranges

# helper method
@pytest.mark.asyncio
async def _make_mock_mrd(
Expand Down Expand Up @@ -76,13 +84,24 @@ async def test_create_mrd(
read_handle=_TEST_READ_HANDLE,
)

mrd.read_obj_str.open.assert_called_once()
# Assert
mock_cls_async_read_object_stream.assert_called_once_with(
client=mock_grpc_client,
bucket_name=_TEST_BUCKET_NAME,
object_name=_TEST_OBJECT_NAME,
generation_number=_TEST_GENERATION_NUMBER,
read_handle=_TEST_READ_HANDLE,
)

mrd.read_obj_str.open.assert_called_once()

assert mrd.client == mock_grpc_client
assert mrd.bucket_name == _TEST_BUCKET_NAME
assert mrd.object_name == _TEST_OBJECT_NAME
assert mrd.generation_number == _TEST_GENERATION_NUMBER
assert mrd.read_handle == _TEST_READ_HANDLE
assert mrd.is_stream_open

@mock.patch(
"google.cloud.storage._experimental.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
Expand Down Expand Up @@ -131,14 +150,6 @@ async def test_download_ranges(
assert results[0].bytes_written == 18
assert buffer.getvalue() == b"these_are_18_chars"

def create_read_ranges(self, num_ranges):
ranges = []
for i in range(num_ranges):
ranges.append(
_storage_v2.ReadRange(read_offset=i, read_length=1, read_id=i)
)
return ranges

@mock.patch(
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
Expand All @@ -160,3 +171,83 @@ async def test_downloading_ranges_with_more_than_1000_should_throw_error(
str(exc.value)
== "Invalid input - length of read_ranges cannot be more than 1000"
)

@mock.patch(
"google.cloud.storage._experimental.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
)
@mock.patch(
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
@pytest.mark.asyncio
async def test_opening_mrd_more_than_once_should_throw_error(
self, mock_grpc_client, mock_cls_async_read_object_stream
):
# Arrange
mrd = await self._make_mock_mrd(
mock_grpc_client, mock_cls_async_read_object_stream
) # mock mrd is already opened

# Act + Assert
with pytest.raises(ValueError) as exc:
await mrd.open()

# Assert
assert str(exc.value) == "Underlying bidi-gRPC stream is already open"

@mock.patch(
"google.cloud.storage._experimental.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
)
@mock.patch(
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
@pytest.mark.asyncio
async def test_close_mrd(self, mock_grpc_client, mock_cls_async_read_object_stream):
# Arrange
mrd = await self._make_mock_mrd(
mock_grpc_client, mock_cls_async_read_object_stream
) # mock mrd is already opened
mrd.read_obj_str.close = AsyncMock()

# Act
await mrd.close()

# Assert
assert not mrd.is_stream_open

@mock.patch(
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
@pytest.mark.asyncio
async def test_close_mrd_not_opened_should_throw_error(self, mock_grpc_client):
# Arrange
mrd = AsyncMultiRangeDownloader(
mock_grpc_client, _TEST_BUCKET_NAME, _TEST_OBJECT_NAME
)

# Act + Assert
with pytest.raises(ValueError) as exc:
await mrd.close()

# Assert
assert str(exc.value) == "Underlying bidi-gRPC stream is not open"
assert not mrd.is_stream_open

@mock.patch(
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
@pytest.mark.asyncio
async def test_downloading_without_opening_should_throw_error(
self, mock_grpc_client
):
# Arrange
mrd = AsyncMultiRangeDownloader(
mock_grpc_client, _TEST_BUCKET_NAME, _TEST_OBJECT_NAME
)

# Act + Assert
with pytest.raises(ValueError) as exc:
await mrd.download_ranges([(0, 18, BytesIO())])

# Assert
assert str(exc.value) == "Underlying bidi-gRPC stream is not open"
assert not mrd.is_stream_open