fix: Numerous small performance and correctness issues #211
Conversation
```diff
 _Key = TypeVar("_Key")
 _Client = TypeVar("_Client")


 class ClientMultiplexer(Generic[_Key, _Client]):
-    _OpenedClientFactory = Callable[[], _Client]
+    _OpenedClientFactory = Callable[[_Key], _Client]
```
There was a lot of "self" time in get_or_create callers, spent constructing the factory closure to pass in. This is in the publish hot path, hence this change.
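The idea can be sketched as follows. This is a minimal illustration, not the library's actual `ClientMultiplexer` implementation: the factory takes the key directly, so callers construct it once instead of building a fresh closure on every `get_or_create` call.

```python
from typing import Callable, Dict, Generic, TypeVar

_Key = TypeVar("_Key")
_Client = TypeVar("_Client")


class ClientMultiplexer(Generic[_Key, _Client]):
    """Sketch: the factory is keyed, so no per-call lambda is needed."""

    def __init__(self, factory: Callable[[_Key], _Client]):
        self._factory = factory  # constructed once, reused for every key
        self._live_clients: Dict[_Key, _Client] = {}

    def get_or_create(self, key: _Key) -> _Client:
        client = self._live_clients.get(key)
        if client is None:
            client = self._factory(key)  # key passed in; no closure per call
            self._live_clients[key] = client
        return client
```

With the old `Callable[[], _Client]` signature, each caller had to allocate `lambda: make_client(key)` per publish; now the same factory object is reused for every key.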
```diff
     _closer: _ClientCloser
-    _lock: asyncio.Lock
-    _live_clients: Dict[_Key, _Client]
+    _live_clients: Dict[_Key, Awaitable[_Client]]
```
There was a lot of time spent acquiring the lock in the publish hot path; this change makes the lock unnecessary.
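A minimal sketch of why storing awaitables removes the need for an `asyncio.Lock` (illustrative only, not the actual implementation): dictionary access on the event loop never yields, so the future can be installed atomically before any `await` point, and concurrent callers share the same in-flight client.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Generic, TypeVar

_Key = TypeVar("_Key")
_Client = TypeVar("_Client")


class ClientMultiplexer(Generic[_Key, _Client]):
    """Sketch: Dict[_Key, Awaitable[_Client]] makes the lock unnecessary."""

    def __init__(self, factory: Callable[[_Key], Awaitable[_Client]]):
        self._factory = factory
        self._live_clients: Dict[_Key, Awaitable[_Client]] = {}

    async def get_or_create(self, key: _Key) -> _Client:
        if key not in self._live_clients:
            # Installed synchronously, before any await: callers that race
            # here see the same future and never double-create a client.
            self._live_clients[key] = asyncio.ensure_future(self._factory(key))
        return await self._live_clients[key]
```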
```diff
@@ -40,22 +38,16 @@ class MultiplexedSubscriberClient(SubscriberClientInterface):
     _executor: ThreadPoolExecutor
     _underlying_factory: AsyncSubscriberFactory

-    _multiplexer: ClientMultiplexer[SubscriptionPath, StreamingPullFuture]
+    _lock: Lock
+    _live_clients: Set[StreamingPullFuture]
```
Previously, the subscriber enforced that there was only one open subscription stream per subscription per client, but there's actually no need for this restriction.
```diff
         return item.response_future

-    def should_flush(self) -> bool:
-        return self._tester.test(item.request for item in self._requests)
+    def size(self) -> BatchSize:
```
Most of the CPU time in the publish path was in should_flush calls, which had to construct an iterable and iterate over every buffered request on each call.
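The fix can be sketched like this (an illustrative reconstruction, not the library's actual batching code): keep a running `BatchSize` that is updated in O(1) on append, so checking the batch no longer walks every buffered request.

```python
from dataclasses import dataclass


@dataclass
class BatchSize:
    element_count: int = 0
    byte_count: int = 0

    def __add__(self, other: "BatchSize") -> "BatchSize":
        return BatchSize(
            self.element_count + other.element_count,
            self.byte_count + other.byte_count,
        )


class Batcher:
    """Sketch: running size replaces an O(n) walk per should_flush call."""

    def __init__(self) -> None:
        self._size = BatchSize()

    def append(self, request_byte_size: int) -> None:
        # O(1) incremental update instead of re-iterating every request.
        self._size += BatchSize(element_count=1, byte_count=request_byte_size)

    def size(self) -> BatchSize:
        return self._size
```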
```diff
             element_count=1, byte_count=PubSubMessage.pb(request).ByteSize()
         )

+    def _should_flush(self) -> bool:
```
If you'll note on the left, batching settings were previously ignored.
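A hedged sketch of what honoring the settings looks like. The threshold names (`max_messages`, `max_bytes`) are illustrative assumptions, not the library's actual `BatchingSettings` fields: the flush check compares the running batch size against the configured limits instead of ignoring them.

```python
from dataclasses import dataclass


@dataclass
class BatchingSettings:
    # hypothetical threshold names, for illustration only
    max_messages: int = 100
    max_bytes: int = 1_000_000


@dataclass
class BatchSize:
    element_count: int = 0
    byte_count: int = 0


def should_flush(size: BatchSize, settings: BatchingSettings) -> bool:
    # Flush once either configured limit is reached,
    # rather than ignoring the settings entirely.
    return (
        size.element_count >= settings.max_messages
        or size.byte_count >= settings.max_bytes
    )
```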
🤖 I have created a release \*beep\* \*boop\*

---

### [1.1.1](https://www.github.com/googleapis/python-pubsublite/compare/v1.1.0...v1.1.1) (2021-09-07)

### Bug Fixes

* Add workaround for grpc/grpc#25364 ([#213](https://www.github.com/googleapis/python-pubsublite/issues/213)) ([e417bf3](https://www.github.com/googleapis/python-pubsublite/commit/e417bf39fe32c995e5ac2e0a807a10fee3f37d9f))
* Numerous small performance and correctness issues ([#211](https://www.github.com/googleapis/python-pubsublite/issues/211)) ([358a1d8](https://www.github.com/googleapis/python-pubsublite/commit/358a1d8a429086ee75373260eb087a9dd171e3e6))

---

This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).