Add streaming pull manager and integrate new subscriber logic#5237
Conversation
Adds a new method, `subscribe_experimental`, to the pubsub subscriber client to use this new functionality. Leaves the older subscription implementation in place for now (will be removed by another PR after this gets more testing.)
|
|
||
| class Dispatcher(object): | ||
| def __init__(self, queue, subscriber): | ||
| def __init__(self, manager, queue): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| for item in items: | ||
| time_to_ack = item.time_to_ack | ||
| if time_to_ack is not None: | ||
| self._manager.ack_histogram.add(int(time_to_ack)) |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| self._leaser = leaser.Leaser(self) | ||
| self._leaser.start() | ||
|
|
||
| def close(self, reason=None): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
|
||
| # Put the request together. | ||
| request = types.StreamingPullRequest( | ||
| modify_deadline_ack_ids=list(lease_ids), |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| self._scheduler.schedule(self._callback, message) | ||
|
|
||
| def _should_recover(self, exception): | ||
| """Determine if an error on the RPC straem should be recovered. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
lukesneeringer
left a comment
There was a problem hiding this comment.
Approved, but I think it would be wise to add that one lock that is noted.
| """Start a thread to dispatch requests queued up by callbacks. | ||
| Spawns a thread to run :meth:`dispatch_callback`. | ||
| """ | ||
| if self._thread is not None: |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| return self._ack_deadline | ||
|
|
||
| @property | ||
| def load(self): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
Adds a new method, `subscribe_experimental`, to the pubsub subscriber client to use this new functionality. Leaves the older subscription implementation in place for now (will be removed by another PR after this gets more testing.)
Adds a new method,
subscribe_experimental, to the pubsub subscriber client to use this new functionality. Leaves the older subscription implementation in place for now (will be removed by another PR after this gets more testing.)