From bc47bd993d87c4ac7d9234baf79f239b95c76637 Mon Sep 17 00:00:00 2001
From: Conor Mongey
Date: Mon, 10 Dec 2018 21:31:48 +0000
Subject: [PATCH] Changes vendor of Shopify/sarama back to official version

---
 Gopkg.lock                                    |   7 +-
 Gopkg.toml                                    |   4 +-
 .../Shopify/sarama/mockresponses.go           |   2 +-
 .../Shopify/sarama/offset_commit_request.go   |   8 +-
 .../Shopify/sarama/offset_commit_response.go  |  31 ++++-
 .../Shopify/sarama/offset_fetch_request.go    |  39 ++++--
 .../Shopify/sarama/offset_fetch_response.go   | 116 +++++++++++++----
 .../Shopify/sarama/offset_request.go          |  34 ++++-
 8 files changed, 183 insertions(+), 58 deletions(-)

diff --git a/Gopkg.lock b/Gopkg.lock
index 4104ee80..7f06fae3 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -10,13 +10,12 @@
   version = "v1.3.4"
 
 [[projects]]
-  branch = "cm-new-describe-configs"
-  digest = "1:1da0bcc79ed11012ddda1303291729d46ae7f734e723fb0f3e9bcd9a1e6a2c9a"
+  branch = "master"
+  digest = "1:c13bdcfc4b0d36e4ee20816e1606e49aa60d601ecef38c72dd8902d773e3d992"
   name = "github.com/Shopify/sarama"
   packages = ["."]
   pruneopts = "NUT"
-  revision = "f9494b44e15f8db1dae8c2257d641f52b5ba5f0a"
-  source = "github.com/Mongey/sarama"
+  revision = "861a752af5039cb76ed02ce31cb4e4fc1cb65d11"
 
 [[projects]]
   digest = "1:8bd40ec66a32437126e8ff3080f26b11ca5926917daa01eba1285d0b059416bc"
diff --git a/Gopkg.toml b/Gopkg.toml
index 507d12e2..fd7b252f 100644
--- a/Gopkg.toml
+++ b/Gopkg.toml
@@ -3,9 +3,7 @@
 
 [[constraint]]
   name = "github.com/Shopify/sarama"
-  source = "github.com/Mongey/sarama"
-  #version = "v1.19.0"
-  branch = "cm-new-describe-configs"
+  branch = "master"
 
 [[constraint]]
   name = "github.com/hashicorp/terraform"
diff --git a/vendor/github.com/Shopify/sarama/mockresponses.go b/vendor/github.com/Shopify/sarama/mockresponses.go
index 17204419..fe55200c 100644
--- a/vendor/github.com/Shopify/sarama/mockresponses.go
+++ b/vendor/github.com/Shopify/sarama/mockresponses.go
@@ -523,7 +523,7 @@ func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int3
 		partitions = make(map[int32]*OffsetFetchResponseBlock)
 		topics[topic] = partitions
 	}
-	partitions[partition] = &OffsetFetchResponseBlock{offset, metadata, kerror}
+	partitions[partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror}
 	return mr
 }
 
diff --git a/vendor/github.com/Shopify/sarama/offset_commit_request.go b/vendor/github.com/Shopify/sarama/offset_commit_request.go
index 37e99fbf..1ec583e6 100644
--- a/vendor/github.com/Shopify/sarama/offset_commit_request.go
+++ b/vendor/github.com/Shopify/sarama/offset_commit_request.go
@@ -52,12 +52,14 @@ type OffsetCommitRequest struct {
 	// - 0 (kafka 0.8.1 and later)
 	// - 1 (kafka 0.8.2 and later)
 	// - 2 (kafka 0.9.0 and later)
+	// - 3 (kafka 0.11.0 and later)
+	// - 4 (kafka 2.0.0 and later)
 	Version int16
 
 	blocks map[string]map[int32]*offsetCommitRequestBlock
 }
 
 func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
