Coordinated consumer implementation #37
The new `afkak.group.ConsumerGroup` manages a group that divides the partition consumers across all the members of the group. It tries to resemble `afkak.consumer.Consumer` as much as possible.

Signed-off-by: Jesse Truscott <jtruscot@ciena.com>
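To make "divides the partition consumers across all the members" concrete, here is a minimal sketch of one way a group could split partitions: plain round-robin over sorted member ids. This is an illustration only, not the assignment strategy this PR implements; `assign_partitions` and its arguments are hypothetical names.

```python
def assign_partitions(member_ids, partitions):
    """Divide partitions across members round-robin.

    Hypothetical helper for illustration; not part of afkak.
    """
    # Sort so that every member computing this gets the same answer.
    members = sorted(member_ids)
    assignment = {member: [] for member in members}
    if not members:
        return assignment
    for index, partition in enumerate(sorted(partitions)):
        owner = members[index % len(members)]
        assignment[owner].append(partition)
    return assignment


if __name__ == "__main__":
    print(assign_partitions(["member-b", "member-a"], range(5)))
```

Each partition ends up with exactly one owner, and the per-member counts differ by at most one, which is the property a rebalance needs to preserve when members join or leave.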
set the timeout for JoinGroupRequest to be >30s because the broker can take that long.
- don't request full metadata, just our own topic
- use the long delay when retrying timed out metadata requests
Correctness not guaranteed.
Now it prints the traceback for "Unhandled error in Deferred"!
👍
- Ignore all Trial temp directories like _trial_temp-1
- Don't ignore .noseids
I have been having difficulty telling messages from consumer groups apart when debugging. The addition of the object address helps.
This integration test was fixed by making the ConsumerGroup rejoin on UnknownMemberId.

These changes were made while debugging the failure. Rather than using monkey-patching to watch for internal code paths, which makes the test brittle, these changes make it wait for the desired state to appear. They also validate more about how partitions are assigned than the previous assertions did.

I had to stop the test from waiting on the deferred returned by ConsumerGroup(...).start(). It doesn't seem to fire reliably: I observed tests hanging forever in the cleanup phase waiting on this deferred.

I am unsure what this test is trying to cover. It disables the heartbeat of one group member (by poking at private internals), then adds a second group member. Adding the second group member triggers a rebalance, which restores the first member to a valid state. Is this meant to imitate the reactor of one of the members getting blocked for a while? Couldn't we test this sort of scenario better with a Jepsen-style test?
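The "wait for the desired state to appear" approach can be sketched as a small polling helper. This is not the helper used in the afkak test suite; `wait_until` is a hypothetical name, and the blocking `time.sleep` is a simplification. In a Twisted test the sleep would have to be replaced by a reactor-friendly delay (for example `twisted.internet.task.deferLater`) so the reactor keeps running.

```python
import time


def wait_until(predicate, timeout=5.0, interval=0.05):
    """Poll predicate() until it returns true or the timeout expires.

    Hypothetical helper for illustration. Returns True if the state
    appeared in time, False otherwise.
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


if __name__ == "__main__":
    # Stand-in for observable group state: partitions owned per member.
    owned = {"member-a": [0, 1], "member-b": []}

    def rebalanced():
        # Simulate the rebalance completing between polls.
        if owned["member-a"] == [0, 1]:
            owned["member-a"], owned["member-b"] = [0], [1]
            return False
        return all(owned.values())

    print(wait_until(rebalanced, timeout=1.0, interval=0.01))
```

The test asserts only on externally visible state (who owns which partitions), so it does not break when internal code paths are renamed or reordered.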
No longer necessary without nose. Signed-off-by: Tom Most <tmost@blueplanet.com>
Signed-off-by: Tom Most <tmost@blueplanet.com>
✔️ 🎉
This was observed once on Travis:

[ERROR]
Traceback (most recent call last):
  File "/home/travis/build/ciena/afkak/.tox/py27-int-snappy-murmur/lib/python2.7/site-packages/twisted/internet/defer.py", line 1416, in _inlineCallbacks
    result = result.throwExceptionIntoGenerator(g)
  File "/home/travis/build/ciena/afkak/.tox/py27-int-snappy-murmur/lib/python2.7/site-packages/twisted/python/failure.py", line 512, in throwExceptionIntoGenerator
    return g.throw(self.type, self.value, self.tb)
  File "/home/travis/build/ciena/afkak/.tox/py27-int-snappy-murmur/lib/python2.7/site-packages/afkak/test/int/test_client_integration.py", line 130, in test_send_offset_request
    [resp] = yield self.client.send_offset_request([req])
  File "/home/travis/build/ciena/afkak/.tox/py27-int-snappy-murmur/lib/python2.7/site-packages/twisted/internet/defer.py", line 1418, in _inlineCallbacks
    result = g.send(result)
  File "/home/travis/build/ciena/afkak/.tox/py27-int-snappy-murmur/lib/python2.7/site-packages/afkak/client.py", line 706, in send_offset_request
    returnValue(self._handle_responses(resps, fail_on_error, callback))
  File "/home/travis/build/ciena/afkak/.tox/py27-int-snappy-murmur/lib/python2.7/site-packages/afkak/client.py", line 763, in _handle_responses
    BrokerResponseError.raise_for_errno(resp.error, resp)
  File "/home/travis/build/ciena/afkak/.tox/py27-int-snappy-murmur/lib/python2.7/site-packages/afkak/common.py", line 311, in raise_for_errno
    raise subcls(*args)
afkak.common.NotLeaderForPartitionError: error=6 (NOT_LEADER_FOR_PARTITION) OffsetResponse(topic=u'test_send_offset_request-BlehbfRnmi', partition=0, error=6, offsets=())

afkak.test.int.test_client_integration.TestAfkakClientIntegration.test_send_offset_request
===============================================================================
[ERROR]
Traceback (most recent call last):
Failure: twisted.internet.defer.TimeoutError: <afkak.test.int.test_group_integration.TestAfkakGroupIntegration testMethod=test_broker_restart> (test_broker_restart) still running at 120.0 secs

afkak.test.int.test_group_integration.TestAfkakGroupIntegration.test_broker_restart
===============================================================================
[ERROR]
Traceback (most recent call last):
  File "/home/travis/build/ciena/afkak/.tox/py27-int-snappy-murmur/lib/python2.7/site-packages/twisted/trial/util.py", line 276, in _runSequentially
    thing = yield d
  File "/home/travis/build/ciena/afkak/.tox/py27-int-snappy-murmur/lib/python2.7/site-packages/afkak/_group.py", line 857, in stop
    yield super(ConsumerGroup, self).stop(errback_result=errback_result)
  File "/home/travis/build/ciena/afkak/.tox/py27-int-snappy-murmur/lib/python2.7/site-packages/twisted/internet/defer.py", line 1418, in _inlineCallbacks
    result = g.send(result)
  File "/home/travis/build/ciena/afkak/.tox/py27-int-snappy-murmur/lib/python2.7/site-packages/afkak/_group.py", line 285, in stop
    raise RestopError("Shutdown called on non-running coordinator")
afkak.common.RestopError: Shutdown called on non-running coordinator

afkak.test.int.test_group_integration.TestAfkakGroupIntegration.test_broker_restart
===============================================================================
[ERROR]
Traceback (most recent call last):
  File "/home/travis/build/ciena/afkak/.tox/py27-int-snappy-murmur/lib/python2.7/site-packages/twisted/internet/defer.py", line 1418, in _inlineCallbacks
    result = g.send(result)
  File "/home/travis/build/ciena/afkak/.tox/py27-int-snappy-murmur/lib/python2.7/site-packages/afkak/client.py", line 1249, in _send_request_to_coordinator
    self._handle_responses([decoded], True)
  File "/home/travis/build/ciena/afkak/.tox/py27-int-snappy-murmur/lib/python2.7/site-packages/afkak/client.py", line 779, in _handle_responses
    self.reset_consumer_group_metadata(consumer_group)
  File "/home/travis/build/ciena/afkak/.tox/py27-int-snappy-murmur/lib/python2.7/site-packages/afkak/client.py", line 281, in reset_consumer_group_metadata
    groups = tuple(_coerce_consumer_group(g) for g in groups)
  File "/home/travis/build/ciena/afkak/.tox/py27-int-snappy-murmur/lib/python2.7/site-packages/afkak/client.py", line 281, in <genexpr>
    groups = tuple(_coerce_consumer_group(g) for g in groups)
  File "/home/travis/build/ciena/afkak/.tox/py27-int-snappy-murmur/lib/python2.7/site-packages/afkak/_util.py", line 56, in _coerce_consumer_group
    raise TypeError('consumer_group={!r} must be text'.format(consumer_group))
exceptions.TypeError: consumer_group=None must be text
Minor comment in the changelog, but LGTM.
Thanks for the reviews! I will merge once the build is green and then do an alpha release with this feature. I expect bugs, but hopefully fewer than in the currently shipping version of the feature.
There were some rough bits in the merge, so I am still working through test failures and forward-porting to Python 3.
There is a backwards-incompatible change in the Consumer API: the deferred returned by `.start()`, `.stop()`, and `.shutdown()` resolves with a two-tuple instead of a single integer. I need to dig in further to see if that is really necessary. It seems like Consumer should expose the current and committed offsets in a different public API anyway, as they are useful for metrics if nothing else. Really you'd want to be able to access that information even when the consumer fails.

Closes #1.
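For callers affected by the Consumer API change described above, a small adapter can accept both result shapes. This is a sketch under an assumption the thread does not confirm: that the two-tuple is `(last processed offset, last committed offset)`. `split_offsets` is a hypothetical name, not an afkak function.

```python
def split_offsets(result):
    """Normalize a Consumer result to a (processed, committed) pair.

    Hypothetical helper. Assumes the new two-tuple is
    (processed, committed); the old API returned a single value,
    which is treated here as the processed offset.
    """
    if isinstance(result, tuple) and len(result) == 2:
        return result
    # Old-style single value: the committed offset is unknown.
    return (result, None)


if __name__ == "__main__":
    print(split_offsets(42))        # old shape
    print(split_offsets((42, 40)))  # new shape
```

A callback attached to the deferred could call this first, so the rest of the application code does not need to care which version of the library produced the result.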
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal