From bb63c8a6cdf32b627633669f738e84559969c45c Mon Sep 17 00:00:00 2001 From: Ruben van Staden Date: Tue, 19 Aug 2025 11:17:10 -0400 Subject: [PATCH 1/4] improved error handling for sync produce --- kafka/producer.go | 8 +++++++ kafka/producer_test.go | 50 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/kafka/producer.go b/kafka/producer.go index 0f85065f..0f261f59 100644 --- a/kafka/producer.go +++ b/kafka/producer.go @@ -247,6 +247,8 @@ func (p *Producer) Produce(ctx context.Context, rs ...apmqueue.Record) error { ctx = queuecontext.DetachedContext(ctx) } namespacePrefix := p.cfg.namespacePrefix() + + var errs []error for _, record := range rs { kgoRecord := &kgo.Record{ Headers: headers, @@ -259,6 +261,9 @@ func (p *Producer) Produce(ctx context.Context, rs ...apmqueue.Record) error { // kotel already marks spans as errors. No need to handle it here. if err != nil { topicName := strings.TrimPrefix(r.Topic, namespacePrefix) + + errs = append(errs, fmt.Errorf("failed to produce message: %v", err)) + logger := p.cfg.Logger if p.cfg.TopicLogFieldFunc != nil { logger = logger.With(p.cfg.TopicLogFieldFunc(topicName)) @@ -279,6 +284,9 @@ func (p *Producer) Produce(ctx context.Context, rs ...apmqueue.Record) error { } if p.cfg.Sync { wg.Wait() + if len(errs) > 0 { + return errors.Join(errs...) 
+ } } return nil } diff --git a/kafka/producer_test.go b/kafka/producer_test.go index f4988bc5..6fe5c53b 100644 --- a/kafka/producer_test.go +++ b/kafka/producer_test.go @@ -32,6 +32,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" @@ -222,6 +223,55 @@ func TestNewProducerBasic(t *testing.T) { test(t, false) } +func TestProduceSyncFailed(t *testing.T) { + test := func(t *testing.T, ns, topic string) { + t.Run(ns+topic, func(t *testing.T) { + correctNamespace := "name_space" + correctTopic := "default-topic" + namespacedTopic := correctNamespace + "-" + correctTopic + + _, brokers := newClusterWithTopics(t, int32(1), namespacedTopic) + producer := newProducer(t, ProducerConfig{ + CommonConfig: CommonConfig{ + Brokers: brokers, + Logger: zap.NewNop(), + Namespace: ns, + }, + Sync: true, + MaxBufferedRecords: 0, + }) + + batch := []apmqueue.Record{ + { + Topic: apmqueue.Topic(topic), + OrderingKey: []byte("key"), + Value: []byte("value"), + }, + } + + ctx := context.Background() + + // Cancel the context before calling Produce + if ns == correctNamespace && topic == correctTopic { + ctxCancelled, cancelProduce := context.WithCancel(ctx) + cancelProduce() + err := producer.Produce(ctxCancelled, batch...) + assert.Error(t, err) + assert.Contains(t, err.Error(), "context canceled") + } + + if ns != correctNamespace || topic != correctTopic { + err := producer.Produce(ctx, batch...) 
+ assert.Error(t, err) + assert.Contains(t, err.Error(), kerr.UnknownTopicOrPartition.Message) + } + }) + } + test(t, "name_space", "default-topic") + test(t, "incorrect-namespace", "default-topic") + test(t, "name_space", "incorrect-topic") +} + func testVerboseLogger(t testing.TB) *zap.Logger { t.Helper() if testing.Verbose() { From f1cf9cd7f5c9138131a8223889fb0b735675187a Mon Sep 17 00:00:00 2001 From: Ruben van Staden Date: Wed, 20 Aug 2025 10:21:07 -0400 Subject: [PATCH 2/4] use table-driven tests --- kafka/producer.go | 6 ++++- kafka/producer_test.go | 50 +++++++++++++++++++++++++++++------------- 2 files changed, 40 insertions(+), 16 deletions(-) diff --git a/kafka/producer.go b/kafka/producer.go index 0f261f59..03a0dcd6 100644 --- a/kafka/producer.go +++ b/kafka/producer.go @@ -249,6 +249,8 @@ func (p *Producer) Produce(ctx context.Context, rs ...apmqueue.Record) error { namespacePrefix := p.cfg.namespacePrefix() var errs []error + var mu sync.Mutex + for _, record := range rs { kgoRecord := &kgo.Record{ Headers: headers, @@ -262,7 +264,9 @@ func (p *Producer) Produce(ctx context.Context, rs ...apmqueue.Record) error { if err != nil { topicName := strings.TrimPrefix(r.Topic, namespacePrefix) - errs = append(errs, fmt.Errorf("failed to produce message: %v", err)) + mu.Lock() + errs = append(errs, fmt.Errorf("failed to produce message: %w", err)) + mu.Unlock() logger := p.cfg.Logger if p.cfg.TopicLogFieldFunc != nil { diff --git a/kafka/producer_test.go b/kafka/producer_test.go index 6fe5c53b..570cfea7 100644 --- a/kafka/producer_test.go +++ b/kafka/producer_test.go @@ -224,18 +224,41 @@ func TestNewProducerBasic(t *testing.T) { } func TestProduceSyncFailed(t *testing.T) { - test := func(t *testing.T, ns, topic string) { - t.Run(ns+topic, func(t *testing.T) { - correctNamespace := "name_space" - correctTopic := "default-topic" - namespacedTopic := correctNamespace + "-" + correctTopic - - _, brokers := newClusterWithTopics(t, int32(1), namespacedTopic) + 
correctNamespace := "name_space" + correctTopic := "default-topic" + + testCases := []struct { + desc string + namespace string + topic string + }{ + { + desc: "correct namespace and topic", + namespace: correctNamespace, + topic: correctTopic, + }, + { + desc: "incorrect namespace", + namespace: "incorrect", + topic: correctTopic, + }, + { + desc: "incorrect topic", + namespace: correctNamespace, + topic: "incorrect", + }, + } + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + _, brokers := newClusterWithTopics(t, + int32(1), + correctNamespace+"-"+correctTopic, + ) producer := newProducer(t, ProducerConfig{ CommonConfig: CommonConfig{ Brokers: brokers, Logger: zap.NewNop(), - Namespace: ns, + Namespace: tc.namespace, }, Sync: true, MaxBufferedRecords: 0, @@ -243,7 +266,7 @@ func TestProduceSyncFailed(t *testing.T) { batch := []apmqueue.Record{ { - Topic: apmqueue.Topic(topic), + Topic: apmqueue.Topic(tc.topic), OrderingKey: []byte("key"), Value: []byte("value"), }, @@ -252,7 +275,7 @@ func TestProduceSyncFailed(t *testing.T) { ctx := context.Background() // Cancel the context before calling Produce - if ns == correctNamespace && topic == correctTopic { + if tc.namespace == correctNamespace && tc.topic == correctTopic { ctxCancelled, cancelProduce := context.WithCancel(ctx) cancelProduce() err := producer.Produce(ctxCancelled, batch...) @@ -260,16 +283,13 @@ func TestProduceSyncFailed(t *testing.T) { assert.Contains(t, err.Error(), "context canceled") } - if ns != correctNamespace || topic != correctTopic { + if tc.namespace != correctNamespace || tc.topic != correctTopic { err := producer.Produce(ctx, batch...) 
assert.Error(t, err) assert.Contains(t, err.Error(), kerr.UnknownTopicOrPartition.Message) } }) } - test(t, "name_space", "default-topic") - test(t, "incorrect-namespace", "default-topic") - test(t, "name_space", "incorrect-topic") } func testVerboseLogger(t testing.TB) *zap.Logger { @@ -361,7 +381,7 @@ func TestProducerConcurrentClose(t *testing.T) { }) var wg sync.WaitGroup - for i := 0; i < 10; i++ { + for range 10 { wg.Add(1) go func() { defer wg.Done() From 1c515934ad40de5d538f691f6e20ee39ed27988e Mon Sep 17 00:00:00 2001 From: Ruben van Staden Date: Wed, 20 Aug 2025 11:44:37 -0400 Subject: [PATCH 3/4] change naming --- kafka/metrics_test.go | 2 +- kafka/producer_test.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/kafka/metrics_test.go b/kafka/metrics_test.go index cbb8e2ab..36e192c9 100644 --- a/kafka/metrics_test.go +++ b/kafka/metrics_test.go @@ -56,7 +56,7 @@ func TestProducerMetrics(t *testing.T) { ) // Fixes https://github.com/elastic/apm-queue/issues/464 - <-time.After(10 * time.Millisecond) + <-time.After(100 * time.Millisecond) // Close the producer so records are flushed. 
require.NoError(t, producer.Close()) diff --git a/kafka/producer_test.go b/kafka/producer_test.go index 570cfea7..48376358 100644 --- a/kafka/producer_test.go +++ b/kafka/producer_test.go @@ -238,14 +238,14 @@ func TestProduceSyncFailed(t *testing.T) { topic: correctTopic, }, { - desc: "incorrect namespace", - namespace: "incorrect", + desc: "unknown namespace", + namespace: "unknown", topic: correctTopic, }, { - desc: "incorrect topic", + desc: "unknown topic", namespace: correctNamespace, - topic: "incorrect", + topic: "unknown", }, } for _, tc := range testCases { From 450d0327091a6145072ea9a74588b1c6af01a434 Mon Sep 17 00:00:00 2001 From: Ruben van Staden Date: Wed, 20 Aug 2025 12:06:12 -0400 Subject: [PATCH 4/4] revert ci test change --- kafka/metrics_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/metrics_test.go b/kafka/metrics_test.go index 36e192c9..cbb8e2ab 100644 --- a/kafka/metrics_test.go +++ b/kafka/metrics_test.go @@ -56,7 +56,7 @@ func TestProducerMetrics(t *testing.T) { ) // Fixes https://github.com/elastic/apm-queue/issues/464 - <-time.After(100 * time.Millisecond) + <-time.After(10 * time.Millisecond) // Close the producer so records are flushed. require.NoError(t, producer.Close())