-	if r.Version < 0 || r.Version > 2 {
+	if r.Version < 0 || r.Version > 4 {
 		return PacketEncodingError{"invalid or unsupported OffsetCommitRequest version field"}
 	}
@@ -174,6 +176,10 @@ func (r *OffsetCommitRequest) requiredVersion() KafkaVersion {
 		return V0_8_2_0
 	case 2:
 		return V0_9_0_0
+	case 3:
+		return V0_11_0_0
+	case 4:
+		return V2_0_0_0
 	default:
 		return MinVersion
 	}
diff --git a/vendor/github.com/Shopify/sarama/offset_commit_response.go b/vendor/github.com/Shopify/sarama/offset_commit_response.go
index a4b18acd..e842298d 100644
--- a/vendor/github.com/Shopify/sarama/offset_commit_response.go
+++ b/vendor/github.com/Shopify/sarama/offset_commit_response.go
@@ -1,7 +1,9 @@
 package sarama
 
 type OffsetCommitResponse struct {
-	Errors map[string]map[int32]KError
+	Version        int16
+	ThrottleTimeMs int32
+	Errors         map[string]map[int32]KError
 }
 
 func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KError) {
@@ -17,6 +19,9 @@ func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KE
 }
 
 func (r *OffsetCommitResponse) encode(pe packetEncoder) error {
+	if r.Version >= 3 {
+		pe.putInt32(r.ThrottleTimeMs)
+	}
 	if err := pe.putArrayLength(len(r.Errors)); err != nil {
 		return err
 	}
@@ -36,6 +41,15 @@ func (r *OffsetCommitResponse) encode(pe packetEncoder) error {
 }
 
 func (r *OffsetCommitResponse) decode(pd packetDecoder, version int16) (err error) {
+	r.Version = version
+
+	if version >= 3 {
+		r.ThrottleTimeMs, err = pd.getInt32()
+		if err != nil {
+			return err
+		}
+	}
+
 	numTopics, err := pd.getArrayLength()
 	if err != nil || numTopics == 0 {
 		return err
@@ -77,9 +91,20 @@ func (r *OffsetCommitResponse) key() int16 {
 }
 
 func (r *OffsetCommitResponse) version() int16 {
-	return 0
+	return r.Version
 }
 
 func (r *OffsetCommitResponse) requiredVersion() KafkaVersion {
-	return MinVersion
+	switch r.Version {
+	case 1:
+		return V0_8_2_0
+	case 2:
+		return V0_9_0_0
+	case 3:
+		return V0_11_0_0
+	case 4:
+		return V2_0_0_0
+	default:
+		return MinVersion
+	}
 }
diff --git a/vendor/github.com/Shopify/sarama/offset_fetch_request.go b/vendor/github.com/Shopify/sarama/offset_fetch_request.go
index 5a05014b..68608241 100644
--- a/vendor/github.com/Shopify/sarama/offset_fetch_request.go
+++ b/vendor/github.com/Shopify/sarama/offset_fetch_request.go
@@ -1,28 +1,33 @@
 package sarama
 
 type OffsetFetchRequest struct {
-	ConsumerGroup string
 	Version       int16
+	ConsumerGroup string
 	partitions    map[string][]int32
 }
 
 func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
-	if r.Version < 0 || r.Version > 1 {
+	if r.Version < 0 || r.Version > 5 {
 		return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"}
 	}
 
 	if err = pe.putString(r.ConsumerGroup); err != nil {
 		return err
 	}
-	if err = pe.putArrayLength(len(r.partitions)); err != nil {
-		return err
-	}
-	for topic, partitions := range r.partitions {
-		if err = pe.putString(topic); err != nil {
+
+	if r.Version >= 2 && r.partitions == nil {
+		pe.putInt32(-1)
+	} else {
+		if err = pe.putArrayLength(len(r.partitions)); err != nil {
 			return err
 		}
-		if err = pe.putInt32Array(partitions); err != nil {
-			return err
+		for topic, partitions := range r.partitions {
+			if err = pe.putString(topic); err != nil {
+				return err
+			}
+			if err = pe.putInt32Array(partitions); err != nil {
+				return err
+			}
 		}
 	}
 	return nil
@@ -37,7 +42,7 @@ func (r *OffsetFetchRequest) decode(pd packetDecoder, version int16) (err error)
 	if err != nil {
 		return err
 	}
-	if partitionCount == 0 {
+	if (partitionCount == 0 && version < 2) || partitionCount < 0 {
 		return nil
 	}
 	r.partitions = make(map[string][]int32)
@@ -67,11 +72,25 @@ func (r *OffsetFetchRequest) requiredVersion() KafkaVersion {
 	switch r.Version {
 	case 1:
 		return V0_8_2_0
+	case 2:
+		return V0_10_2_0
+	case 3:
+		return V0_11_0_0
+	case 4:
+		return V2_0_0_0
+	case 5:
+		return V2_1_0_0
 	default:
 		return MinVersion
 	}
 }
 
+func (r *OffsetFetchRequest) ZeroPartitions() {
+	if r.partitions == nil && r.Version >= 2 {
+		r.partitions = make(map[string][]int32)
+	}
+}
+
 func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) {
 	if r.partitions == nil {
 		r.partitions = make(map[string][]int32)
diff --git a/vendor/github.com/Shopify/sarama/offset_fetch_response.go b/vendor/github.com/Shopify/sarama/offset_fetch_response.go
index 11e4b1f3..9e257028 100644
--- a/vendor/github.com/Shopify/sarama/offset_fetch_response.go
+++ b/vendor/github.com/Shopify/sarama/offset_fetch_response.go
@@ -1,17 +1,25 @@
 package sarama
 
 type OffsetFetchResponseBlock struct {
-	Offset   int64
-	Metadata string
-	Err      KError
+	Offset      int64
+	LeaderEpoch int32
+	Metadata    string
+	Err         KError
 }
 
-func (b *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) {
+func (b *OffsetFetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
 	b.Offset, err = pd.getInt64()
 	if err != nil {
 		return err
 	}
 
+	if version >= 5 {
+		b.LeaderEpoch, err = pd.getInt32()
+		if err != nil {
+			return err
+		}
+	}
+
 	b.Metadata, err = pd.getString()
 	if err != nil {
 		return err
@@ -26,9 +34,13 @@ func (b *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
-func (b *OffsetFetchResponseBlock) encode(pe packetEncoder) (err error) {
+func (b *OffsetFetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
 	pe.putInt64(b.Offset)
 
+	if version >= 5 {
+		pe.putInt32(b.LeaderEpoch)
+	}
+
 	err = pe.putString(b.Metadata)
 	if err != nil {
 		return err
@@ -40,10 +52,17 @@ func (b *OffsetFetchResponseBlock) encode(pe packetEncoder) (err error) {
 }
 
 type OffsetFetchResponse struct {
-	Blocks map[string]map[int32]*OffsetFetchResponseBlock
+	Version        int16
+	ThrottleTimeMs int32
+	Blocks         map[string]map[int32]*OffsetFetchResponseBlock
+	Err            KError
 }
 
 func (r *OffsetFetchResponse) encode(pe packetEncoder) error {
+	if r.Version >= 3 {
+		pe.putInt32(r.ThrottleTimeMs)
+	}
+
 	if err := pe.putArrayLength(len(r.Blocks)); err != nil {
 		return err
 	}
@@ -56,51 +75,73 @@ func (r *OffsetFetchResponse) encode(pe packetEncoder) error {
 		}
 		for partition, block := range partitions {
 			pe.putInt32(partition)
-			if err := block.encode(pe); err != nil {
+			if err := block.encode(pe, r.Version); err != nil {
 				return err
 			}
 		}
 	}
+	if r.Version >= 2 {
+		pe.putInt16(int16(r.Err))
+	}
 	return nil
 }
 
 func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error) {
-	numTopics, err := pd.getArrayLength()
-	if err != nil || numTopics == 0 {
-		return err
-	}
-
-	r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics)
-	for i := 0; i < numTopics; i++ {
-		name, err := pd.getString()
-		if err != nil {
-			return err
-		}
+	r.Version = version
 
-		numBlocks, err := pd.getArrayLength()
+	if version >= 3 {
+		r.ThrottleTimeMs, err = pd.getInt32()
 		if err != nil {
 			return err
 		}
+	}
 
-		if numBlocks == 0 {
-			r.Blocks[name] = nil
-			continue
-		}
-		r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)
+	numTopics, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
 
-		for j := 0; j < numBlocks; j++ {
-			id, err := pd.getInt32()
+	if numTopics > 0 {
+		r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics)
+		for i := 0; i < numTopics; i++ {
+			name, err := pd.getString()
 			if err != nil {
 				return err
 			}
 
-			block := new(OffsetFetchResponseBlock)
-			err = block.decode(pd)
+			numBlocks, err := pd.getArrayLength()
 			if err != nil {
 				return err
 			}
-			r.Blocks[name][id] = block
+
+			if numBlocks == 0 {
+				r.Blocks[name] = nil
+				continue
+			}
+			r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)
+
+			for j := 0; j < numBlocks; j++ {
+				id, err := pd.getInt32()
+				if err != nil {
+					return err
+				}
+
+				block := new(OffsetFetchResponseBlock)
+				err = block.decode(pd, version)
+				if err != nil {
+					return err
+				}
+				r.Blocks[name][id] = block
+			}
+		}
+	}
+
+	if version >= 2 {
+		kerr, err := pd.getInt16()
+		if err != nil {
+			return err
 		}
+		r.Err = KError(kerr)
 	}
 
 	return nil
@@ -111,11 +152,24 @@ func (r *OffsetFetchResponse) key() int16 {
 }
 
 func (r *OffsetFetchResponse) version() int16 {
-	return 0
+	return r.Version
 }
 
 func (r *OffsetFetchResponse) requiredVersion() KafkaVersion {
-	return MinVersion
+	switch r.Version {
+	case 1:
+		return V0_8_2_0
+	case 2:
+		return V0_10_2_0
+	case 3:
+		return V0_11_0_0
+	case 4:
+		return V2_0_0_0
+	case 5:
+		return V2_1_0_0
+	default:
+		return MinVersion
+	}
 }
 
 func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock {
diff --git a/vendor/github.com/Shopify/sarama/offset_request.go b/vendor/github.com/Shopify/sarama/offset_request.go
index 4c5df75d..326c3720 100644
--- a/vendor/github.com/Shopify/sarama/offset_request.go
+++ b/vendor/github.com/Shopify/sarama/offset_request.go
@@ -27,12 +27,20 @@ func (b *offsetRequestBlock) decode(pd packetDecoder, version int16) (err error)
 }
 
 type OffsetRequest struct {
-	Version int16
-	blocks  map[string]map[int32]*offsetRequestBlock
+	Version        int16
+	replicaID      int32
+	isReplicaIDSet bool
+	blocks         map[string]map[int32]*offsetRequestBlock
 }
 
 func (r *OffsetRequest) encode(pe packetEncoder) error {
-	pe.putInt32(-1) // replica ID is always -1 for clients
+	if r.isReplicaIDSet {
+		pe.putInt32(r.replicaID)
+	} else {
+		// default replica ID is always -1 for clients
+		pe.putInt32(-1)
+	}
+
 	err := pe.putArrayLength(len(r.blocks))
 	if err != nil {
 		return err
@@ -59,10 +67,14 @@ func (r *OffsetRequest) encode(pe packetEncoder) error {
 func (r *OffsetRequest) decode(pd packetDecoder, version int16) error {
 	r.Version = version
 
-	// Ignore replica ID
-	if _, err := pd.getInt32(); err != nil {
+	replicaID, err := pd.getInt32()
+	if err != nil {
 		return err
 	}
+	if replicaID >= 0 {
+		r.SetReplicaID(replicaID)
+	}
+
 	blockCount, err := pd.getArrayLength()
 	if err != nil {
 		return err
@@ -113,6 +125,18 @@ func (r *OffsetRequest) requiredVersion() KafkaVersion {
 	}
 }
 
+func (r *OffsetRequest) SetReplicaID(id int32) {
+	r.replicaID = id
+	r.isReplicaIDSet = true
+}
+
+func (r *OffsetRequest) ReplicaID() int32 {
+	if r.isReplicaIDSet {
+		return r.replicaID
+	}
+	return -1
+}
+
 func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) {
 	if r.blocks == nil {
 		r.blocks = make(map[string]map[int32]*offsetRequestBlock)