Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added multi version kafka support #655

Closed

Conversation

@mrafayaleem
Copy link

commented May 5, 2016

@eapache @wvanbergen This is the PR I have been working on to address #617. Can you guys start reviewing it please. I am originally from Python background so please ignore any coding horrors here.

Also checkout how different response versions are handled in kafka-python here. For this PR, I am passing kafka version from request to response so that response knows how to decode itself. Also, please do leave comments on the way kafka version is assigned in test cases. It doesn't look right IMO.

Description:

  • Tries to address #617

Not implemented/missing/confused:

  • Handling offsets to kafka vs zk.
  • Config conflicts.

Issues:

  • I am aware of at least five test cases that are failing on this branch. These are mostly Decoded request does not match the encoded one. Any pointers would be highly appreciated. Failing tests:
    • TestOffsetCommitRequestV0
    • TestOffsetCommitRequestV1
    • TestOffsetFetchRequest
    • TestPartitionOffsetManagerMarkOffsetWithRetention
    • TestProduceRequest

Gotchas:

I figured out that the BNF for FetchResponse v1 is actually

v1 (supported in 0.9.0 or later) and v2 (supported in 0.10.0 or later)
FetchResponse => ThrottleTime [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]]
  ThrottleTime => int32
  TopicName => string
  Partition => int32
  ErrorCode => int16
  HighwaterMarkOffset => int64
  MessageSetSize => int32

instead of

v1 (supported in 0.9.0 or later) and v2 (supported in 0.10.0 or later)
FetchResponse => [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]] ThrottleTime
  TopicName => string
  Partition => int32
  ErrorCode => int16
  HighwaterMarkOffset => int64
  MessageSetSize => int32
  ThrottleTime => int32

I think there is an error in the kafka protocol wiki here.

@eapache

This comment has been minimized.

Copy link
Contributor

commented May 5, 2016

I think there is an error in the kafka protocol wiki

That agrees with https://kafka.apache.org/protocol.html#protocol_messages, so I will update the wiki.

@eapache

This comment has been minimized.

Copy link
Contributor

commented May 5, 2016

The requests and responses should not store the actual kafka version (0.8.2 or 0.9 or whatever); they should store the actual API version number specific to them (see e.g. what OffsetFetchRequest already does on master). This is because different API versions may have substantially different behaviours (offsets to kafka vs zk as you point out) and so even if I'm using Kafka 0.9 I may still want to specify a different version of the message. We need to support this use case. This will also massively simplify the definition of allocateBody.

The kafka version should be stored either as a global (next to MaxRequestSize and MaxResponseSize) or in the Config struct. I'd prefer the config struct, but that would involve passing the config around in a lot more places. We should really do that anyways at some point, since MaxRequestSize and MaxResponseSize should not be global either, but that's a much larger chunk of work so I'm ok with another global for now.

config.go Outdated
@@ -8,8 +8,16 @@ import (

var validID *regexp.Regexp = regexp.MustCompile(`\A[A-Za-z0-9._-]*\z`)

type KafkaVersion struct {

This comment has been minimized.

Copy link
@eapache

eapache May 5, 2016

Contributor

having both kVersion and KafkaVersion as structs seems like an unnecessary complication; can't you just make one type out of them?


var LatestStable, _ = NewVersion("0.9.0.1")
var V0_10_0, _ = NewVersion("0.10.0.0")
var V0_9_0_1 = LatestStable

This comment has been minimized.

Copy link
@eapache

eapache May 5, 2016

Contributor

I'd prefer defining this with a literal var V0_9_0_1 = NewVersion("0.9.0.1") and then define the latest stable at the bottom with var LatestStable = V0_9_0_1.

utils.go Outdated
versionArr [4]int
}

func NewVersion(ver string) (*kVersion, error) {

This comment has been minimized.

Copy link
@eapache

eapache May 5, 2016

Contributor

there is no reason for this to take a string or return an error NewVersion(0, 9, 0, 1) reads cleaner

This comment has been minimized.

Copy link
@mrafayaleem
@mrafayaleem

This comment has been minimized.

Copy link
Author

commented May 5, 2016

I actually removed version number from OffsetFetchRequest but now I see what you mean here and I agree. So essentially, that means that version number should be implemented for every request/response similar to the one for OffsetFetchRequest on master, right? And I don't think that there is anyway we can do away without implementing any version on responses because this is how we would be able to decode them properly.

Right now, Kafka version has been made part of the config in this PR. Since our discussion started with this, why not go ahead with it rather than implementing it as global. Test cases would need to be modified though which I have already partially hacked here in an ugly way.

eapache added a commit that referenced this pull request Jun 9, 2016
Support configuring target kafka version
This is unused currently, but will have a bunch of uses soon. Based on a
simplified version of #655 by Mohammad
Rafay Aleem.

Addresses the major chunk of #617.
@eapache

This comment has been minimized.

Copy link
Contributor

commented Jun 9, 2016

Superseded mostly by #676 (based on this code but simplified) and other work that is ongoing. Thanks for your input, it was helpful in solving this!

@eapache eapache closed this Jun 9, 2016

@mrafayaleem

This comment has been minimized.

Copy link
Author

commented Jun 9, 2016

Glad to be of help. Cheers!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
2 participants
You can’t perform that action at this time.