Skip to content

Commit

Permalink
Add a functional test to cover idempotent production
Browse files Browse the repository at this point in the history
  • Loading branch information
KJTsanaktsidis committed Apr 16, 2020
1 parent 9df3038 commit ca14191
Showing 1 changed file with 78 additions and 0 deletions.
78 changes: 78 additions & 0 deletions functional_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sarama
import (
"fmt"
"os"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -96,6 +97,83 @@ func TestFuncProducingToInvalidTopic(t *testing.T) {
safeClose(t, producer)
}

func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

config := NewConfig()
config.Producer.Flush.Frequency = 250 * time.Millisecond
config.Producer.Idempotent = true
config.Producer.Timeout = 500 * time.Millisecond
config.Producer.Retry.Max = 1
config.Producer.Retry.Backoff = 500 * time.Millisecond
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
config.Producer.RequiredAcks = WaitForAll
config.Net.MaxOpenRequests = 1
config.Version = V0_11_0_0

producer, err := NewSyncProducer(kafkaBrokers, config)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, producer)

// Successfully publish a few messages
for i := 0; i < 10; i++ {
_, _, err = producer.SendMessage(&ProducerMessage{
Topic: "test.1",
Value: StringEncoder(fmt.Sprintf("%d message", i)),
})
if err != nil {
t.Fatal(err)
}
}

// break the brokers.
for proxyName, proxy := range Proxies {
if !strings.Contains(proxyName, "kafka") {
continue
}
if err := proxy.Disable(); err != nil {
t.Fatal(err)
}
}

// This should fail hard now
for i := 10; i < 20; i++ {
_, _, err = producer.SendMessage(&ProducerMessage{
Topic: "test.1",
Value: StringEncoder(fmt.Sprintf("%d message", i)),
})
if err == nil {
t.Fatal(err)
}
}

// Now bring the proxy back up
for proxyName, proxy := range Proxies {
if !strings.Contains(proxyName, "kafka") {
continue
}
if err := proxy.Enable(); err != nil {
t.Fatal(err)
}
}

// We should be able to publish again (once everything calms down)
// (otherwise it times out)
for {
_, _, err = producer.SendMessage(&ProducerMessage{
Topic: "test.1",
Value: StringEncoder("comeback message"),
})
if err == nil {
break
}
}
}

func testProducingMessages(t *testing.T, config *Config) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)
Expand Down

0 comments on commit ca14191

Please sign in to comment.