Skip to content

Commit

Permalink
Add re-open throttling to ResumableBidiRpc
Browse files Browse the repository at this point in the history
  • Loading branch information
plamut committed May 30, 2019
1 parent 7bc6bf7 commit d201ab1
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
6 changes: 4 additions & 2 deletions api_core/google/api_core/bidi.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ def __init__(self, start_rpc, should_recover, initial_request=None, metadata=Non
self._operational_lock = threading.RLock()
self._finalized = False
self._finalize_lock = threading.Lock()
self._reopen_throttle = _Throttle(time_window=10, entry_cap=5)

def _finalize(self, result):
with self._finalize_lock:
Expand Down Expand Up @@ -440,7 +441,8 @@ def _reopen(self):
# retryable error.

try:
self.open()
with self._reopen_throttle:
self.open()
# If re-opening or re-calling the method fails for any reason,
# consider it a terminal error and finalize the stream.
except Exception as exc:
Expand Down Expand Up @@ -639,7 +641,7 @@ def start(self):
thread = threading.Thread(
name=_BIDIRECTIONAL_CONSUMER_NAME,
target=self._thread_main,
args=(ready,)
args=(ready,),
)
thread.daemon = True
thread.start()
Expand Down
14 changes: 14 additions & 0 deletions api_core/tests/unit/test_bidi.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,20 @@ def test_reopen_failure_on_rpc_restart(self):
assert bidi_rpc.is_active is False
callback.assert_called_once_with(error2)

def test_throttles_reopen_requests(self):
call = CallStub([])
start_rpc = mock.create_autospec(
grpc.StreamStreamMultiCallable, instance=True, return_value=call
)
should_recover = mock.Mock(spec=["__call__"], return_value=True)
bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)

patcher = mock.patch.object(bidi_rpc._reopen_throttle.__class__, "__enter__")
with patcher as mock_enter:
bidi_rpc._reopen()

mock_enter.assert_called_once()

def test_send_not_open(self):
bidi_rpc = bidi.ResumableBidiRpc(None, lambda _: False)

Expand Down

0 comments on commit d201ab1

Please sign in to comment.