diff --git a/kafka/producer.go b/kafka/producer.go index 0f85065f..03a0dcd6 100644 --- a/kafka/producer.go +++ b/kafka/producer.go @@ -247,6 +247,10 @@ func (p *Producer) Produce(ctx context.Context, rs ...apmqueue.Record) error { ctx = queuecontext.DetachedContext(ctx) } namespacePrefix := p.cfg.namespacePrefix() + + var errs []error + var mu sync.Mutex + for _, record := range rs { kgoRecord := &kgo.Record{ Headers: headers, @@ -259,6 +263,11 @@ func (p *Producer) Produce(ctx context.Context, rs ...apmqueue.Record) error { // kotel already marks spans as errors. No need to handle it here. if err != nil { topicName := strings.TrimPrefix(r.Topic, namespacePrefix) + + mu.Lock() + errs = append(errs, fmt.Errorf("failed to produce message: %w", err)) + mu.Unlock() + logger := p.cfg.Logger if p.cfg.TopicLogFieldFunc != nil { logger = logger.With(p.cfg.TopicLogFieldFunc(topicName)) @@ -279,6 +288,9 @@ func (p *Producer) Produce(ctx context.Context, rs ...apmqueue.Record) error { } if p.cfg.Sync { wg.Wait() + if len(errs) > 0 { + return errors.Join(errs...) + } } return nil } diff --git a/kafka/producer_test.go b/kafka/producer_test.go index f4988bc5..48376358 100644 --- a/kafka/producer_test.go +++ b/kafka/producer_test.go @@ -32,6 +32,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" @@ -222,6 +223,75 @@ func TestNewProducerBasic(t *testing.T) { test(t, false) } +func TestProduceSyncFailed(t *testing.T) { + correctNamespace := "name_space" + correctTopic := "default-topic" + + testCases := []struct { + desc string + namespace string + topic string + }{ + { + desc: "correct namespace and topic", + namespace: correctNamespace, + topic: correctTopic, + }, + { + desc: "unknown namespace", + namespace: "unknown", + topic: correctTopic, + }, + { + desc: "unknown topic", + namespace: correctNamespace, + topic: "unknown", + }, + } + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + _, brokers := newClusterWithTopics(t, + int32(1), + correctNamespace+"-"+correctTopic, + ) + producer := newProducer(t, ProducerConfig{ + CommonConfig: CommonConfig{ + Brokers: brokers, + Logger: zap.NewNop(), + Namespace: tc.namespace, + }, + Sync: true, + MaxBufferedRecords: 0, + }) + + batch := []apmqueue.Record{ + { + Topic: apmqueue.Topic(tc.topic), + OrderingKey: []byte("key"), + Value: []byte("value"), + }, + } + + ctx := context.Background() + + // Cancel the context before calling Produce + if tc.namespace == correctNamespace && tc.topic == correctTopic { + ctxCancelled, cancelProduce := context.WithCancel(ctx) + cancelProduce() + err := producer.Produce(ctxCancelled, batch...) + assert.Error(t, err) + assert.Contains(t, err.Error(), "context canceled") + } + + if tc.namespace != correctNamespace || tc.topic != correctTopic { + err := producer.Produce(ctx, batch...) + assert.Error(t, err) + assert.Contains(t, err.Error(), kerr.UnknownTopicOrPartition.Message) + } + }) + } +} + func testVerboseLogger(t testing.TB) *zap.Logger { t.Helper() if testing.Verbose() { @@ -311,7 +381,7 @@ func TestProducerConcurrentClose(t *testing.T) { }) var wg sync.WaitGroup - for i := 0; i < 10; i++ { + for range 10 { wg.Add(1) go func() { defer wg.Done()