Added max_bytes option and FetchRequest_v3 usage.
#962
Conversation
Took me long enough =) Fixes #870
kafka/consumer/fetcher.py
Outdated
# they are requested, so to avoid starvation with
# `fetch_max_bytes` option we need this shuffle
partition_data = list(partition_data.items())
random.shuffle(partition_data)
A note on this shuffle. As Python dicts and sets are not ordered, it will currently work properly even without the shuffle (the data is effectively shuffled by _create_fetch_requests, since it uses sets and dicts), but I still prefer to have it here, because:
- Python 3.6 has ordered dicts, which could make the distribution much less even
- While older dicts do not retain insertion order, they do order keys by hash, which can (probably) also result in unequal consumption.

Please correct me if I'm wrong here. My tests pass even without this shuffle, so I'm a bit concerned.
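For context on the ordering concern: on CPython 3.6+ a dict iterates in insertion order, so without an explicit shuffle the same partition would lead every fetch. A minimal sketch (the topic/partition keys are made up for illustration):

```python
import random

# CPython 3.6+ dicts preserve insertion order, so iterating the same
# partition dict yields the same partition first on every fetch.
partition_data = {("my-topic", 2): 0, ("my-topic", 0): 0, ("my-topic", 1): 0}
assert list(partition_data) == [("my-topic", 2), ("my-topic", 0), ("my-topic", 1)]

# An explicit shuffle removes that determinism:
items = list(partition_data.items())
random.shuffle(items)
```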
Weird, Github swallowed my earlier comment.
+1 for explicitly shuffling to avoid py3.6 issues.
However, since the protocol explicitly responds in the order requested, maybe this should be exposed to the user? Default to shuffling the order to avoid starvation, but allow the user to override that and always request a particular order... I can see someone using this as a sort of poor man's priority queue. Not sure if this is a good idea, or if it'd require reworking a lot of kafka-python internals... just a thought.
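A rough sketch of what such a user-facing hook might look like — `partition_sort_key` is a hypothetical parameter invented here, not part of kafka-python:

```python
import random

def order_partitions(partition_data, partition_sort_key=None):
    """Order partitions for a FetchRequest_v3.

    By default, shuffle to avoid starvation. A hypothetical
    `partition_sort_key` callable would let a user pin a priority
    order, since brokers fill `fetch_max_bytes` in the order requested.
    """
    items = list(partition_data.items())
    if partition_sort_key is not None:
        items.sort(key=partition_sort_key)
    else:
        random.shuffle(items)
    return items

# Hypothetical usage: always fetch lower-numbered partitions first.
prioritized = order_partitions(
    {("t", 1): "a", ("t", 0): "b"},
    partition_sort_key=lambda kv: kv[0][1],
)
```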
I did have the same idea, but could not find any application for it. And it's probably not this PR's responsibility, as it's really a separate feature.
kafka/consumer/group.py
Outdated
first message in the first non-empty partition of the fetch is
larger than this value, the message will still be returned to
ensure that the consumer can make progress. NOTE: consumer performs
multiple fetches in parallel so memory usage will be higher.
This note is a great idea, but tweak the wording slightly? It's unclear 1) what the memory usage will be higher than, and 2) how much higher... are we talking 20%, 2x, 5x, etc.?
kafka/protocol/metadata.py
Outdated
@@ -47,6 +47,30 @@ class MetadataResponse_v1(Struct):
    )


class MetadataResponse_v2(Struct):
Maybe this should be in a dedicated commit? It just feels logically separate, and I'm unclear why it's needed for the Fetch_Request/Response stuff. Personally I prefer a commit that says "Here's this new struct; here's the JIRA ticket KAFKA-XXXX where it was added; requires broker version x.xx.xx.x; here's what it changes compared to the old one", because when spelunking in code I'll often look up the commit history to understand changes like this.
Broken out in #974.
else:
    # As of version == 3 partitions will be returned in order as
    # they are requested, so to avoid starvation with
    # `fetch_max_bytes` option we need this shuffle
Maybe also add to the code comment something about Python 3.6's consistent ordering of dicts...
...this shuffle. Otherwise, in Python >= 3.6 dicts have consistent order, so hashing into a dict/set can't be relied on to randomize the order of partitions.
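Putting both points together, the amended comment might read roughly like this (the surrounding `partition_data` setup is invented for illustration):

```python
import random

# Sample partition_data as it might look inside the fetcher (made-up values).
partition_data = {("my-topic", p): [] for p in range(4)}

# As of version == 3 partitions will be returned in the order they are
# requested, so to avoid starvation with the `fetch_max_bytes` option we
# need this shuffle. Otherwise, on Python >= 3.6 dicts have consistent
# (insertion) order, so hashing keys into a dict/set no longer randomizes
# the partition order for us.
partition_data = list(partition_data.items())
random.shuffle(partition_data)
```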
Force-pushed from 2f826b3 to bb1887e.
kafka/consumer/group.py
Outdated
larger than this value, the message will still be returned to
ensure that the consumer can make progress. NOTE: consumer performs
fetches to multiple nodes in parallel so memory usage will depend
on the number of nodes containing partitions for the topic.
Can we call them 'brokers' rather than 'nodes'?
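The bound being discussed can be stated concretely. A sketch, assuming at most one in-flight fetch per broker, each honoring `fetch_max_bytes` (the helper name is invented here):

```python
def worst_case_fetch_memory(fetch_max_bytes, num_brokers):
    """Rough upper bound on consumer fetch-buffer memory.

    One fetch may be in flight per broker hosting partitions for the
    consumer, and each response may be up to fetch_max_bytes, so the
    bound grows linearly with broker count.
    """
    return fetch_max_bytes * num_brokers

# E.g. the default 50 MiB fetch_max_bytes against 3 brokers:
bound = worst_case_fetch_memory(50 * 1024 * 1024, 3)
```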
LGTM with the caveat that I didn't have time to try the code, just read through it.
Tests are failing on py2 for some reason -- can you investigate?
kafka/conn.py
Outdated
@@ -778,7 +778,8 @@ def filter(self, record):
    log.addFilter(log_filter)

    test_cases = [
        ((0, 10), ApiVersionRequest[0]()),
        ((0, 10, 1), MetadataRequest[2]([])),
note to self: for any broker after 0.10 we should just use the results of ApiVersions api request to set api version support.
Should I implement it?
ah yes. forgot about this little wart of kafka brokers. so it turns out that kafka < 0.10 will silently ignore incorrect metadata api version requests (or at least not close the socket as expected). The socket behavior is different between py2 and py3, causing KafkaConnection._recv() to block indefinitely on py2.
So we can't use MetadataRequest[2] to check version and will need to implement the smarter ApiVersion approach to avoid breaking auto version checks on older brokers.
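A sketch of the ApiVersions-based approach. Per KIP-35, an ApiVersionResponse carries `(api_key, min_version, max_version)` tuples; the function name and the version mapping below are illustrative, not the actual kafka-python implementation:

```python
def infer_broker_version(api_versions):
    """Map ApiVersionResponse (api_key, min, max) tuples to a broker version.

    The mapping is illustrative, not exhaustive: Fetch is api_key 1, and
    its max_version 3 was added in broker 0.10.1 via KIP-74.
    """
    max_versions = {key: max_v for key, _min_v, max_v in api_versions}
    if max_versions.get(1, 0) >= 3:
        return (0, 10, 1)
    return (0, 10, 0)
```

Brokers older than 0.10 never answer ApiVersionRequest at all, which is exactly why the probe-by-trial path still has to exist for them.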
Test via `KAFKA_VERSION=0.8.0 tox -e py27 -- test/test_consumer_group.py::test_paused` -- this hangs indefinitely on my laptop.
ok, will try it.
kafka/protocol/metadata.py
Outdated
@@ -47,6 +47,30 @@ class MetadataResponse_v1(Struct):
    )


class MetadataResponse_v2(Struct):
    API_KEY = 3
    API_VERSION = 1
should be API_VERSION = 2 ?
Ou, yea =)
kafka/protocol/metadata.py
Outdated
@@ -47,6 +47,30 @@ class MetadataResponse_v1(Struct):
    )


class MetadataResponse_v2(Struct):
agree -- would prefer to separate this into separate PR.
It's separated, see #974
As for the failing tests, I do suspect something strange is going on: they fail in odd ways on py27 and py26 with KAFKA < 0.10.1, but the failure message makes no sense:
And the tests run OK on my machine, so I have no clue.
Force-pushed from bb1887e to 85113a4.
Also added a check for api_version=(0, 10, 1)
Force-pushed from 14b5b6f to 3220fbf.
Changed check_version to use the ApiVersionResponse result. Works quite well for me.
@dpkp Is there anything else in this PR that needs to be addressed?
@@ -830,6 +849,10 @@ def connect():
    self._sock.setblocking(False)

    if f.succeeded():
        if version == (0, 10):
Better would be `if isinstance(request, ApiVersionRequest[0]):`
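The suggested dispatch, sketched with a stand-in class (`ApiVersionRequestV0` here is a stub, not the real `kafka.protocol` class):

```python
class ApiVersionRequestV0:
    """Stand-in for ApiVersionRequest[0]; the real class lives in kafka.protocol."""

def handle_success(request, version):
    # Dispatch on the request type rather than comparing the version
    # tuple, so the branch stays correct if the probe order or the
    # version labels ever change.
    if isinstance(request, ApiVersionRequestV0):
        return "parse ApiVersionResponse"
    return "other response"
```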
@@ -752,6 +753,24 @@ def _next_correlation_id(self):
        self._correlation_id = (self._correlation_id + 1) % 2**31
        return self._correlation_id

    def _check_version_above_0_10(self, response):
        test_cases = [
            # format: (<broker version>, <needed struct>)
might note that the order here matters, and/or make sure we reverse sort it before checking for best match below
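The reviewer's point can be made defensive in code. A sketch with placeholder probe names standing in for the real request structs (tuple comparison already orders version tuples correctly):

```python
test_cases = [
    # format: (<broker version>, <probe request>) -- newest first matters,
    # because the first probe that succeeds determines the reported version.
    ((0, 10), "ApiVersionRequest[0]"),
    ((0, 9), "ListGroupsRequest[0]"),
    ((0, 8, 2), "GroupCoordinatorRequest[0]"),
]

# Defensively re-sort so the newest version is probed first even if the
# list above is ever edited out of order.
test_cases.sort(key=lambda tc: tc[0], reverse=True)
```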
Looks great. I added two very minor points that we can fix later (or you can address now if you have time). I'll merge this before the next release.
Thanks for the cleanup, looks good =) I missed it over the weekend.
Also added a check for api_version=(0, 10, 1) and MetadataRequest_v2
KIP-74
NOTE: Depends on PR #974