Skip to content

Commit

Permalink
Merge pull request #1644 from zendesk/ktsanaktsidis/fix_session_id_th…
Browse files Browse the repository at this point in the history
…rashing

Fix brokers continually allocating new Session IDs
  • Loading branch information
bai committed Apr 9, 2020
2 parents 2d3d6cd + b9b3d09 commit 6159078
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 1 deletion.
8 changes: 8 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,14 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
request.Version = 4
request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
}
if bc.consumer.conf.Version.IsAtLeast(V1_1_0_0) {
request.Version = 7
// We do not currently implement KIP-227 FetchSessions. Setting the id to 0
// and the epoch to -1 tells the broker not to generate as session ID we're going
// to just ignore anyway.
request.SessionID = 0
request.SessionEpoch = -1
}
if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) {
request.Version = 10
}
Expand Down
51 changes: 50 additions & 1 deletion consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) {

cfg := NewConfig()
cfg.Consumer.Return.Errors = true
cfg.Version = V1_1_0_0
cfg.Version = V0_11_0_0

broker0 := NewMockBroker(t, 0)

Expand Down Expand Up @@ -569,6 +569,55 @@ func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) {
broker0.Close()
}

func TestConsumeMessageWithSessionIDs(t *testing.T) {
// Given
fetchResponse1 := &FetchResponse{Version: 7}
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)

cfg := NewConfig()
cfg.Version = V1_1_0_0

broker0 := NewMockBroker(t, 0)
fetchResponse2 := &FetchResponse{}
fetchResponse2.Version = 7
fetchResponse2.AddError("my_topic", 0, ErrNoError)

broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(1).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
})

master, err := NewConsumer([]string{broker0.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}

// When
consumer, err := master.ConsumePartition("my_topic", 0, 1)
if err != nil {
t.Fatal(err)
}

assertMessageOffset(t, <-consumer.Messages(), 1)
assertMessageOffset(t, <-consumer.Messages(), 2)

safeClose(t, consumer)
safeClose(t, master)
broker0.Close()

fetchReq := broker0.History()[3].Request.(*FetchRequest)
if fetchReq.SessionID != 0 || fetchReq.SessionEpoch != -1 {
t.Error("Expected session ID to be zero & Epoch to be -1")
}
}

// It is fine if offsets of fetched messages are not sequential (although
// strictly increasing!).
func TestConsumerNonSequentialOffsets(t *testing.T) {
Expand Down

0 comments on commit 6159078

Please sign in to comment.