Merge pull request #1661 from zendesk/ktsanaktsidis/fix_seq_out_of_order
Fix "broker received out of order sequence" when brokers die
dnwe committed May 4, 2020
2 parents 1c3eb66 + ca14191 commit f7b64cf
Showing 4 changed files with 213 additions and 18 deletions.
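The core of the change, reduced to a minimal standalone sketch (the type and method names below are illustrative, not the exact Sarama API): sequence numbers are handed out per topic/partition together with the producer epoch they belong to, and when a message that already carries a sequence number fails for good, the producer epoch is bumped and every sequence counter is reset to zero, so the broker never sees a gap within an epoch.

package main

import (
	"fmt"
	"sync"
)

// txnManager is a simplified stand-in for Sarama's transactionManager: it owns
// the idempotent producer's ID/epoch and the per-partition sequence counters.
type txnManager struct {
	mu            sync.Mutex
	producerID    int64
	producerEpoch int16
	sequences     map[string]int32
}

// nextSequence hands out the next sequence number for a topic/partition along
// with the epoch it belongs to, mirroring getAndIncrementSequenceNumber below.
func (t *txnManager) nextSequence(topic string, partition int32) (int32, int16) {
	key := fmt.Sprintf("%s-%d", topic, partition)
	t.mu.Lock()
	defer t.mu.Unlock()
	seq := t.sequences[key]
	t.sequences[key] = seq + 1
	return seq, t.producerEpoch
}

// bumpEpoch starts a new epoch and resets every sequence counter to zero, so the
// next batch is accepted even though earlier sequence numbers were never delivered.
func (t *txnManager) bumpEpoch() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.producerEpoch++
	for k := range t.sequences {
		t.sequences[k] = 0
	}
}

func main() {
	tm := &txnManager{producerID: 1000, producerEpoch: 1, sequences: map[string]int32{}}

	seq, epoch := tm.nextSequence("my_topic", 0)
	fmt.Println(seq, epoch) // 0 1

	// Delivery failed after the sequence was assigned: the broker never saw seq 0,
	// so continuing with seq 1 would trigger "out of order sequence". Roll the epoch.
	tm.bumpEpoch()

	seq, epoch = tm.nextSequence("my_topic", 0)
	fmt.Println(seq, epoch) // 0 2
}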
60 changes: 49 additions & 11 deletions async_producer.go
@@ -60,13 +60,28 @@ const (
noProducerEpoch = -1
)

func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) int32 {
func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) {
key := fmt.Sprintf("%s-%d", topic, partition)
t.mutex.Lock()
defer t.mutex.Unlock()
sequence := t.sequenceNumbers[key]
t.sequenceNumbers[key] = sequence + 1
return sequence
return sequence, t.producerEpoch
}

func (t *transactionManager) bumpEpoch() {
t.mutex.Lock()
defer t.mutex.Unlock()
t.producerEpoch++
for k := range t.sequenceNumbers {
t.sequenceNumbers[k] = 0
}
}

func (t *transactionManager) getProducerID() (int64, int16) {
t.mutex.Lock()
defer t.mutex.Unlock()
return t.producerID, t.producerEpoch
}

func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
@@ -208,6 +223,8 @@ type ProducerMessage struct {
flags flagSet
expectation chan *ProducerError
sequenceNumber int32
producerEpoch int16
hasSequence bool
}

const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
@@ -234,6 +251,9 @@ func (m *ProducerMessage) byteSize(version int) int {
func (m *ProducerMessage) clear() {
m.flags = 0
m.retries = 0
m.sequenceNumber = 0
m.producerEpoch = 0
m.hasSequence = false
}

// ProducerError is the type of error generated when the producer fails to deliver a message.
@@ -388,10 +408,6 @@ func (tp *topicProducer) dispatch() {
continue
}
}
// All messages being retried (sent or not) have already had their retry count updated
if tp.parent.conf.Producer.Idempotent && msg.retries == 0 {
msg.sequenceNumber = tp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
}

handler := tp.handlers[msg.Partition]
if handler == nil {
@@ -570,6 +586,15 @@ func (pp *partitionProducer) dispatch() {
Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
}

// Now that we know we have a broker to actually try and send this message to, generate the sequence
// number for it.
// All messages being retried (sent or not) have already had their retry count updated
// Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer.
if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
msg.hasSequence = true
}

pp.brokerProducer.input <- msg
}
}
@@ -748,12 +773,21 @@ func (bp *brokerProducer) run() {
}

if bp.buffer.wouldOverflow(msg) {
if err := bp.waitForSpace(msg); err != nil {
Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
if err := bp.waitForSpace(msg, false); err != nil {
bp.parent.retryMessage(msg, err)
continue
}
}

if bp.parent.txnmgr.producerID != noProducerID && bp.buffer.producerEpoch != msg.producerEpoch {
// The epoch was reset, need to roll the buffer over
Logger.Printf("producer/broker/%d detected epoch rollover, waiting for new buffer\n", bp.broker.ID())
if err := bp.waitForSpace(msg, true); err != nil {
bp.parent.retryMessage(msg, err)
continue
}
}
if err := bp.buffer.add(msg); err != nil {
bp.parent.returnError(msg, err)
continue
@@ -809,17 +843,15 @@ func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
return bp.currentRetries[msg.Topic][msg.Partition]
}

func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error {
Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())

func (bp *brokerProducer) waitForSpace(msg *ProducerMessage, forceRollover bool) error {
for {
select {
case response := <-bp.responses:
bp.handleResponse(response)
// handling a response can change our state, so re-check some things
if reason := bp.needsRetry(msg); reason != nil {
return reason
} else if !bp.buffer.wouldOverflow(msg) {
} else if !bp.buffer.wouldOverflow(msg) && !forceRollover {
return nil
}
case bp.output <- bp.buffer:
@@ -1030,6 +1062,12 @@ func (p *asyncProducer) shutdown() {
}

func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
// We need to reset the producer ID epoch if we set a sequence number on it, because the broker
// will never see a message with this number, so we can never continue the sequence.
if msg.hasSequence {
Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition)
p.txnmgr.bumpEpoch()
}
msg.clear()
pErr := &ProducerError{Msg: msg, Err: err}
if p.conf.Producer.Return.Errors {
69 changes: 69 additions & 0 deletions async_producer_test.go
@@ -1130,6 +1130,75 @@ func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) {
closeProducer(t, producer)
}

func TestAsyncProducerIdempotentEpochRollover(t *testing.T) {
broker := NewMockBroker(t, 1)
defer broker.Close()

metadataResponse := &MetadataResponse{
Version: 1,
ControllerID: 1,
}
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
broker.Returns(metadataResponse)

initProducerID := &InitProducerIDResponse{
ThrottleTime: 0,
ProducerID: 1000,
ProducerEpoch: 1,
}
broker.Returns(initProducerID)

config := NewConfig()
config.Producer.Flush.Messages = 10
config.Producer.Flush.Frequency = 10 * time.Millisecond
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 1 // This test needs to exercise what happens when retries are exhausted
config.Producer.RequiredAcks = WaitForAll
config.Producer.Retry.Backoff = 0
config.Producer.Idempotent = true
config.Net.MaxOpenRequests = 1
config.Version = V0_11_0_0

producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
if err != nil {
t.Fatal(err)
}
defer closeProducer(t, producer)

producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}
prodError := &ProduceResponse{
Version: 3,
ThrottleTime: 0,
}
prodError.AddTopicPartition("my_topic", 0, ErrBrokerNotAvailable)
broker.Returns(prodError)
<-producer.Errors()

lastReqRes := broker.history[len(broker.history)-1]
lastProduceBatch := lastReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch
if lastProduceBatch.FirstSequence != 0 {
t.Error("first sequence not zero")
}
if lastProduceBatch.ProducerEpoch != 1 {
t.Error("first epoch was not one")
}

// Now if we produce again, the epoch should have rolled over.
producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}
broker.Returns(prodError)
<-producer.Errors()

lastReqRes = broker.history[len(broker.history)-1]
lastProduceBatch = lastReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch
if lastProduceBatch.FirstSequence != 0 {
t.Error("second sequence not zero")
}
if lastProduceBatch.ProducerEpoch <= 1 {
t.Error("second epoch was not > 1")
}
}

// TestBrokerProducerShutdown ensures that a call to shutdown stops the
// brokerProducer run() loop and doesn't leak any goroutines
func TestBrokerProducerShutdown(t *testing.T) {
78 changes: 78 additions & 0 deletions functional_producer_test.go
@@ -3,6 +3,7 @@ package sarama
import (
"fmt"
"os"
"strings"
"sync"
"testing"
"time"
@@ -96,6 +97,83 @@ func TestFuncProducingToInvalidTopic(t *testing.T) {
safeClose(t, producer)
}

func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

config := NewConfig()
config.Producer.Flush.Frequency = 250 * time.Millisecond
config.Producer.Idempotent = true
config.Producer.Timeout = 500 * time.Millisecond
config.Producer.Retry.Max = 1
config.Producer.Retry.Backoff = 500 * time.Millisecond
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
config.Producer.RequiredAcks = WaitForAll
config.Net.MaxOpenRequests = 1
config.Version = V0_11_0_0

producer, err := NewSyncProducer(kafkaBrokers, config)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, producer)

// Successfully publish a few messages
for i := 0; i < 10; i++ {
_, _, err = producer.SendMessage(&ProducerMessage{
Topic: "test.1",
Value: StringEncoder(fmt.Sprintf("%d message", i)),
})
if err != nil {
t.Fatal(err)
}
}

// break the brokers.
for proxyName, proxy := range Proxies {
if !strings.Contains(proxyName, "kafka") {
continue
}
if err := proxy.Disable(); err != nil {
t.Fatal(err)
}
}

// This should fail hard now
for i := 10; i < 20; i++ {
_, _, err = producer.SendMessage(&ProducerMessage{
Topic: "test.1",
Value: StringEncoder(fmt.Sprintf("%d message", i)),
})
if err == nil {
t.Fatal(err)
}
}

// Now bring the proxy back up
for proxyName, proxy := range Proxies {
if !strings.Contains(proxyName, "kafka") {
continue
}
if err := proxy.Enable(); err != nil {
t.Fatal(err)
}
}

// We should be able to publish again (once everything calms down)
// (otherwise it times out)
for {
_, _, err = producer.SendMessage(&ProducerMessage{
Topic: "test.1",
Value: StringEncoder("comeback message"),
})
if err == nil {
break
}
}
}

func testProducingMessages(t *testing.T, config *Config) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)
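For reference, the settings the tests above rely on are the same ones an application needs for idempotent production. A minimal setup sketch using the public Sarama API (the broker address "localhost:9092" is a placeholder):

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	// Mirrors the test configuration above: idempotence requires acks from all
	// replicas, a single in-flight request, and protocol version 0.11 or newer.
	config.Producer.Idempotent = true
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Return.Successes = true
	config.Net.MaxOpenRequests = 1
	config.Version = sarama.V0_11_0_0

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	if _, _, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "test.1",
		Value: sarama.StringEncoder("hello"),
	}); err != nil {
		log.Fatal(err)
	}
}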
24 changes: 17 additions & 7 deletions produce_set.go
@@ -13,17 +13,22 @@ type partitionSet struct {
}

type produceSet struct {
parent *asyncProducer
msgs map[string]map[int32]*partitionSet
parent *asyncProducer
msgs map[string]map[int32]*partitionSet
producerID int64
producerEpoch int16

bufferBytes int
bufferCount int
}

func newProduceSet(parent *asyncProducer) *produceSet {
pid, epoch := parent.txnmgr.getProducerID()
return &produceSet{
msgs: make(map[string]map[int32]*partitionSet),
parent: parent,
msgs: make(map[string]map[int32]*partitionSet),
parent: parent,
producerID: pid,
producerEpoch: epoch,
}
}

@@ -65,8 +70,8 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
Version: 2,
Codec: ps.parent.conf.Producer.Compression,
CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
ProducerID: ps.parent.txnmgr.producerID,
ProducerEpoch: ps.parent.txnmgr.producerEpoch,
ProducerID: ps.producerID,
ProducerEpoch: ps.producerEpoch,
}
if ps.parent.conf.Producer.Idempotent {
batch.FirstSequence = msg.sequenceNumber
@@ -78,12 +83,17 @@
}
partitions[msg.Partition] = set
}
set.msgs = append(set.msgs, msg)

if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence {
return errors.New("assertion failed: message out of sequence added to a batch")
}
}

// Past this point we can't return an error, because we've already added the message to the set.
set.msgs = append(set.msgs, msg)

if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
// We are being conservative here to avoid having to prep encode the record
size += maximumRecordOverhead
rec := &Record{