From 2a33a7c59a083a2d13341d36f460f876e29dd926 Mon Sep 17 00:00:00 2001 From: Pulkit Aggarwal Date: Tue, 28 Oct 2025 07:13:05 +0000 Subject: [PATCH 1/3] feat(experimental): Add base resumption strategy for bidi streams --- .../asyncio/retry/base_strategy.py | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 google/cloud/storage/_experimental/asyncio/retry/base_strategy.py diff --git a/google/cloud/storage/_experimental/asyncio/retry/base_strategy.py b/google/cloud/storage/_experimental/asyncio/retry/base_strategy.py new file mode 100644 index 000000000..96ac8b547 --- /dev/null +++ b/google/cloud/storage/_experimental/asyncio/retry/base_strategy.py @@ -0,0 +1,67 @@ +import abc +from typing import Any + +class _BaseResumptionStrategy(abc.ABC): + """Abstract base class defining the interface for a bidi stream strategy. + + This class defines the skeleton for a pluggable strategy that contains + all the service-specific logic for a given bidi operation (e.g., reads + or writes). This allows a generic retry manager to handle the common + retry loop while sending the state management and request generation + to a concrete implementation of this class. + """ + + @abc.abstractmethod + def generate_requests(self, state: Any): + """Generates the next batch of requests based on the current state. + + This method is called at the beginning of each retry attempt. It should + inspect the provided state object and generate the appropriate list of + request protos to send to the server. For example, a read strategy + would use this to implement "Smarter Resumption" by creating smaller + `ReadRange` requests for partially downloaded ranges. + + :type state: Any + :param state: An object containing all the state needed for the + operation (e.g., requested ranges, user buffers, + bytes written). + """ + pass + + @abc.abstractmethod + def update_state_from_response(self, state: Any): + """Updates the state based on a successful server response. + + This method is called for every message received from the server. It is + responsible for processing the response and updating the shared state + object. For bidi reads, this is where data integrity checks should be + performed and the `bytes_written` for the corresponding range should be + updated. + + :type state: Any + :param state: The shared state object for the operation, which will be + mutated by this method. + + :rtype: None + """ + pass + + @abc.abstractmethod + async def recover_state_on_failure(self, error: Exception, state: Any): + """Prepares the state for the next retry attempt after a failure. + + This method is called when a retriable gRPC error occurs. It is + responsible for performing any necessary actions to ensure the next + retry attempt can succeed. For bidi reads, its primary role is to + handle the `BidiReadObjectRedirectError` by extracting the + `routing_token` and updating the state. + + :type error: :class:`Exception` + :param error: The exception that was caught by the retry engine. + + :type state: Any + :param state: The shared state object for the operation. + + :rtype: None + """ + pass From 16c0e5e54512bd9aba09b67b732f07d094c3a9b8 Mon Sep 17 00:00:00 2001 From: Pulkit Aggarwal Date: Wed, 29 Oct 2025 12:46:48 +0000 Subject: [PATCH 2/3] minor changes --- .../_experimental/asyncio/retry/base_strategy.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/retry/base_strategy.py b/google/cloud/storage/_experimental/asyncio/retry/base_strategy.py index 96ac8b547..e5d14076f 100644 --- a/google/cloud/storage/_experimental/asyncio/retry/base_strategy.py +++ b/google/cloud/storage/_experimental/asyncio/retry/base_strategy.py @@ -1,5 +1,5 @@ import abc -from typing import Any +from typing import Any, Iterable class _BaseResumptionStrategy(abc.ABC): """Abstract base class defining the interface for a bidi stream strategy. @@ -12,7 +12,7 @@ class _BaseResumptionStrategy(abc.ABC): """ @abc.abstractmethod - def generate_requests(self, state: Any): + def generate_requests(self, state: Any) -> Iterable[Any]: """Generates the next batch of requests based on the current state. This method is called at the beginning of each retry attempt. It should @@ -29,25 +29,21 @@ def generate_requests(self, state: Any): pass @abc.abstractmethod - def update_state_from_response(self, state: Any): + def update_state_from_response(self, state: Any) -> None: """Updates the state based on a successful server response. This method is called for every message received from the server. It is responsible for processing the response and updating the shared state - object. For bidi reads, this is where data integrity checks should be - performed and the `bytes_written` for the corresponding range should be - updated. + object. :type state: Any :param state: The shared state object for the operation, which will be mutated by this method. - - :rtype: None """ pass @abc.abstractmethod - async def recover_state_on_failure(self, error: Exception, state: Any): + async def recover_state_on_failure(self, error: Exception, state: Any) -> None: """Prepares the state for the next retry attempt after a failure. This method is called when a retriable gRPC error occurs. It is @@ -61,7 +57,5 @@ async def recover_state_on_failure(self, error: Exception, state: Any): :type state: Any :param state: The shared state object for the operation. - - :rtype: None """ pass From 26eb4542bebc428ceecdf42cbfca435fcbe23195 Mon Sep 17 00:00:00 2001 From: Pulkit Aggarwal Date: Mon, 3 Nov 2025 08:11:39 +0000 Subject: [PATCH 3/3] resolving comments --- .../_experimental/asyncio/retry/base_strategy.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/retry/base_strategy.py b/google/cloud/storage/_experimental/asyncio/retry/base_strategy.py index e5d14076f..e4e443815 100644 --- a/google/cloud/storage/_experimental/asyncio/retry/base_strategy.py +++ b/google/cloud/storage/_experimental/asyncio/retry/base_strategy.py @@ -2,7 +2,7 @@ from typing import Any, Iterable class _BaseResumptionStrategy(abc.ABC): - """Abstract base class defining the interface for a bidi stream strategy. + """Abstract base class defining the interface for a bidi stream resumption strategy. This class defines the skeleton for a pluggable strategy that contains all the service-specific logic for a given bidi operation (e.g., reads @@ -19,7 +19,9 @@ def generate_requests(self, state: Any) -> Iterable[Any]: inspect the provided state object and generate the appropriate list of request protos to send to the server. For example, a read strategy would use this to implement "Smarter Resumption" by creating smaller - `ReadRange` requests for partially downloaded ranges. + `ReadRange` requests for partially downloaded ranges. For bidi-writes, + it will set the `write_offset` field to the persisted size received + from the server in the next request. :type state: Any :param state: An object containing all the state needed for the @@ -50,7 +52,9 @@ async def recover_state_on_failure(self, error: Exception, state: Any) -> None: responsible for performing any necessary actions to ensure the next retry attempt can succeed. For bidi reads, its primary role is to handle the `BidiReadObjectRedirectError` by extracting the - `routing_token` and updating the state. + `routing_token` and updating the state. For bidi writes, it will update + the state to reflect any bytes that were successfully persisted before + the failure. :type error: :class:`Exception` :param error: The exception that was caught by the retry engine.