Added multi version kafka support #655

Closed
4 changes: 4 additions & 0 deletions broker.go
@@ -188,6 +188,7 @@ func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
err = b.sendAndReceive(request, nil)
} else {
response = new(ProduceResponse)
response.KafkaVersion = request.KafkaVersion
err = b.sendAndReceive(request, response)
}

@@ -200,6 +201,7 @@ func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {

func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
response := new(FetchResponse)
response.KafkaVersion = request.KafkaVersion

err := b.sendAndReceive(request, response)

@@ -212,6 +214,7 @@ func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {

func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
response := new(OffsetCommitResponse)
response.KafkaVersion = request.KafkaVersion

err := b.sendAndReceive(request, response)

@@ -224,6 +227,7 @@ func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitRespon

func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
response := new(OffsetFetchResponse)
response.KafkaVersion = request.KafkaVersion

err := b.sendAndReceive(request, response)

5 changes: 5 additions & 0 deletions broker_test.go
@@ -106,6 +106,7 @@ var brokerTestTable = []struct {
{[]byte{},
func(t *testing.T, broker *Broker) {
request := ProduceRequest{}
request.KafkaVersion = V0_8_2_2
request.RequiredAcks = NoResponse
response, err := broker.Produce(&request)
if err != nil {
@@ -119,6 +120,7 @@ var brokerTestTable = []struct {
{[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := ProduceRequest{}
request.KafkaVersion = V0_8_2_2
request.RequiredAcks = WaitForLocal
response, err := broker.Produce(&request)
if err != nil {
@@ -132,6 +134,7 @@ var brokerTestTable = []struct {
{[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := FetchRequest{}
request.KafkaVersion = V0_8_2_2
response, err := broker.Fetch(&request)
if err != nil {
t.Error(err)
@@ -144,6 +147,7 @@ var brokerTestTable = []struct {
{[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := OffsetFetchRequest{}
request.KafkaVersion = V0_8_2_2
response, err := broker.FetchOffset(&request)
if err != nil {
t.Error(err)
@@ -156,6 +160,7 @@ var brokerTestTable = []struct {
{[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := OffsetCommitRequest{}
request.KafkaVersion = V0_8_2_2
response, err := broker.CommitOffset(&request)
if err != nil {
t.Error(err)
10 changes: 10 additions & 0 deletions config.go
@@ -10,6 +10,10 @@ var validID *regexp.Regexp = regexp.MustCompile(`\A[A-Za-z0-9._-]*\z`)

// Config is used to pass multiple configuration options to Sarama's constructors.
type Config struct {

// This defines the Kafka broker version and is used for broker-version-specific behaviour
KafkaVersion *KafkaVersion

// Net is the namespace for network-level properties used by the Broker, and
// shared by the Client/Producer/Consumer.
Net struct {
@@ -219,6 +223,8 @@ type Config struct {
func NewConfig() *Config {
c := &Config{}

c.KafkaVersion = V0_8_2_2

c.Net.MaxOpenRequests = 5
c.Net.DialTimeout = 30 * time.Second
c.Net.ReadTimeout = 30 * time.Second
@@ -363,3 +369,7 @@ func (c *Config) Validate() error {

return nil
}

func (v *KafkaVersion) AtLeast(ver *KafkaVersion) bool {
return v.GE(ver)
}
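For orientation, here is a minimal usage sketch of the new Config field. It assumes sarama's existing NewConsumer constructor and the github.com/Shopify/sarama import path of this era; the broker address is a placeholder and error handling is trimmed to the essentials.

package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    // Target a 0.9 broker so that requests carrying this version (e.g. the
    // consumer's FetchRequest, as wired up in consumer.go below) select the
    // newer wire formats via the version() logic added in this PR.
    config.KafkaVersion = sarama.V0_9_0_0

    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    if err != nil {
        log.Fatalln(err)
    }
    defer consumer.Close()
}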
5 changes: 3 additions & 2 deletions consumer.go
@@ -679,8 +679,9 @@ func (bc *brokerConsumer) abort(err error) {

func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
request := &FetchRequest{
MinBytes: bc.consumer.conf.Consumer.Fetch.Min,
MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
MinBytes: bc.consumer.conf.Consumer.Fetch.Min,
MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
KafkaVersion: bc.consumer.conf.KafkaVersion,
}

for child := range bc.subscriptions {
15 changes: 14 additions & 1 deletion fetch_request.go
@@ -25,6 +25,9 @@ type FetchRequest struct {
MaxWaitTime int32
MinBytes int32
blocks map[string]map[int32]*fetchRequestBlock

// This is not part of the request bytes sent to Kafka
KafkaVersion *KafkaVersion
}

func (f *FetchRequest) encode(pe packetEncoder) (err error) {
@@ -103,7 +106,17 @@ func (f *FetchRequest) key() int16 {
}

func (f *FetchRequest) version() int16 {
return 0
// These return values are coupled with the FetchResponse version, per the Kafka client protocol:
// Fetch Response v1 only contains message format v0.
// Fetch Response v2 might either contain message format v0 or message format v1.
// See the FetchResponse decode implementation for more details.
if f.KafkaVersion.AtLeast(V0_10_0) {
return 2
} else if f.KafkaVersion.AtLeast(V0_9_0_0) {
return 1
} else {
return 0
}
}

func (f *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) {
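An aside, not part of the PR: the request-version mapping above could be pinned down with a table-driven test along these lines, assuming the V0_* constants and the KafkaVersion field introduced in this change set (it would sit alongside fetch_request_test.go).

package sarama

import "testing"

func TestFetchRequestVersionMapping(t *testing.T) {
    cases := []struct {
        kafka    *KafkaVersion
        expected int16
    }{
        {V0_8_2_2, 0}, // pre-0.9 brokers only speak Fetch v0
        {V0_9_0_0, 1}, // 0.9 brokers understand Fetch v1
        {V0_10_0, 2},  // 0.10 brokers understand Fetch v2
    }
    for _, c := range cases {
        r := &FetchRequest{KafkaVersion: c.kafka}
        if got := r.version(); got != c.expected {
            t.Errorf("KafkaVersion %v: expected wire version %d, got %d", c.kafka, c.expected, got)
        }
    }
}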
5 changes: 4 additions & 1 deletion fetch_request_test.go
@@ -1,6 +1,8 @@
package sarama

import "testing"
import (
"testing"
)

var (
fetchRequestNoBlocks = []byte{
@@ -21,6 +23,7 @@ var (

func TestFetchRequest(t *testing.T) {
request := new(FetchRequest)
request.KafkaVersion = V0_8_2_2
testRequest(t, "no blocks", request, fetchRequestNoBlocks)

request.MaxWaitTime = 0x20
14 changes: 13 additions & 1 deletion fetch_response.go
@@ -34,6 +34,11 @@ func (pr *FetchResponseBlock) decode(pd packetDecoder) (err error) {

type FetchResponse struct {
Blocks map[string]map[int32]*FetchResponseBlock
// Zero means the request did not violate any quota. This value is only applicable for Kafka versions >= 0.9.0.0
ThrottleTime int32

// This is not part of the response bytes received from Kafka
KafkaVersion *KafkaVersion
}

func (pr *FetchResponseBlock) encode(pe packetEncoder) (err error) {
@@ -50,6 +55,13 @@ func (pr *FetchResponseBlock) encode(pe packetEncoder) (err error) {
}

func (fr *FetchResponse) decode(pd packetDecoder) (err error) {
if fr.KafkaVersion.AtLeast(V0_9_0_0) {
fr.ThrottleTime, err = pd.getInt32()
if err != nil {
return err
}
}

numTopics, err := pd.getArrayLength()
if err != nil {
return err
@@ -167,7 +179,7 @@ func (fr *FetchResponse) AddMessage(topic string, partition int32, key, value En
if value != nil {
vb, _ = value.Encode()
}
msg := &Message{Key: kb, Value: vb}
msg := &Message{Key: kb, Value: vb, KafkaVersion: fr.KafkaVersion}
msgBlock := &MessageBlock{Msg: msg, Offset: offset}
frb.MsgSet.Messages = append(frb.MsgSet.Messages, msgBlock)
}
2 changes: 2 additions & 0 deletions fetch_response_test.go
@@ -30,6 +30,7 @@ var (

func TestEmptyFetchResponse(t *testing.T) {
response := FetchResponse{}
response.KafkaVersion = V0_8_2_2
testDecodable(t, "empty", &response, emptyFetchResponse)

if len(response.Blocks) != 0 {
@@ -40,6 +41,7 @@ func TestEmptyFetchResponse(t *testing.T) {

func TestOneMessageFetchResponse(t *testing.T) {
response := FetchResponse{}
response.KafkaVersion = V0_8_2_2
testDecodable(t, "one message", &response, oneMessageFetchResponse)

if len(response.Blocks) != 1 {
10 changes: 10 additions & 0 deletions kafka_versions.go
@@ -0,0 +1,10 @@
package sarama

var V0_10_0 = NewVersion(0, 10, 0, 0)
var V0_9_0_1 = NewVersion(0, 9, 0, 1)
var V0_9_0_0 = NewVersion(0, 9, 0, 0)
var V0_8_2_2 = NewVersion(0, 8, 2, 2)
var V0_8_2_1 = NewVersion(0, 8, 2, 1)
var V0_8_2_0 = NewVersion(0, 8, 2, 0)
var V0_8_1_0 = NewVersion(0, 8, 1, 0)
var LatestStable = V0_9_0_1
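kafka_versions.go calls a NewVersion constructor and, through Config's AtLeast helper, a GE method, neither of which is shown in this diff. Purely as an assumption about those missing pieces, one minimal way the type could be implemented:

package sarama

// KafkaVersion is sketched here as a four-part version number; the PR does
// not show its actual definition.
type KafkaVersion struct {
    major, minor, patch, build uint
}

// NewVersion builds a *KafkaVersion, matching how kafka_versions.go calls it.
func NewVersion(major, minor, patch, build uint) *KafkaVersion {
    return &KafkaVersion{major: major, minor: minor, patch: patch, build: build}
}

// GE reports whether v is greater than or equal to other, comparing the
// major, minor, patch and build components in order.
func (v *KafkaVersion) GE(other *KafkaVersion) bool {
    a := [4]uint{v.major, v.minor, v.patch, v.build}
    b := [4]uint{other.major, other.minor, other.patch, other.build}
    for i := range a {
        if a[i] != b[i] {
            return a[i] > b[i]
        }
    }
    return true
}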
2 changes: 2 additions & 0 deletions message.go
@@ -29,6 +29,8 @@ type Message struct {
Value []byte // the message contents
Set *MessageSet // the message set a message might wrap

KafkaVersion *KafkaVersion

compressedCache []byte
}

4 changes: 2 additions & 2 deletions message_test.go
@@ -44,7 +44,7 @@ var (
)

func TestMessageEncoding(t *testing.T) {
message := Message{}
message := Message{KafkaVersion: V0_8_2_2}
testEncodable(t, "empty", &message, emptyMessage)

message.Value = []byte{}
@@ -53,7 +53,7 @@ func TestMessageEncoding(t *testing.T) {
}

func TestMessageDecoding(t *testing.T) {
message := Message{}
message := Message{KafkaVersion: V0_8_2_2}
testDecodable(t, "empty", &message, emptyMessage)
if message.Codec != CompressionNone {
t.Error("Decoding produced compression codec where there was none.")
2 changes: 2 additions & 0 deletions mockresponses.go
@@ -218,7 +218,9 @@ func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, of

func (mfr *MockFetchResponse) For(reqBody decoder) encoder {
fetchRequest := reqBody.(*FetchRequest)
fetchRequest.KafkaVersion = V0_8_2_2
res := &FetchResponse{}
res.KafkaVersion = fetchRequest.KafkaVersion
for topic, partitions := range fetchRequest.blocks {
for partition, block := range partitions {
initialOffset := block.fetchOffset
34 changes: 17 additions & 17 deletions offset_commit_request.go
@@ -46,24 +46,18 @@ type OffsetCommitRequest struct {
ConsumerID string // v1 or later
RetentionTime int64 // v2 or later

// Version can be:
// - 0 (kafka 0.8.1 and later)
// - 1 (kafka 0.8.2 and later)
// - 2 (kafka 0.8.3 and later)
Version int16
blocks map[string]map[int32]*offsetCommitRequestBlock
blocks map[string]map[int32]*offsetCommitRequestBlock

// This is not part of the request bytes sent to Kafka
KafkaVersion *KafkaVersion
}

func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
if r.Version < 0 || r.Version > 2 {
return PacketEncodingError{"invalid or unsupported OffsetCommitRequest version field"}
}

if err := pe.putString(r.ConsumerGroup); err != nil {
return err
}

if r.Version >= 1 {
if r.version() >= 1 {
pe.putInt32(r.ConsumerGroupGeneration)
if err := pe.putString(r.ConsumerID); err != nil {
return err
@@ -77,7 +71,7 @@ func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
}
}

if r.Version >= 2 {
if r.version() >= 2 {
pe.putInt64(r.RetentionTime)
} else if r.RetentionTime != 0 {
Logger.Println("Non-zero RetentionTime specified for OffsetCommitRequest version <2, it will be ignored")
@@ -95,7 +89,7 @@ func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
}
for partition, block := range partitions {
pe.putInt32(partition)
if err := block.encode(pe, r.Version); err != nil {
if err := block.encode(pe, r.version()); err != nil {
return err
}
}
@@ -108,7 +102,7 @@ func (r *OffsetCommitRequest) decode(pd packetDecoder) (err error) {
return err
}

if r.Version >= 1 {
if r.version() >= 1 {
if r.ConsumerGroupGeneration, err = pd.getInt32(); err != nil {
return err
}
@@ -117,7 +111,7 @@ func (r *OffsetCommitRequest) decode(pd packetDecoder) (err error) {
}
}

if r.Version >= 2 {
if r.version() >= 2 {
if r.RetentionTime, err = pd.getInt64(); err != nil {
return err
}
@@ -147,7 +141,7 @@ func (r *OffsetCommitRequest) decode(pd packetDecoder) (err error) {
return err
}
block := &offsetCommitRequestBlock{}
if err := block.decode(pd, r.Version); err != nil {
if err := block.decode(pd, r.version()); err != nil {
return err
}
r.blocks[topic][partition] = block
@@ -161,7 +155,13 @@ func (r *OffsetCommitRequest) key() int16 {
}

func (r *OffsetCommitRequest) version() int16 {
return r.Version
if r.KafkaVersion.AtLeast(V0_9_0_0) {
return 2
} else if r.KafkaVersion.AtLeast(V0_8_2_0) {
return 1
} else {
return 0
}
}

func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string) {
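To show the new flow end to end, a hedged sketch of committing an offset against a 0.9 broker. It relies on sarama's existing NewBroker/Open/CommitOffset API; the broker address, group, consumer ID, topic, and offset values are placeholders.

package main

import (
    "log"
    "time"

    "github.com/Shopify/sarama"
)

func main() {
    broker := sarama.NewBroker("localhost:9092")
    if err := broker.Open(sarama.NewConfig()); err != nil {
        log.Fatalln(err)
    }
    defer broker.Close()

    req := &sarama.OffsetCommitRequest{
        ConsumerGroup:           "example-group",
        ConsumerGroupGeneration: 1,
        ConsumerID:              "example-consumer",
        RetentionTime:           int64(24 * time.Hour / time.Millisecond),
        // Per this PR, V0_9_0_0 maps to wire version 2, so the generation,
        // consumer ID, and retention fields above are actually encoded.
        KafkaVersion: sarama.V0_9_0_0,
    }
    req.AddBlock("example-topic", 0, 42, 0, "")

    if _, err := broker.CommitOffset(req); err != nil {
        log.Fatalln(err)
    }
}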
6 changes: 3 additions & 3 deletions offset_commit_request_test.go
@@ -56,7 +56,7 @@ var (

func TestOffsetCommitRequestV0(t *testing.T) {
request := new(OffsetCommitRequest)
request.Version = 0
request.KafkaVersion = V0_8_1_0
request.ConsumerGroup = "foobar"
testRequest(t, "no blocks v0", request, offsetCommitRequestNoBlocksV0)

Expand All @@ -69,7 +69,7 @@ func TestOffsetCommitRequestV1(t *testing.T) {
request.ConsumerGroup = "foobar"
request.ConsumerID = "cons"
request.ConsumerGroupGeneration = 0x1122
request.Version = 1
request.KafkaVersion = V0_8_2_2
testRequest(t, "no blocks v1", request, offsetCommitRequestNoBlocksV1)

request.AddBlock("topic", 0x5221, 0xDEADBEEF, ReceiveTime, "metadata")
Expand All @@ -82,7 +82,7 @@ func TestOffsetCommitRequestV2(t *testing.T) {
request.ConsumerID = "cons"
request.ConsumerGroupGeneration = 0x1122
request.RetentionTime = 0x4433
request.Version = 2
request.KafkaVersion = V0_9_0_0
testRequest(t, "no blocks v2", request, offsetCommitRequestNoBlocksV2)

request.AddBlock("topic", 0x5221, 0xDEADBEEF, 0, "metadata")
Expand Down
3 changes: 3 additions & 0 deletions offset_commit_response.go
Expand Up @@ -2,6 +2,9 @@ package sarama

type OffsetCommitResponse struct {
Errors map[string]map[int32]KError

// This is not part of the response bytes received from Kafka
KafkaVersion *KafkaVersion
}

func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KError) {
Expand Down