New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add bi-directional stream consumption helpers. #5189
Conversation
@@ -0,0 +1,481 @@ | |||
# Copyright 2017, Google LLC All rights reserved. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
# Note: there is a possibility that this starts *before* the call | ||
# property is set. So we have to check if self.call is set before | ||
# seeing if it's active. | ||
if self.call is not None and not self.call.is_active(): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
self._request_queue.put(request) | ||
else: | ||
# calling next should cause the call to raise. | ||
next(self.call) |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This is the foundation of the Pub/Sub subscriber refactor. It exposes a socket-like interface for gRPC streams and builds a robust, resumable stream on top of that. Finally, it adds a class for consuming the stream in the background using callbacks.
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
if not self._is_active(): | ||
# We have an item, but the call is closed. We should put the | ||
# item back on the queue so that the next call can consume it. | ||
self._queue.put(item) |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
It will be provided with the same gRPC future as the underlying | ||
stream which will also be a :class:`grpc.Call`. | ||
""" | ||
self._callbacks.append(callback) |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
def open(self): | ||
"""Opens the stream.""" | ||
if self.is_active: | ||
raise ValueError('Can not open an already open stream.') |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
def add_done_callback(self, callback): | ||
"""Adds a callback that will be called when the RPC terminates. | ||
|
||
This occurs when the RPC errors or is successfully terminated. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approved with comments. Addressing the comments is not a prerequisite to merge.
This is the foundation of the Pub/Sub subscriber refactor. It exposes a socket-like interface for gRPC streams and builds a robust, resumable stream on top of that. Finally, it adds a class for consuming the stream in the background using callbacks.
This code will replace the code in the current
google.cloud.pubsub_v1.subscriber._consumer
module.