Skip to content

Commit

Permalink
fix(producer): return errors for every message in retryBatch to avoid…
Browse files Browse the repository at this point in the history
… producer hang forever (#2378)

* fix(producer): return errors for every message in retryBatch to avoid the producer hang forever

* chore: add test

Co-authored-by: zcong1993 <zhangcong1992@gmail.com>
  • Loading branch information
cch123 and zcong1993 committed Dec 22, 2022
1 parent e5f5d5b commit 67d977b
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 1 deletion.
2 changes: 1 addition & 1 deletion async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1161,7 +1161,7 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio
produceSet.bufferCount += len(pSet.msgs)
for _, msg := range pSet.msgs {
if msg.retries >= p.conf.Producer.Retry.Max {
p.returnError(msg, kerr)
p.returnErrors(pSet.msgs, kerr)
return
}
msg.retries++
Expand Down
69 changes: 69 additions & 0 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1302,6 +1302,75 @@ func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) {
}
}

// test case for https://github.com/Shopify/sarama/pull/2378
func TestAsyncProducerIdempotentRetryCheckBatch_2378(t *testing.T) {
broker := NewMockBroker(t, 1)

metadataResponse := &MetadataResponse{
Version: 1,
ControllerID: 1,
}
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)

initProducerIDResponse := &InitProducerIDResponse{
ThrottleTime: 0,
ProducerID: 1000,
ProducerEpoch: 1,
}

prodNotLeaderResponse := &ProduceResponse{
Version: 3,
ThrottleTime: 0,
}
prodNotLeaderResponse.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas)

handlerFailBeforeWrite := func(req *request) (res encoderWithHeader) {
switch req.body.key() {
case 3:
return metadataResponse
case 22:
return initProducerIDResponse
case 0: // for msg, always return error to trigger retryBatch
return prodNotLeaderResponse
}
return nil
}

config := NewTestConfig()
config.Version = V0_11_0_0
config.Producer.Idempotent = true
config.Net.MaxOpenRequests = 1
config.Producer.Retry.Max = 1 // set max retry to 1
config.Producer.RequiredAcks = WaitForAll
config.Producer.Return.Successes = true
config.Producer.Flush.Frequency = 50 * time.Millisecond
config.Producer.Retry.Backoff = 100 * time.Millisecond

broker.setHandler(handlerFailBeforeWrite)
producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 3; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}

go func() {
for i := 0; i < 7; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("goroutine")}
time.Sleep(100 * time.Millisecond)
}
}()

// this will block until 5 minutes timeout before pr 2378 merge
expectResults(t, producer, 0, 10)

broker.Close()
closeProducer(t, producer)
}

func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) {
broker := NewMockBroker(t, 1)

Expand Down

0 comments on commit 67d977b

Please sign in to comment.