Add full version matrix produce/consume test #1040
Conversation
@eapache please take a look.
consumer.go (Outdated)
```diff
@@ -514,7 +514,7 @@ func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMe
 		}
 	}

-	if incomplete || len(messages) == 0 {
+	if incomplete && len(messages) == 0 {
```
I don't understand this change; `incomplete` can only be set if `messages` already contains at least one complete message, so this condition becomes impossible to satisfy. I assume this is the `kafka: response did not contain all the expected topic/partition blocks` fix you mention; I'd appreciate more details on that.
I'm looking at this method again, and I'm wondering if the `incomplete` boolean is even necessary anymore. I'm not sure what it was originally trying to accomplish (guarding against decreasing offsets, maybe?), but a simple check on `len(messages) == 0` seems to be sufficient on its own. I'm not sure. That might even mean we don't need `prelude` anymore either, and can just unconditionally skip messages with offset < `child.offset`.
You're right, the error went away because this condition is now impossible to satisfy. My reasoning was that an incomplete record should be reported as an error only if there are no complete messages in the batch/set. But now, looking at `incomplete`, I understand that it does not actually mean a partial trailing message as I thought.

I do not think that even `len(messages) == 0` should be treated as an error. If the Kafka broker decides to return an empty block, so what? It just means that there are no new messages, and sarama should just make another long-polling fetch request.
Ah, so the case where there are legitimately no messages in the block is already handled in `parseResponse`. If we get here and there are no messages we could extract, then I think it's legitimate to conclude an error of some sort, since the server presumably sent us only messages we didn't ask for. I don't know if it's OK for the server to respond with a prelude but no actual messages...
I think the reason I saw the `kafka: response did not contain all the expected topic/partition blocks` error while running this test was exactly that the server was replying with sets/batches containing a prelude only. I will double-check that tomorrow when I am back in the office.
```go
	prodCfg.Producer.Flush.MaxMessages = flush
	prodCfg.Producer.Compression = codec

	p, err := NewSyncProducer(kafkaBrokers, prodCfg)
```
It might be simpler to run this test with an AsyncProducer, so you can avoid spinning up a bunch of goroutines and tracking them with a waitgroup in order to get parallelism.
The point was to have several producers produce different types of messageSets/batchRecords concurrently. I certainly can use AsyncProducers for that, although then I would need to store them somewhere and collect produced messages from all of their Successes() channels; I'm not sure that implementation would be shorter, though. I can implement it this way if you want.
Oh ok, I missed the `wg.Wait()` being fully outside the outer loop and thought the purpose was only for batching. This is fine then.
You know what, I cannot reproduce `kafka: response did not contain all the expected topic/partition blocks` at all now. It looks like #1041 fixed the root cause of this issue too. So I suggest simplifying the messageSet/recordBatch parsing a bit to make it easier to understand.
Note that I left:

```go
if len(messages) == 0 {
	return nil, ErrIncompleteResponse
}
```

intact. But honestly I think it would be better to just ignore this case, because what do you expect the library user to do with that error? Nothing but retry, and sarama can do that on its own.
```go
	V0_11_0_2 = newKafkaVersion(0, 11, 0, 2)
	V1_0_0_0  = newKafkaVersion(1, 0, 0, 0)

	SupportedVersions = []KafkaVersion{
```
Curious why you decided to expose these values publicly? I don't see a use for them outside the tests you wrote.
Sarama exposes the protocol implementation, which is used in my project mailgun/kafka-pixy, but some things are missing, and this is one of them. I realize that this is a selfish reason, so if you are against it, I will roll it back.
No, that's also fine, I just couldn't think of a use case for it but you provided one.
@horkhe - Do your producer tests set headers?
@eapache - There are other closed issues from the past that have the same error message, but this might be a unique case just for when
Ya, please open a new issue. The implementation of
@eapache Now I'm not sure this is unique to
@pkedy nope, I do not set headers. Later we can add a test that sets headers, although that would be for a much smaller subset of versions.
@eapache After rebasing to the latest master, the gzip compression encode/decode test started to fail. The test was broken by #1044, but a deeper problem is that CI reports the build as successful even when tests fail. That is fixed in this PR, but be careful merging other PRs before the fix is in master. I fixed the gzip test, by the way.
Thank you for all of your work on this!
That's really weird, I swear it used to fail properly and I haven't changed the config recently. Anyway, thank you for fixing this; I'm glad the only thing I missed due to this was a minor test failure.
Test errors have stopped failing builds since #960, because in bash, when commands in a pipeline are separated with `|`, the exit status of the whole pipeline is that of the last command, so a failing test command earlier in the pipeline is masked.
This PR adds a test checking that messages produced by clients of all supported versions can be consumed by clients of all supported versions, also throwing all possible compressions into the mix. When the test runs, it uses the value of the `KAFKA_VERSION` environment variable as the upper version bound. First it produces with clients of every version up to the upper limit, and for each version it produces messages with different compression algorithms. Then it reads all produced messages with clients of all supported versions.

The test revealed several issues:

- `kafka: response did not contain all the expected topic/partition blocks`. EDIT: I fixed that in this PR by only reporting `ErrIncompleteResponse` when there are no complete messages retrieved.
- `lz4.Read: invalid header checksum: got 26 expected 130`: basically lz4 compression does not work at all; I commented it out in the test for now. EDIT: My mistake, since lz4 was introduced in v0.10.0.0, older clients obviously cannot read it. I moved LZ4 testing to another test case that features a version matrix starting with v0.10.0.0.
- `Consumed unexpected offset: version=<any version>, index=3219, want={offset: 3220, value: msg:0.11.0.0:none:35}, got={offset: 3221, value: msg:0.11.0.0:none:35}`: messages produced with client version 0.11.0.0 and higher are fetched with an incorrect offset when consumed by any client version. So my earlier PR that suggested offsetSkew for 0.11.0.0-1.0.0 is incomplete. The fix most likely belongs in the producer, because the errors are returned by clients of all supported versions. EDIT: Hurray, the LastOffsetDelta calculation fix #1041 fixed this issue, thank you @pkedy.
- WARNING: For some reason Travis-CI thinks the build succeeds for all targets even though tests fail. EDIT: I fixed that; now a test failure makes the entire build fail, so you can see that builds for version 0.11.0.0 and higher fail. EDIT 2: Builds for version 0.11.0.0 and higher do not fail anymore, thanks to #1041.