feat: make publish futures compatible with concurrent.futures.as_completed() #397

Merged
merged 6 commits into from May 15, 2021

Conversation

Contributor

@plamut plamut commented Apr 26, 2021

Closes #368.
Possibly supersedes #374.

This is a different approach to the same feature request which does not rely on implementation details of concurrent.futures.Future (as discussed offline). Instead, the custom base Future implementation is replaced with the one from the concurrent.futures package, with only minor adjustments to fit the specific needs of publisher/subscriber futures.

The interface stays the same, and the isinstance() checks should keep working, as concurrent.futures.Future is injected into the right place in the MRO.
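
A minimal sketch of the idea, not the library's actual code: a future that subclasses concurrent.futures.Future can be passed straight to concurrent.futures.as_completed() and still passes isinstance() checks. The names `SketchFuture` and `resolve_later`, and the `running()` adjustment, are hypothetical and exist only for illustration.

```python
import concurrent.futures
import threading


class SketchFuture(concurrent.futures.Future):
    """Hypothetical stand-in for a Pub/Sub future; not the library's class."""

    def running(self):
        # Assumed adjustment: report the future as running until it completes.
        return not self.done()


def resolve_later(future, value, delay):
    # Complete the future from a background thread after a short delay.
    timer = threading.Timer(delay, future.set_result, args=(value,))
    timer.start()
    return timer


futures = [SketchFuture() for _ in range(3)]
timers = [resolve_later(f, i, 0.01 * (3 - i)) for i, f in enumerate(futures)]

# as_completed() yields each future as soon as its result has been set.
completed = [f.result() for f in concurrent.futures.as_completed(futures, timeout=5)]
is_std_future = all(isinstance(f, concurrent.futures.Future) for f in futures)
```

The completion order depends on thread timing, so the sketch only relies on every future eventually being yielded.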

PR checklist

  • Make sure to open an issue as a bug/issue before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
  • Ensure the tests and linter pass
  • Code coverage does not decrease (if any source code was changed)
  • Appropriate docs were updated (if necessary)
@plamut plamut requested a review from pradn Apr 26, 2021
@google-cla google-cla bot added the cla: yes label Apr 26, 2021
Contributor Author

@plamut plamut commented Apr 26, 2021

@dpcollins-google Could this break Pub/Sub Lite in any way? Does the Lite client currently use any Future internals?

The futures implementation is adjusted to work well with the
as_completed() function in the `concurrent.futures` package.
Contributor Author

@plamut plamut commented Apr 26, 2021

module 'concurrent.futures' has no attribute 'InvalidStateError' in Python 3.7 and below. Will refactor the affected unit tests.

Setting a result/exception on an already completed
concurrent.futures.Future object raises an exception only in
Python 3.8+, thus we conditionally disable two unit tests.

This behavior change is fine, though, because users should never use
the set_result() and set_exception() methods directly.
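
A small sketch of the version difference mentioned above, assuming the behavior in question is a second set_result() call on a future that is already done. InvalidStateError is looked up with getattr() because it does not exist on Python 3.7 and below; the variable names are illustrative.

```python
import concurrent.futures

# InvalidStateError exists only in Python 3.8+; fall back to None on older versions.
InvalidStateError = getattr(concurrent.futures, "InvalidStateError", None)

future = concurrent.futures.Future()
future.set_result("first")

raised = False
try:
    # On Python 3.8+ this raises InvalidStateError; older versions accept it silently.
    future.set_result("second")
except Exception as exc:
    raised = InvalidStateError is not None and isinstance(exc, InvalidStateError)
```

A test that expects the exception can be skipped whenever the looked-up name is None.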
Contributor Author

@plamut plamut commented Apr 27, 2021

Status checks got stuck.

@plamut plamut marked this pull request as ready for review Apr 27, 2021
@plamut plamut requested a review from as a code owner Apr 27, 2021
Contributor

@jimfulton jimfulton left a comment

This looks great. I love the code reduction!

I'm a little nervous about the subclassing.

I would minimize the number of overridden methods.

In StreamingPullFuture, I'd change the private attributes to use double- rather than single-underscore prefixes to get name mangling and greatly reduce the chance of a future conflict with the base class.
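
To illustrate the name-mangling point, here is a minimal sketch with hypothetical classes (`Base`, `SingleUnderscore`, `DoubleUnderscore`) and a made-up attribute name. None of these are taken from the library.

```python
class Base:
    def __init__(self):
        self._cancelled = False  # attribute owned by the base class


class SingleUnderscore(Base):
    def __init__(self):
        super().__init__()
        # Same name as in Base: silently overwrites the base class attribute.
        self._cancelled = "subclass value"


class DoubleUnderscore(Base):
    def __init__(self):
        super().__init__()
        # Name-mangled to _DoubleUnderscore__cancelled, so Base is untouched.
        self.__cancelled = "subclass value"


clobbered = SingleUnderscore()._cancelled
kept = DoubleUnderscore()._cancelled
mangled = DoubleUnderscore()._DoubleUnderscore__cancelled
```

With the double-underscore prefix, the base class keeps its own value even if it later introduces an attribute with the same short name.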

# Set the result and trigger the future.
self._result = result
self._trigger()
return super().set_result(result=result)
Contributor

@jimfulton jimfulton May 13, 2021

Why do we need this override?

self._completed.set()
for callback in self._callbacks:
callback(self)
return super().set_exception(exception=exception)
Contributor

@jimfulton jimfulton May 13, 2021

Why do we need this override?

if err is None:
return self._result
raise err
return super().result(timeout=timeout)
Contributor

@jimfulton jimfulton May 13, 2021

Why do we need this override?

Contributor Author

@plamut plamut commented May 14, 2021

In StreamingPullFuture, I'd change the private attributes to use double- rather than single-underscore prefixes to get name mangling and greatly reduce the chance of a future conflict with the base class.

This is a very good suggestion, a case where we really do want to assure there's no accidental overriding.

Why do we need this override? (x3)

To override the base implementations' docstrings: we want to preserve the existing docstrings of the Pub/Sub futures' methods. Do you know if there's a more idiomatic way to do this than merely delegating the call to the implementation further up the MRO chain?
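
For reference, the delegation pattern being discussed looks roughly like the sketch below. `DocumentedFuture` and its docstring text are hypothetical; the override adds no behavior of its own and exists only to carry a different docstring.

```python
import concurrent.futures


class DocumentedFuture(concurrent.futures.Future):
    """Hypothetical subclass used only for illustration."""

    def result(self, timeout=None):
        """Return the message ID or raise an exception (custom docstring)."""
        # Pure delegation: behavior is inherited, only the docstring differs.
        return super().result(timeout=timeout)


future = DocumentedFuture()
future.set_result("message-id-123")
value = future.result(timeout=1)
doc = DocumentedFuture.result.__doc__
```

The trade-off is the one raised in the review: the docstring stays under the subclass's control, at the cost of one more overridden method to maintain.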

Do we know of any clients that are passing this argument? (I see this package was one. :) )

Do we plan to get rid of this in the near future?

The library only implements a thread-based batch, which is an internal default the publisher client uses. That batch class is the only thing that passed in the now-removed completed argument, and we are not aware of any 3rd parties that would considerably mangle the client code to change the publisher concurrency paradigm.

End users also do not instantiate the futures directly (and if they do, they violate the contract :) ).

No plans for removing the completed argument yet, but it could make sense to do so, since concurrent.futures.Future tracks its own state differently and doesn't need it.

Contributor

@jimfulton jimfulton commented May 14, 2021

Why do we need this override? (x3)

To override the base implementations' docstrings: we want to preserve the existing docstrings of the Pub/Sub futures' methods. Do you know if there's a more idiomatic way to do this than merely delegating the call to the implementation further up the MRO chain?

IMO. It's not worth it.

The less overriding the better.

Do we know of any clients that are passing this argument? (I see this package was one. :) )
Do we plan to get rid of this in the near future?

The library only implements a thread-based batch, which is an internal default the publisher client uses. That batch class is the only thing that passed in the now-removed completed argument, and we are not aware of any 3rd parties that would considerably mangle the client code to change the publisher concurrency paradigm.

End users also do not instantiate the futures directly (and if they do, they violate the contract :) ).

No plans for removing the completed argument yet, but it could make sense to do so, since concurrent.futures.Future tracks its own state differently and doesn't need it.

I would plan to remove it "soon".

If we're confident that no clients are calling this, then I'd remove it now.

Every line of code we own is a liability -- overridden inherited code from external (to us) dependencies doubly (or triply) so.

These are suggestions. My approval stands.

Contributor

@jimfulton jimfulton commented May 14, 2021

I have nothing more to say. :)

Contributor Author

@plamut plamut commented May 14, 2021

I'll reconsider actually removing the completed argument when I revisit this on Monday, since a loud error is probably better than silently changing the argument's effect to a no-op. Ditto for the other remark about overriding. Thanks for all the feedback!

Contributor

@dpcollins-google dpcollins-google commented May 14, 2021

lgtm, shouldn't break pub/sub lite

Contributor

@anguillanneuf anguillanneuf commented May 14, 2021

@plamut After this update is released, shall we update our publish/subscribe samples? Or add new samples to demonstrate how to work with these futures?

This parameter is unlikely to be used by any 3rd party code, but
even if it is, it's better to cause a loud error rather than silently
changing its effect to a no-op.
Contributor Author

@plamut plamut commented May 15, 2021

On second thought, removing the completed parameter is actually better: it will cause a loud error for any code that uses it, instead of possibly introducing non-obvious bugs if its behavior were silently changed to a no-op. As for the overriding methods, I opted to keep them, since they preserve the existing docstrings and keep those under our control.
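
The "loud error" can be sketched as follows. `OldFuture` and `NewFuture` are hypothetical classes standing in for the constructor before and after the parameter is removed; they are not the library's actual signatures.

```python
class OldFuture:
    def __init__(self, completed=None):
        # The parameter is still accepted, even if nothing uses it anymore.
        self.completed = completed


class NewFuture:
    def __init__(self):
        # The 'completed' parameter no longer exists.
        pass


error = None
try:
    NewFuture(completed=object())
except TypeError as exc:
    # Python rejects the unknown keyword argument immediately.
    error = str(exc)
```

Any caller still passing the argument fails at construction time instead of running with a value that is quietly ignored.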

After this update is released, shall we update our publish/subscribe samples? Or add new samples to demonstrate how to work with these futures?

@anguillanneuf The existing samples should not need any modifications, since that's actually one of the goals - futures should continue working as before.

On the other hand, it would make sense to demonstrate their new capability, i.e. compatibility with concurrent.futures.as_completed(). Maybe as a use case where somebody publishes multiple messages in a loop and then wants to block until all operations complete?

Something like the following:

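import concurrent.futures
import time

from google.cloud import pubsub_v1

# PROJECT_NAME and TOPIC_NAME are placeholders for your own project and topic IDs.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_NAME, TOPIC_NAME)

# Map each publish future to its message so the outcome can be reported later.
future_to_msg = {}

for i in range(10):
    msg = f"message {i}".encode()
    # Append a bogus suffix to every other topic path to provoke publish errors.
    suffix = "" if i % 2 else "x"
    future = publisher.publish(topic_path + suffix, msg)
    future_to_msg[future] = msg
    time.sleep(0.01)

# Handle the futures in the order in which they complete.
for future in concurrent.futures.as_completed(future_to_msg):
    msg = future_to_msg[future].decode()
    try:
        future.result()  # raises if publishing the message failed
    except Exception as exc:
        print(f"Error publishing message {msg}: {exc}")
    else:
        print(f"Message successfully published: {msg}")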

Sample output (with logging level set to CRITICAL to cut off background thread noise):

Message successfully published: message 5
Error publishing message message 0: 404 Resource not found (resource=topic-*****).
Error publishing message message 8: 404 Resource not found (resource=topic-*****).
Message successfully published: message 7
Message successfully published: message 9
Message successfully published: message 3
Message successfully published: message 1
Error publishing message message 6: 404 Resource not found (resource=topic-*****).
Error publishing message message 2: 404 Resource not found (resource=topic-*****).
Error publishing message message 4: 404 Resource not found (resource=topic-*****).

@plamut plamut merged commit e29a2c0 into googleapis:master May 15, 2021
8 checks passed
@plamut plamut deleted the iss-368-b branch May 15, 2021
5 participants