From c637d25bd47c7caab1669ec330592678d8a06fe4 Mon Sep 17 00:00:00 2001 From: Constanca Manteigas Date: Tue, 26 Aug 2025 10:32:26 +0200 Subject: [PATCH 1/8] Add multiple attributes in topic attribute func --- kafka/common.go | 20 ++++++++++++-------- kafka/common_test.go | 14 +++++++------- kafka/consumer.go | 6 +++--- kafka/consumer_test.go | 4 ++-- kafka/logger_test.go | 4 ++-- kafka/manager.go | 24 ++++++++++-------------- kafka/manager_test.go | 6 +++--- kafka/metrics.go | 24 ++++++++++-------------- kafka/metrics_test.go | 12 ++++++------ kafka/producer.go | 2 +- kafka/topiccreator.go | 6 +++--- 11 files changed, 59 insertions(+), 63 deletions(-) diff --git a/kafka/common.go b/kafka/common.go index b2301e6d..8a6dc447 100644 --- a/kafka/common.go +++ b/kafka/common.go @@ -45,7 +45,7 @@ import ( type SASLMechanism = sasl.Mechanism // TopicLogFieldFunc is a function that returns a zap.Field for a given topic. -type TopicLogFieldFunc func(topic string) zap.Field +type TopicLogFieldFunc func(topic string) []zap.Field // CommonConfig defines common configuration for Kafka consumers, producers, // and managers. 
@@ -204,8 +204,8 @@ func (cfg *CommonConfig) finalize() error { cfg.TopicLogFieldFunc = topicFieldFunc(cfg.TopicLogFieldFunc) } if cfg.TopicAttributeFunc == nil { - cfg.TopicAttributeFunc = func(topic string) attribute.KeyValue { - return attribute.KeyValue{} + cfg.TopicAttributeFunc = func(topic string) []attribute.KeyValue { + return nil } } @@ -348,14 +348,18 @@ func newAWSMSKIAMSASL() (sasl.Mechanism, error) { } func topicFieldFunc(f TopicLogFieldFunc) TopicLogFieldFunc { - return func(t string) zap.Field { + return func(t string) []zap.Field { if f == nil { - return zap.Skip() + return nil } - if field := f(t); field.Type > zapcore.UnknownType { - return field + fields := f(t) + var result []zap.Field + for _, field := range fields { + if field.Type > zapcore.UnknownType { + result = append(result, field) + } } - return zap.Skip() + return result } } diff --git a/kafka/common_test.go b/kafka/common_test.go index e80443bd..75280203 100644 --- a/kafka/common_test.go +++ b/kafka/common_test.go @@ -388,19 +388,19 @@ func newClusterWithTopics(t testing.TB, partitions int32, topics ...string) (*kg func TestTopicFieldFunc(t *testing.T) { t.Run("nil func", func(t *testing.T) { topic := topicFieldFunc(nil)("a") - assert.Equal(t, zap.Skip(), topic) + assert.Nil(t, topic) }) t.Run("empty field", func(t *testing.T) { - topic := topicFieldFunc(func(topic string) zap.Field { - return zap.Field{} + topic := topicFieldFunc(func(topic string) []zap.Field { + return nil })("b") - assert.Equal(t, zap.Skip(), topic) + assert.Nil(t, topic) }) t.Run("actual topic field", func(t *testing.T) { - topic := topicFieldFunc(func(topic string) zap.Field { - return zap.String("topic", topic) + topic := topicFieldFunc(func(topic string) []zap.Field { + return []zap.Field{zap.String("topic", topic)} })("c") - assert.Equal(t, zap.String("topic", "c"), topic) + assert.Equal(t, []zap.Field{zap.String("topic", "c")}, topic) }) } diff --git a/kafka/consumer.go b/kafka/consumer.go index 
2d5c6a2a..176cc883 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -397,7 +397,7 @@ func (c *Consumer) fetch(ctx context.Context) error { // the fake fetch can have an empty topic so we need to // account for that if c.cfg.TopicLogFieldFunc != nil && topicName != "" { - logger = logger.With(c.cfg.TopicLogFieldFunc(topicName)) + logger = logger.With(c.cfg.TopicLogFieldFunc(topicName)...) } logger.Error( @@ -453,7 +453,7 @@ func (c *consumer) assigned(_ context.Context, client *kgo.Client, assigned map[ zap.Int32("partition", partition), ) if c.logFieldFn != nil { - logger = logger.With(c.logFieldFn(t)) + logger = logger.With(c.logFieldFn(t)...) } pc := newPartitionConsumer(c.ctx, client, c.processor, @@ -536,7 +536,7 @@ func (c *consumer) processFetch(fetches kgo.Fetches) { topicName := strings.TrimPrefix(ftp.Topic, c.topicPrefix) logger := c.logger if c.logFieldFn != nil { - logger = logger.With(c.logFieldFn(topicName)) + logger = logger.With(c.logFieldFn(topicName)...) } logger.Warn( "data loss: failed to send records to process after commit", diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go index f1659190..f669a029 100644 --- a/kafka/consumer_test.go +++ b/kafka/consumer_test.go @@ -728,8 +728,8 @@ func TestConsumerTopicLogFieldFunc(t *testing.T) { CommonConfig: CommonConfig{ Brokers: addrs, Logger: zapTest(t), - TopicLogFieldFunc: func(topic string) zapcore.Field { - return zap.Field{} + TopicLogFieldFunc: func(topic string) []zapcore.Field { + return nil }, }, GroupID: t.Name(), diff --git a/kafka/logger_test.go b/kafka/logger_test.go index 6f70bb6b..691e3c34 100644 --- a/kafka/logger_test.go +++ b/kafka/logger_test.go @@ -64,7 +64,7 @@ func TestHookLogsFailedDial(t *testing.T) { } // Calling newClient triggers the metadata refresh, forcing a connection to the fake cluster // using the broken dialer. 
- c, err := cfg.newClient(func(string) attribute.KeyValue { return attribute.String("k", "v") }) + c, err := cfg.newClient(func(string) []attribute.KeyValue { return []attribute.KeyValue{attribute.String("k", "v")} }) require.NoError(t, err) <-time.After(time.Millisecond) @@ -88,7 +88,7 @@ func TestHookLogsFailedDial(t *testing.T) { // Calling newClient triggers the metadata refresh, forcing a connection to the fake cluster // using the broken dialer. - c, err := cfg.newClient(func(string) attribute.KeyValue { return attribute.String("k", "v") }) + c, err := cfg.newClient(func(string) []attribute.KeyValue { return []attribute.KeyValue{attribute.String("k", "v")} }) require.NoError(t, err) assert.Error(t, c.Ping(context.Background())) diff --git a/kafka/manager.go b/kafka/manager.go index ec26b188..27f738f1 100644 --- a/kafka/manager.go +++ b/kafka/manager.go @@ -125,7 +125,7 @@ func (m *Manager) DeleteTopics(ctx context.Context, topics ...apmqueue.Topic) er topic := strings.TrimPrefix(response.Topic, namespacePrefix) logger := m.cfg.Logger.With(zap.String("topic", topic)) if m.cfg.TopicLogFieldFunc != nil { - logger = logger.With(m.cfg.TopicLogFieldFunc(topic)) + logger = logger.With(m.cfg.TopicLogFieldFunc(topic)...) } if err := response.Err; err != nil { @@ -142,9 +142,8 @@ func (m *Manager) DeleteTopics(ctx context.Context, topics ...apmqueue.Topic) er attribute.String("outcome", "failure"), attribute.String("topic", topic), } - if kv := m.cfg.TopicAttributeFunc(topic); kv != (attribute.KeyValue{}) { - attrs = append(attrs, kv) - } + kvl := m.cfg.TopicAttributeFunc(topic) + attrs = append(attrs, kvl...) 
m.deleted.Add(context.Background(), 1, metric.WithAttributeSet( attribute.NewSet(attrs...), )) @@ -156,9 +155,8 @@ func (m *Manager) DeleteTopics(ctx context.Context, topics ...apmqueue.Topic) er attribute.String("outcome", "success"), attribute.String("topic", topic), } - if kv := m.cfg.TopicAttributeFunc(topic); kv != (attribute.KeyValue{}) { - attrs = append(attrs, kv) - } + kvl := m.cfg.TopicAttributeFunc(topic) + attrs = append(attrs, kvl...) m.deleted.Add(context.Background(), 1, metric.WithAttributeSet( attribute.NewSet(attrs...), )) @@ -257,7 +255,7 @@ func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (m logger := m.cfg.Logger if m.cfg.TopicLogFieldFunc != nil { - logger = logger.With(m.cfg.TopicLogFieldFunc(topic)) + logger = logger.With(m.cfg.TopicLogFieldFunc(topic)...) } var matchesRegex bool @@ -298,9 +296,8 @@ func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (m attribute.String("topic", topic), attribute.Int("partition", int(partition)), } - if kv := m.cfg.TopicAttributeFunc(topic); kv != (attribute.KeyValue{}) { - attrs = append(attrs, kv) - } + kvl := m.cfg.TopicAttributeFunc(topic) + attrs = append(attrs, kvl...) o.ObserveInt64( consumerGroupLagMetric, lag.Lag, metric.WithAttributeSet(attribute.NewSet(attrs...)), @@ -313,9 +310,8 @@ func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (m attribute.String("topic", key.topic), attribute.String("client_id", key.clientID), } - if kv := m.cfg.TopicAttributeFunc(key.topic); kv != (attribute.KeyValue{}) { - attrs = append(attrs, kv) - } + kvl := m.cfg.TopicAttributeFunc(key.topic) + attrs = append(attrs, kvl...) 
o.ObserveInt64(assignmentMetric, count, metric.WithAttributeSet( attribute.NewSet(attrs...), )) diff --git a/kafka/manager_test.go b/kafka/manager_test.go index 57061728..ac093158 100644 --- a/kafka/manager_test.go +++ b/kafka/manager_test.go @@ -67,7 +67,7 @@ func TestManagerDeleteTopics(t *testing.T) { commonConfig.Logger = zap.New(core) commonConfig.TracerProvider = tp commonConfig.MeterProvider = mt.MeterProvider - commonConfig.TopicAttributeFunc = func(topic string) attribute.KeyValue { return attribute.KeyValue{} } + commonConfig.TopicAttributeFunc = func(topic string) []attribute.KeyValue { return nil } m, err := NewManager(ManagerConfig{CommonConfig: commonConfig}) require.NoError(t, err) t.Cleanup(func() { m.Close() }) @@ -182,8 +182,8 @@ func TestManagerMetrics(t *testing.T) { commonConfig.Logger = zap.New(core) commonConfig.TracerProvider = tp commonConfig.MeterProvider = mp - commonConfig.TopicAttributeFunc = func(topic string) attribute.KeyValue { - return attribute.Bool("foo", true) + commonConfig.TopicAttributeFunc = func(topic string) []attribute.KeyValue { + return []attribute.KeyValue{attribute.Bool("foo", true)} } m, err := NewManager(ManagerConfig{CommonConfig: commonConfig}) require.NoError(t, err) diff --git a/kafka/metrics.go b/kafka/metrics.go index 09f4fb89..66fd3945 100644 --- a/kafka/metrics.go +++ b/kafka/metrics.go @@ -66,9 +66,9 @@ var ( // TopicAttributeFunc run on `kgo.HookProduceBatchWritten` and // `kgo.HookFetchBatchRead` for each topic/partition. It can be -// used include additionaly dimensions for `consumer.messages.fetched` +// used to include additional dimensions for `consumer.messages.fetched` // and `producer.messages.count` metrics. 
-type TopicAttributeFunc func(topic string) attribute.KeyValue +type TopicAttributeFunc func(topic string) []attribute.KeyValue type metricHooks struct { namespace string @@ -455,9 +455,8 @@ func (h *metricHooks) OnProduceBatchWritten(_ kgo.BrokerMetadata, attribute.String("outcome", "success"), attribute.String("compression.codec", compressionFromCodec(m.CompressionType)), ) - if kv := h.topicAttributeFunc(topic); kv != (attribute.KeyValue{}) { - attrs = append(attrs, kv) - } + kvl := h.topicAttributeFunc(topic) + attrs = append(attrs, kvl...) if h.namespace != "" { attrs = append(attrs, attribute.String("namespace", h.namespace)) } @@ -496,9 +495,8 @@ func (h *metricHooks) OnFetchBatchRead(_ kgo.BrokerMetadata, semconv.MessagingKafkaSourcePartition(int(partition)), attribute.String("compression.codec", compressionFromCodec(m.CompressionType)), ) - if kv := h.topicAttributeFunc(topic); kv != (attribute.KeyValue{}) { - attrs = append(attrs, kv) - } + kvl := h.topicAttributeFunc(topic) + attrs = append(attrs, kvl...) if h.namespace != "" { attrs = append(attrs, attribute.String("namespace", h.namespace)) } @@ -538,9 +536,8 @@ func (h *metricHooks) OnProduceRecordUnbuffered(r *kgo.Record, err error) { semconv.MessagingKafkaDestinationPartition(int(r.Partition)), attribute.String("outcome", "failure"), ) - if kv := h.topicAttributeFunc(r.Topic); kv != (attribute.KeyValue{}) { - attrs = append(attrs, kv) - } + kvl := h.topicAttributeFunc(r.Topic) + attrs = append(attrs, kvl...) if h.namespace != "" { attrs = append(attrs, attribute.String("namespace", h.namespace)) } @@ -573,9 +570,8 @@ func (h *metricHooks) OnFetchRecordUnbuffered(r *kgo.Record, polled bool) { semconv.MessagingSourceName(strings.TrimPrefix(r.Topic, h.topicPrefix)), semconv.MessagingKafkaSourcePartition(int(r.Partition)), ) - if kv := h.topicAttributeFunc(r.Topic); kv != (attribute.KeyValue{}) { - attrs = append(attrs, kv) - } + kvl := h.topicAttributeFunc(r.Topic) + attrs = append(attrs, kvl...) 
if h.namespace != "" { attrs = append(attrs, attribute.String("namespace", h.namespace)) } diff --git a/kafka/metrics_test.go b/kafka/metrics_test.go index cbb8e2ab..e13404e6 100644 --- a/kafka/metrics_test.go +++ b/kafka/metrics_test.go @@ -200,8 +200,8 @@ func TestProducerMetrics(t *testing.T) { test(context.Background(), t, producer, rdr, "unknown-topic", want) }) t.Run("Produced", func(t *testing.T) { - producer, rdr := setupTestProducer(t, func(topic string) attribute.KeyValue { - return attribute.String("test", "test") + producer, rdr := setupTestProducer(t, func(topic string) []attribute.KeyValue { + return []attribute.KeyValue{attribute.String("test", "test")} }) want := []metricdata.Metrics{ { @@ -315,8 +315,8 @@ func TestProducerMetrics(t *testing.T) { test(context.Background(), t, producer, rdr, "default-topic", want) }) t.Run("ProducedWithHeaders", func(t *testing.T) { - producer, rdr := setupTestProducer(t, func(topic string) attribute.KeyValue { - return attribute.String("some key", "some value") + producer, rdr := setupTestProducer(t, func(topic string) []attribute.KeyValue { + return []attribute.KeyValue{attribute.String("some key", "some value")} }) want := []metricdata.Metrics{ { @@ -412,8 +412,8 @@ func TestConsumerMetrics(t *testing.T) { } return nil }) - tc := setupTestConsumer(t, proc, func(topic string) attribute.KeyValue { - return attribute.String("header", "included") + tc := setupTestConsumer(t, proc, func(topic string) []attribute.KeyValue { + return []attribute.KeyValue{attribute.String("header", "included")} }) ctx, cancel := context.WithTimeout(context.Background(), time.Second) diff --git a/kafka/producer.go b/kafka/producer.go index 03a0dcd6..4995b08b 100644 --- a/kafka/producer.go +++ b/kafka/producer.go @@ -270,7 +270,7 @@ func (p *Producer) Produce(ctx context.Context, rs ...apmqueue.Record) error { logger := p.cfg.Logger if p.cfg.TopicLogFieldFunc != nil { - logger = logger.With(p.cfg.TopicLogFieldFunc(topicName)) + logger = 
logger.With(p.cfg.TopicLogFieldFunc(topicName)...) } logger.Error("failed producing message", diff --git a/kafka/topiccreator.go b/kafka/topiccreator.go index d52cb5d9..349496f4 100644 --- a/kafka/topiccreator.go +++ b/kafka/topiccreator.go @@ -169,7 +169,7 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi topicName := strings.TrimPrefix(response.Topic, namespacePrefix) logger := c.m.cfg.Logger.With(loggerFields...) if c.m.cfg.TopicLogFieldFunc != nil { - logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName)) + logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName)...) } if err := response.Err; err != nil { if errors.Is(err, kerr.TopicAlreadyExists) { @@ -224,7 +224,7 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi topicName := strings.TrimPrefix(response.Topic, namespacePrefix) logger := c.m.cfg.Logger.With(loggerFields...) if c.m.cfg.TopicLogFieldFunc != nil { - logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName)) + logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName)...) } if errors.Is(response.Err, kerr.InvalidRequest) { @@ -267,7 +267,7 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi topicName := strings.TrimPrefix(response.Name, namespacePrefix) logger := c.m.cfg.Logger.With(loggerFields...) if c.m.cfg.TopicLogFieldFunc != nil { - logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName)) + logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName)...) 
} if err := response.Err; err != nil { From cd09f085a73d7a6b503c344ed12c3c40482c8afb Mon Sep 17 00:00:00 2001 From: Constanca Manteigas Date: Wed, 27 Aug 2025 10:34:36 +0200 Subject: [PATCH 2/8] Add backward compatibility --- kafka/common.go | 78 +++++++++++++++++++++++++-------- kafka/common_test.go | 16 +++---- kafka/consumer.go | 31 ++++++++++--- kafka/consumer_test.go | 4 +- kafka/log_compacted_consumer.go | 5 ++- kafka/logger_test.go | 8 +++- kafka/manager.go | 44 ++++++++++++++----- kafka/manager_test.go | 6 +-- kafka/metrics.go | 56 +++++++++++++++++------ kafka/metrics_test.go | 75 +++++++++++++++++++++---------- kafka/producer.go | 10 ++++- kafka/topiccreator.go | 15 +++++-- 12 files changed, 257 insertions(+), 91 deletions(-) diff --git a/kafka/common.go b/kafka/common.go index 8a6dc447..2cc36831 100644 --- a/kafka/common.go +++ b/kafka/common.go @@ -45,7 +45,11 @@ import ( type SASLMechanism = sasl.Mechanism // TopicLogFieldFunc is a function that returns a zap.Field for a given topic. -type TopicLogFieldFunc func(topic string) []zap.Field +// Deprecated: use TopicLogFieldsFunc instead. +type TopicLogFieldFunc func(topic string) zap.Field + +// TopicLogFieldsFunc is a function that returns a list of zap.Field for a given topic. +type TopicLogFieldsFunc func(topic string) []zap.Field // CommonConfig defines common configuration for Kafka consumers, producers, // and managers. @@ -150,16 +154,28 @@ type CommonConfig struct { // Defaults to the global one. MeterProvider metric.MeterProvider - // TopicAttributeFunc can be used to create custom dimensions from a Kafka + // TopicAttributeFunc can be used to create one custom dimension from a Kafka // topic for these metrics: // - producer.messages.count // - consumer.messages.fetched + // Deprecated: Use TopicMultipleAttributeFunc instead. 
TopicAttributeFunc TopicAttributeFunc - // TopicAttributeFunc can be used to create custom dimensions from a Kafka - // topic for log messages + // TopicMultipleAttributeFunc can be used to create multiple custom dimensions from a Kafka + // topic for these metrics: + // - producer.messages.count + // - consumer.messages.fetched + TopicMultipleAttributeFunc TopicMultipleAttributeFunc + + // TopicLogFieldFunc can be used to create one custom dimension from a Kafka + // topic for log messages. + // Deprecated: use TopicLogFieldsFunc instead. TopicLogFieldFunc TopicLogFieldFunc + // TopicLogFieldsFunc can be used to create custom dimensions from a Kafka + // topic for log messages. + TopicLogFieldsFunc TopicLogFieldsFunc + // MetadataMaxAge is the maximum age of metadata before it is refreshed. // The lower the value the more frequently new topics will be discovered. // If zero, the default value of 5 minutes is used. @@ -204,8 +220,8 @@ func (cfg *CommonConfig) finalize() error { cfg.TopicLogFieldFunc = topicFieldFunc(cfg.TopicLogFieldFunc) } if cfg.TopicAttributeFunc == nil { - cfg.TopicAttributeFunc = func(topic string) []attribute.KeyValue { - return nil + cfg.TopicAttributeFunc = func(topic string) attribute.KeyValue { + return attribute.KeyValue{} } } @@ -214,7 +230,6 @@ // Merge the configs generated from env vars and/or files. func (cfg *CommonConfig) flatten(envCfg *envConfig, fileCfg *fileConfig) error { - // Config file was set with env vars. if cfg.ConfigFile == "" { cfg.ConfigFile = envCfg.configFile @@ -281,7 +296,38 @@ func (cfg *CommonConfig) meterProvider() metric.MeterProvider { return otel.GetMeterProvider() } +// newClient creates a new *kgo.Client. +// Deprecated: use newClientWithOpts instead. 
func (cfg *CommonConfig) newClient(topicAttributeFunc TopicAttributeFunc, additionalOpts ...kgo.Opt) (*kgo.Client, error) { + return cfg.newClientWithOpts([]Opts{WithTopicAttributeFunc(topicAttributeFunc)}, additionalOpts...) +} + +type Opts func(opts *ClientOpts) + +type ClientOpts struct { + topicAttributeFunc TopicAttributeFunc + topicMultipleAttributeFunc TopicMultipleAttributeFunc +} + +// Deprecated: Use WithTopicMultipleAttributeFunc instead. +func WithTopicAttributeFunc(topicAttributeFunc TopicAttributeFunc) func(opts *ClientOpts) { + return func(opts *ClientOpts) { + opts.topicAttributeFunc = topicAttributeFunc + } +} + +func WithTopicMultipleAttributeFunc(topicMultipleAttributeFunc TopicMultipleAttributeFunc) func(opts *ClientOpts) { + return func(opts *ClientOpts) { + opts.topicMultipleAttributeFunc = topicMultipleAttributeFunc + } +} + +func (cfg *CommonConfig) newClientWithOpts(clientOpts []Opts, additionalOpts ...kgo.Opt) (*kgo.Client, error) { + clOpts := &ClientOpts{} + for _, opt := range clientOpts { + opt(clOpts) + } + opts := []kgo.Opt{ kgo.WithLogger(kzap.New(cfg.Logger.Named("kafka"))), kgo.SeedBrokers(cfg.Brokers...), @@ -304,8 +350,8 @@ func (cfg *CommonConfig) newClient(topicAttributeFunc TopicAttributeFunc, additi } opts = append(opts, additionalOpts...) 
if !cfg.DisableTelemetry { - metricHooks, err := newKgoHooks(cfg.meterProvider(), - cfg.Namespace, cfg.namespacePrefix(), topicAttributeFunc, + metricHooks, err := newKgoHooks( + cfg.meterProvider(), cfg.Namespace, cfg.namespacePrefix(), clOpts.topicAttributeFunc, clOpts.topicMultipleAttributeFunc, ) if err != nil { return nil, fmt.Errorf("kafka: failed creating kgo metrics hooks: %w", err) @@ -348,18 +394,14 @@ func newAWSMSKIAMSASL() (sasl.Mechanism, error) { } func topicFieldFunc(f TopicLogFieldFunc) TopicLogFieldFunc { - return func(t string) []zap.Field { + return func(t string) zap.Field { if f == nil { - return nil + return zap.Skip() } - fields := f(t) - var result []zap.Field - for _, field := range fields { - if field.Type > zapcore.UnknownType { - result = append(result, field) - } + if field := f(t); field.Type > zapcore.UnknownType { + return field } - return result + return zap.Skip() } } diff --git a/kafka/common_test.go b/kafka/common_test.go index 75280203..cf858ab6 100644 --- a/kafka/common_test.go +++ b/kafka/common_test.go @@ -340,7 +340,7 @@ func TestCommonConfigFileHook(t *testing.T) { require.NoError(t, cfg.finalize()) assert.Equal(t, []string{"testing.invalid"}, cfg.Brokers) - client, err := cfg.newClient(nil) + client, err := cfg.newClientWithOpts(nil) require.NoError(t, err) defer client.Close() @@ -388,19 +388,19 @@ func newClusterWithTopics(t testing.TB, partitions int32, topics ...string) (*kg func TestTopicFieldFunc(t *testing.T) { t.Run("nil func", func(t *testing.T) { topic := topicFieldFunc(nil)("a") - assert.Nil(t, topic) + assert.Equal(t, zap.Skip(), topic) }) t.Run("empty field", func(t *testing.T) { - topic := topicFieldFunc(func(topic string) []zap.Field { - return nil + topic := topicFieldFunc(func(topic string) zap.Field { + return zap.Field{} })("b") - assert.Nil(t, topic) + assert.Equal(t, zap.Skip(), topic) }) t.Run("actual topic field", func(t *testing.T) { - topic := topicFieldFunc(func(topic string) []zap.Field { - 
return []zap.Field{zap.String("topic", topic)} + topic := topicFieldFunc(func(topic string) zap.Field { + return zap.String("topic", topic) })("c") - assert.Equal(t, []zap.Field{zap.String("topic", "c")}, topic) + assert.Equal(t, zap.String("topic", "c"), topic) }) } diff --git a/kafka/consumer.go b/kafka/consumer.go index 176cc883..1c043cc4 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -213,6 +213,7 @@ func NewConsumer(cfg ConsumerConfig) (*Consumer, error) { consumer := &consumer{ topicPrefix: namespacePrefix, logFieldFn: cfg.TopicLogFieldFunc, + logFieldsFn: cfg.TopicLogFieldsFunc, assignments: make(map[topicPartition]*pc), processor: cfg.Processor, logger: cfg.Logger.Named("partition"), @@ -230,7 +231,7 @@ func NewConsumer(cfg ConsumerConfig) (*Consumer, error) { kgo.ConsumeTopics(topics...), // If a rebalance happens while the client is polling, the consumed // records may belong to a partition which has been reassigned to a - // different consumer int he group. To avoid this scenario, Polls will + // different consumer in the group. To avoid this scenario, Polls will // block rebalances of partitions which would be lost, and the consumer // MUST manually call `AllowRebalance`. kgo.BlockRebalanceOnPoll(), @@ -270,7 +271,10 @@ func NewConsumer(cfg ConsumerConfig) (*Consumer, error) { opts = append(opts, kgo.BrokerMaxReadBytes(cfg.BrokerMaxReadBytes)) } - client, err := cfg.newClient(cfg.TopicAttributeFunc, opts...) + client, err := cfg.newClientWithOpts( + []Opts{WithTopicAttributeFunc(cfg.TopicAttributeFunc), WithTopicMultipleAttributeFunc(cfg.TopicMultipleAttributeFunc)}, + opts..., + ) if err != nil { return nil, fmt.Errorf("kafka: failed creating kafka consumer: %w", err) } @@ -396,8 +400,13 @@ func (c *Consumer) fetch(ctx context.Context) error { // franz-go can inject fake fetches in case of errors. 
// the fake fetch can have an empty topic so we need to // account for that - if c.cfg.TopicLogFieldFunc != nil && topicName != "" { - logger = logger.With(c.cfg.TopicLogFieldFunc(topicName)...) + if topicName != "" { + if c.cfg.TopicLogFieldFunc != nil { + logger = logger.With(c.cfg.TopicLogFieldFunc(topicName)) + } + if c.cfg.TopicLogFieldsFunc != nil { + logger = logger.With(c.cfg.TopicLogFieldsFunc(topicName)...) + } } logger.Error( @@ -429,7 +438,11 @@ type consumer struct { processor apmqueue.Processor logger *zap.Logger delivery apmqueue.DeliveryType + + // Deprecated: use logFieldsFn instead. logFieldFn TopicLogFieldFunc + logFieldsFn TopicLogFieldsFunc + // ctx contains the graceful cancellation context that is passed to the // partition consumers. ctx context.Context @@ -453,7 +466,10 @@ func (c *consumer) assigned(_ context.Context, client *kgo.Client, assigned map[ zap.Int32("partition", partition), ) if c.logFieldFn != nil { - logger = logger.With(c.logFieldFn(t)...) + logger = logger.With(c.logFieldFn(t)) + } + if c.logFieldsFn != nil { + logger = logger.With(c.logFieldsFn(t)...) } pc := newPartitionConsumer(c.ctx, client, c.processor, @@ -536,7 +552,10 @@ func (c *consumer) processFetch(fetches kgo.Fetches) { topicName := strings.TrimPrefix(ftp.Topic, c.topicPrefix) logger := c.logger if c.logFieldFn != nil { - logger = logger.With(c.logFieldFn(topicName)...) + logger = logger.With(c.logFieldFn(topicName)) + } + if c.logFieldsFn != nil { + logger = logger.With(c.logFieldsFn(topicName)...) 
} logger.Warn( "data loss: failed to send records to process after commit", diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go index f669a029..f1659190 100644 --- a/kafka/consumer_test.go +++ b/kafka/consumer_test.go @@ -728,8 +728,8 @@ func TestConsumerTopicLogFieldFunc(t *testing.T) { CommonConfig: CommonConfig{ Brokers: addrs, Logger: zapTest(t), - TopicLogFieldFunc: func(topic string) []zapcore.Field { - return nil + TopicLogFieldFunc: func(topic string) zapcore.Field { + return zap.Field{} }, }, GroupID: t.Name(), diff --git a/kafka/log_compacted_consumer.go b/kafka/log_compacted_consumer.go index eac0a35d..4014ca19 100644 --- a/kafka/log_compacted_consumer.go +++ b/kafka/log_compacted_consumer.go @@ -108,7 +108,10 @@ func NewLogCompactedConsumer(cfg LogCompactedConfig, opts = append(opts, kgo.FetchMinBytes(cfg.MinFetchSize)) } - client, err := cfg.newClient(cfg.TopicAttributeFunc, opts...) + client, err := cfg.newClientWithOpts( + []Opts{WithTopicAttributeFunc(cfg.TopicAttributeFunc), WithTopicMultipleAttributeFunc(cfg.TopicMultipleAttributeFunc)}, + opts..., + ) if err != nil { return nil, err } diff --git a/kafka/logger_test.go b/kafka/logger_test.go index 691e3c34..94651478 100644 --- a/kafka/logger_test.go +++ b/kafka/logger_test.go @@ -64,7 +64,9 @@ func TestHookLogsFailedDial(t *testing.T) { } // Calling newClient triggers the metadata refresh, forcing a connection to the fake cluster // using the broken dialer. - c, err := cfg.newClient(func(string) []attribute.KeyValue { return []attribute.KeyValue{attribute.String("k", "v")} }) + c, err := cfg.newClientWithOpts([]Opts{ + WithTopicAttributeFunc(func(string) attribute.KeyValue { return attribute.String("k", "v") }), + }) require.NoError(t, err) <-time.After(time.Millisecond) @@ -88,7 +90,9 @@ func TestHookLogsFailedDial(t *testing.T) { // Calling newClient triggers the metadata refresh, forcing a connection to the fake cluster // using the broken dialer. 
- c, err := cfg.newClient(func(string) []attribute.KeyValue { return []attribute.KeyValue{attribute.String("k", "v")} }) + c, err := cfg.newClientWithOpts([]Opts{ + WithTopicAttributeFunc(func(string) attribute.KeyValue { return attribute.String("k", "v") }), + }) require.NoError(t, err) assert.Error(t, c.Ping(context.Background())) diff --git a/kafka/manager.go b/kafka/manager.go index 27f738f1..993200a3 100644 --- a/kafka/manager.go +++ b/kafka/manager.go @@ -69,7 +69,7 @@ func NewManager(cfg ManagerConfig) (*Manager, error) { if err := cfg.finalize(); err != nil { return nil, fmt.Errorf("kafka: invalid manager config: %w", err) } - client, err := cfg.newClient(nil) + client, err := cfg.newClientWithOpts(nil) if err != nil { return nil, fmt.Errorf("kafka: failed creating kafka client: %w", err) } @@ -125,7 +125,10 @@ func (m *Manager) DeleteTopics(ctx context.Context, topics ...apmqueue.Topic) er topic := strings.TrimPrefix(response.Topic, namespacePrefix) logger := m.cfg.Logger.With(zap.String("topic", topic)) if m.cfg.TopicLogFieldFunc != nil { - logger = logger.With(m.cfg.TopicLogFieldFunc(topic)...) + logger = logger.With(m.cfg.TopicLogFieldFunc(topic)) + } + if m.cfg.TopicLogFieldsFunc != nil { + logger = logger.With(m.cfg.TopicLogFieldsFunc(topic)...) } if err := response.Err; err != nil { @@ -142,8 +145,12 @@ func (m *Manager) DeleteTopics(ctx context.Context, topics ...apmqueue.Topic) er attribute.String("outcome", "failure"), attribute.String("topic", topic), } - kvl := m.cfg.TopicAttributeFunc(topic) - attrs = append(attrs, kvl...) + if kv := m.cfg.TopicAttributeFunc(topic); kv != (attribute.KeyValue{}) { + attrs = append(attrs, kv) + } + if m.cfg.TopicMultipleAttributeFunc != nil { + attrs = append(attrs, m.cfg.TopicMultipleAttributeFunc(topic)...) 
+ } m.deleted.Add(context.Background(), 1, metric.WithAttributeSet( attribute.NewSet(attrs...), )) @@ -155,8 +162,12 @@ func (m *Manager) DeleteTopics(ctx context.Context, topics ...apmqueue.Topic) er attribute.String("outcome", "success"), attribute.String("topic", topic), } - kvl := m.cfg.TopicAttributeFunc(topic) - attrs = append(attrs, kvl...) + if kv := m.cfg.TopicAttributeFunc(topic); kv != (attribute.KeyValue{}) { + attrs = append(attrs, kv) + } + if m.cfg.TopicMultipleAttributeFunc != nil { + attrs = append(attrs, m.cfg.TopicMultipleAttributeFunc(topic)...) + } m.deleted.Add(context.Background(), 1, metric.WithAttributeSet( attribute.NewSet(attrs...), )) @@ -255,7 +266,10 @@ func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (m logger := m.cfg.Logger if m.cfg.TopicLogFieldFunc != nil { - logger = logger.With(m.cfg.TopicLogFieldFunc(topic)...) + logger = logger.With(m.cfg.TopicLogFieldFunc(topic)) + } + if m.cfg.TopicLogFieldsFunc != nil { + logger = logger.With(m.cfg.TopicLogFieldsFunc(topic)...) } var matchesRegex bool @@ -296,8 +310,12 @@ func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (m attribute.String("topic", topic), attribute.Int("partition", int(partition)), } - kvl := m.cfg.TopicAttributeFunc(topic) - attrs = append(attrs, kvl...) + if kv := m.cfg.TopicAttributeFunc(topic); kv != (attribute.KeyValue{}) { + attrs = append(attrs, kv) + } + if m.cfg.TopicMultipleAttributeFunc != nil { + attrs = append(attrs, m.cfg.TopicMultipleAttributeFunc(topic)...) + } o.ObserveInt64( consumerGroupLagMetric, lag.Lag, metric.WithAttributeSet(attribute.NewSet(attrs...)), @@ -310,8 +328,12 @@ func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (m attribute.String("topic", key.topic), attribute.String("client_id", key.clientID), } - kvl := m.cfg.TopicAttributeFunc(key.topic) - attrs = append(attrs, kvl...) 
+ if kv := m.cfg.TopicAttributeFunc(key.topic); kv != (attribute.KeyValue{}) { + attrs = append(attrs, kv) + } + if m.cfg.TopicMultipleAttributeFunc != nil { + attrs = append(attrs, m.cfg.TopicMultipleAttributeFunc(key.topic)...) + } o.ObserveInt64(assignmentMetric, count, metric.WithAttributeSet( attribute.NewSet(attrs...), )) diff --git a/kafka/manager_test.go b/kafka/manager_test.go index ac093158..57061728 100644 --- a/kafka/manager_test.go +++ b/kafka/manager_test.go @@ -67,7 +67,7 @@ func TestManagerDeleteTopics(t *testing.T) { commonConfig.Logger = zap.New(core) commonConfig.TracerProvider = tp commonConfig.MeterProvider = mt.MeterProvider - commonConfig.TopicAttributeFunc = func(topic string) []attribute.KeyValue { return nil } + commonConfig.TopicAttributeFunc = func(topic string) attribute.KeyValue { return attribute.KeyValue{} } m, err := NewManager(ManagerConfig{CommonConfig: commonConfig}) require.NoError(t, err) t.Cleanup(func() { m.Close() }) @@ -182,8 +182,8 @@ func TestManagerMetrics(t *testing.T) { commonConfig.Logger = zap.New(core) commonConfig.TracerProvider = tp commonConfig.MeterProvider = mp - commonConfig.TopicAttributeFunc = func(topic string) []attribute.KeyValue { - return []attribute.KeyValue{attribute.Bool("foo", true)} + commonConfig.TopicAttributeFunc = func(topic string) attribute.KeyValue { + return attribute.Bool("foo", true) } m, err := NewManager(ManagerConfig{CommonConfig: commonConfig}) require.NoError(t, err) diff --git a/kafka/metrics.go b/kafka/metrics.go index 66fd3945..5442e6f5 100644 --- a/kafka/metrics.go +++ b/kafka/metrics.go @@ -66,9 +66,17 @@ var ( // TopicAttributeFunc run on `kgo.HookProduceBatchWritten` and // `kgo.HookFetchBatchRead` for each topic/partition. It can be -// used to include additional dimensions for `consumer.messages.fetched` +// used to include one additional dimension for `consumer.messages.fetched` // and `producer.messages.count` metrics. 
-type TopicAttributeFunc func(topic string) []attribute.KeyValue +// +// Deprecated: Please use TopicMultipleAttributeFunc instead. +type TopicAttributeFunc func(topic string) attribute.KeyValue + +// TopicMultipleAttributeFunc run on `kgo.HookProduceBatchWritten` and +// `kgo.HookFetchBatchRead` for each topic/partition. It can be +// used to include additional dimensions for `consumer.messages.fetched` +// and `producer.messages.count` metrics. +type TopicMultipleAttributeFunc func(topic string) []attribute.KeyValue type metricHooks struct { namespace string @@ -103,11 +111,16 @@ type metricHooks struct { messageDelay metric.Float64Histogram throttlingDuration metric.Float64Histogram - topicAttributeFunc TopicAttributeFunc + // Deprecated: use topicMultipleAttributeFunc instead. + topicAttributeFunc TopicAttributeFunc + topicMultipleAttributeFunc TopicMultipleAttributeFunc } -func newKgoHooks(mp metric.MeterProvider, namespace, topicPrefix string, +func newKgoHooks( + mp metric.MeterProvider, + namespace, topicPrefix string, topicAttributeFunc TopicAttributeFunc, + topicMultipleAttributeFunc TopicMultipleAttributeFunc, ) (*metricHooks, error) { m := mp.Meter(instrumentName) @@ -333,7 +346,8 @@ func newKgoHooks( messageDelay: messageDelayHistogram, throttlingDuration: throttlingDurationHistogram, - topicAttributeFunc: topicAttributeFunc, + topicAttributeFunc: topicAttributeFunc, + topicMultipleAttributeFunc: topicMultipleAttributeFunc, }, nil } @@ -455,8 +469,12 @@ func (h *metricHooks) OnProduceBatchWritten(_ kgo.BrokerMetadata, attribute.String("outcome", "success"), attribute.String("compression.codec", compressionFromCodec(m.CompressionType)), ) - kvl := h.topicAttributeFunc(topic) - attrs = append(attrs, kvl...)
+ if kv := h.topicAttributeFunc(topic); kv != (attribute.KeyValue{}) { + attrs = append(attrs, kv) + } + if h.topicMultipleAttributeFunc != nil { + attrs = append(attrs, h.topicMultipleAttributeFunc(topic)...) + } if h.namespace != "" { attrs = append(attrs, attribute.String("namespace", h.namespace)) } @@ -495,8 +513,12 @@ func (h *metricHooks) OnFetchBatchRead(_ kgo.BrokerMetadata, semconv.MessagingKafkaSourcePartition(int(partition)), attribute.String("compression.codec", compressionFromCodec(m.CompressionType)), ) - kvl := h.topicAttributeFunc(topic) - attrs = append(attrs, kvl...) + if kv := h.topicAttributeFunc(topic); kv != (attribute.KeyValue{}) { + attrs = append(attrs, kv) + } + if h.topicMultipleAttributeFunc != nil { + attrs = append(attrs, h.topicMultipleAttributeFunc(topic)...) + } if h.namespace != "" { attrs = append(attrs, attribute.String("namespace", h.namespace)) } @@ -536,8 +558,12 @@ func (h *metricHooks) OnProduceRecordUnbuffered(r *kgo.Record, err error) { semconv.MessagingKafkaDestinationPartition(int(r.Partition)), attribute.String("outcome", "failure"), ) - kvl := h.topicAttributeFunc(r.Topic) - attrs = append(attrs, kvl...) + if kv := h.topicAttributeFunc(r.Topic); kv != (attribute.KeyValue{}) { + attrs = append(attrs, kv) + } + if h.topicMultipleAttributeFunc != nil { + attrs = append(attrs, h.topicMultipleAttributeFunc(r.Topic)...) + } if h.namespace != "" { attrs = append(attrs, attribute.String("namespace", h.namespace)) } @@ -570,8 +596,12 @@ func (h *metricHooks) OnFetchRecordUnbuffered(r *kgo.Record, polled bool) { semconv.MessagingSourceName(strings.TrimPrefix(r.Topic, h.topicPrefix)), semconv.MessagingKafkaSourcePartition(int(r.Partition)), ) - kvl := h.topicAttributeFunc(r.Topic) - attrs = append(attrs, kvl...) + if kv := h.topicAttributeFunc(r.Topic); kv != (attribute.KeyValue{}) { + attrs = append(attrs, kv) + } + if h.topicMultipleAttributeFunc != nil { + attrs = append(attrs, h.topicMultipleAttributeFunc(r.Topic)...) 
+ } if h.namespace != "" { attrs = append(attrs, attribute.String("namespace", h.namespace)) } diff --git a/kafka/metrics_test.go b/kafka/metrics_test.go index e13404e6..0e202f85 100644 --- a/kafka/metrics_test.go +++ b/kafka/metrics_test.go @@ -82,7 +82,7 @@ func TestProducerMetrics(t *testing.T) { } } t.Run("DeadlineExceeded", func(t *testing.T) { - producer, rdr := setupTestProducer(t, nil) + producer, rdr := setupTestProducer(t, nil, nil) want := []metricdata.Metrics{ { Name: "producer.messages.count", @@ -113,7 +113,7 @@ func TestProducerMetrics(t *testing.T) { test(ctx, t, producer, rdr, "default-topic", want) }) t.Run("ContextCanceled", func(t *testing.T) { - producer, rdr := setupTestProducer(t, nil) + producer, rdr := setupTestProducer(t, nil, nil) want := []metricdata.Metrics{ { Name: "producer.messages.count", @@ -143,7 +143,7 @@ func TestProducerMetrics(t *testing.T) { test(ctx, t, producer, rdr, "default-topic", want) }) t.Run("Unknown error reason", func(t *testing.T) { - producer, rdr := setupTestProducer(t, nil) + producer, rdr := setupTestProducer(t, nil, nil) want := []metricdata.Metrics{{ Name: "producer.messages.count", Description: "The number of messages produced", @@ -171,7 +171,7 @@ func TestProducerMetrics(t *testing.T) { test(context.Background(), t, producer, rdr, "default-topic", want) }) t.Run("unknown topic", func(t *testing.T) { - producer, rdr := setupTestProducer(t, nil) + producer, rdr := setupTestProducer(t, nil, nil) want := []metricdata.Metrics{{ Name: "producer.messages.count", Description: "The number of messages produced", @@ -200,8 +200,10 @@ func TestProducerMetrics(t *testing.T) { test(context.Background(), t, producer, rdr, "unknown-topic", want) }) t.Run("Produced", func(t *testing.T) { - producer, rdr := setupTestProducer(t, func(topic string) []attribute.KeyValue { - return []attribute.KeyValue{attribute.String("test", "test")} + producer, rdr := setupTestProducer(t, func(topic string) attribute.KeyValue { + return 
attribute.String("test-1", "test-1") + }, func(topic string) []attribute.KeyValue { + return []attribute.KeyValue{attribute.String("test-2", "test-2"), attribute.String("test-3", "test-3")} }) want := []metricdata.Metrics{ { @@ -221,7 +223,9 @@ func TestProducerMetrics(t *testing.T) { semconv.MessagingSystem("kafka"), semconv.MessagingDestinationName("default-topic"), semconv.MessagingKafkaDestinationPartition(0), - attribute.String("test", "test"), + attribute.String("test-1", "test-1"), + attribute.String("test-2", "test-2"), + attribute.String("test-3", "test-3"), attribute.String("compression.codec", "none"), ), }, @@ -245,7 +249,9 @@ func TestProducerMetrics(t *testing.T) { semconv.MessagingSystem("kafka"), semconv.MessagingDestinationName("default-topic"), semconv.MessagingKafkaDestinationPartition(0), - attribute.String("test", "test"), + attribute.String("test-1", "test-1"), + attribute.String("test-2", "test-2"), + attribute.String("test-3", "test-3"), attribute.String("compression.codec", "none"), ), }, @@ -269,7 +275,9 @@ func TestProducerMetrics(t *testing.T) { semconv.MessagingSystem("kafka"), semconv.MessagingDestinationName("default-topic"), semconv.MessagingKafkaDestinationPartition(0), - attribute.String("test", "test"), + attribute.String("test-1", "test-1"), + attribute.String("test-2", "test-2"), + attribute.String("test-3", "test-3"), attribute.String("compression.codec", "none"), ), }, @@ -315,8 +323,10 @@ func TestProducerMetrics(t *testing.T) { test(context.Background(), t, producer, rdr, "default-topic", want) }) t.Run("ProducedWithHeaders", func(t *testing.T) { - producer, rdr := setupTestProducer(t, func(topic string) []attribute.KeyValue { - return []attribute.KeyValue{attribute.String("some key", "some value")} + producer, rdr := setupTestProducer(t, func(topic string) attribute.KeyValue { + return attribute.String("some key", "some value") + }, func(topic string) []attribute.KeyValue { + return 
[]attribute.KeyValue{attribute.String("test-2", "test-2"), attribute.String("test-3", "test-3")} }) want := []metricdata.Metrics{ { @@ -338,6 +348,8 @@ func TestProducerMetrics(t *testing.T) { semconv.MessagingKafkaDestinationPartition(0), attribute.String("some key", "some value"), attribute.String("compression.codec", "snappy"), + attribute.String("test-2", "test-2"), + attribute.String("test-3", "test-3"), ), }, }, @@ -362,6 +374,8 @@ func TestProducerMetrics(t *testing.T) { semconv.MessagingKafkaDestinationPartition(0), attribute.String("some key", "some value"), attribute.String("compression.codec", "snappy"), + attribute.String("test-2", "test-2"), + attribute.String("test-3", "test-3"), ), }, }, @@ -386,6 +400,8 @@ func TestProducerMetrics(t *testing.T) { semconv.MessagingKafkaDestinationPartition(0), attribute.String("some key", "some value"), attribute.String("compression.codec", "snappy"), + attribute.String("test-2", "test-2"), + attribute.String("test-3", "test-3"), ), }, }, @@ -412,8 +428,10 @@ func TestConsumerMetrics(t *testing.T) { } return nil }) - tc := setupTestConsumer(t, proc, func(topic string) []attribute.KeyValue { - return []attribute.KeyValue{attribute.String("header", "included")} + tc := setupTestConsumer(t, proc, func(topic string) attribute.KeyValue { + return attribute.String("header", "included") + }, func(topic string) []attribute.KeyValue { + return []attribute.KeyValue{attribute.String("test-1", "test-1"), attribute.String("test-2", "test-2")} }) ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -427,6 +445,8 @@ func TestConsumerMetrics(t *testing.T) { Headers: []kgo.RecordHeader{ {Key: "header", Value: []byte("included")}, {Key: "traceparent", Value: []byte("excluded")}, + {Key: "test-1", Value: []byte("test-1")}, + {Key: "test-2", Value: []byte("test-2")}, }, }) } @@ -455,6 +475,8 @@ func TestConsumerMetrics(t *testing.T) { Attributes: attribute.NewSet( attribute.String("compression.codec", "none"), 
attribute.String("header", "included"), + attribute.String("test-1", "test-1"), + attribute.String("test-2", "test-2"), semconv.MessagingKafkaSourcePartition(0), semconv.MessagingSourceName(t.Name()), semconv.MessagingSystem("kafka"), @@ -479,6 +501,8 @@ func TestConsumerMetrics(t *testing.T) { semconv.MessagingSystem("kafka"), attribute.String("namespace", "name_space"), attribute.String("topic", "name_space-TestConsumerMetrics"), + attribute.String("test-1", "test-1"), + attribute.String("test-2", "test-2"), ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, @@ -549,7 +573,7 @@ func filterMetrics(t testing.TB, sm []metricdata.ScopeMetrics) []metricdata.Metr return []metricdata.Metrics{} } -func setupTestProducer(t testing.TB, tafunc TopicAttributeFunc) (*Producer, sdkmetric.Reader) { +func setupTestProducer(t testing.TB, tafunc TopicAttributeFunc, tmafunc TopicMultipleAttributeFunc) (*Producer, sdkmetric.Reader) { t.Helper() rdr := sdkmetric.NewManualReader() @@ -560,12 +584,13 @@ func setupTestProducer(t testing.TB, tafunc TopicAttributeFunc) (*Producer, sdkm }) producer := newProducer(t, ProducerConfig{ CommonConfig: CommonConfig{ - Brokers: brokers, - Logger: zap.NewNop(), - Namespace: "name_space", - TracerProvider: noop.NewTracerProvider(), - MeterProvider: mp, - TopicAttributeFunc: tafunc, + Brokers: brokers, + Logger: zap.NewNop(), + Namespace: "name_space", + TracerProvider: noop.NewTracerProvider(), + MeterProvider: mp, + TopicAttributeFunc: tafunc, + TopicMultipleAttributeFunc: tmafunc, }, Sync: true, }) @@ -578,7 +603,12 @@ type testMetricConsumer struct { reader sdkmetric.Reader } -func setupTestConsumer(t testing.TB, p apmqueue.Processor, tafunc TopicAttributeFunc) (mc testMetricConsumer) { +func setupTestConsumer( + t testing.TB, + p apmqueue.Processor, + tafunc TopicAttributeFunc, + tmafunc TopicMultipleAttributeFunc, +) (mc testMetricConsumer) { t.Helper() mc.reader = sdkmetric.NewManualReader() @@ 
-593,7 +623,8 @@ func setupTestConsumer(t testing.TB, p apmqueue.Processor, tafunc TopicAttribute MeterProvider: sdkmetric.NewMeterProvider( sdkmetric.WithReader(mc.reader), ), - TopicAttributeFunc: tafunc, + TopicAttributeFunc: tafunc, + TopicMultipleAttributeFunc: tmafunc, }, } mc.client, cfg.Brokers = newClusterWithTopics(t, 1, "name_space-"+t.Name()) diff --git a/kafka/producer.go b/kafka/producer.go index 4995b08b..8f0e62da 100644 --- a/kafka/producer.go +++ b/kafka/producer.go @@ -188,7 +188,10 @@ func NewProducer(cfg ProducerConfig) (*Producer, error) { if cfg.AllowAutoTopicCreation { opts = append(opts, kgo.AllowAutoTopicCreation()) } - client, err := cfg.newClient(cfg.TopicAttributeFunc, opts...) + client, err := cfg.newClientWithOpts( + []Opts{WithTopicAttributeFunc(cfg.TopicAttributeFunc), WithTopicMultipleAttributeFunc(cfg.TopicMultipleAttributeFunc)}, + opts..., + ) if err != nil { return nil, fmt.Errorf("kafka: failed creating producer: %w", err) } @@ -270,7 +273,10 @@ func (p *Producer) Produce(ctx context.Context, rs ...apmqueue.Record) error { logger := p.cfg.Logger if p.cfg.TopicLogFieldFunc != nil { - logger = logger.With(p.cfg.TopicLogFieldFunc(topicName)...) + logger = logger.With(p.cfg.TopicLogFieldFunc(topicName)) + } + if p.cfg.TopicLogFieldsFunc != nil { + logger = logger.With(p.cfg.TopicLogFieldsFunc(topicName)...) } logger.Error("failed producing message", diff --git a/kafka/topiccreator.go b/kafka/topiccreator.go index 349496f4..308fb91e 100644 --- a/kafka/topiccreator.go +++ b/kafka/topiccreator.go @@ -169,7 +169,10 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi topicName := strings.TrimPrefix(response.Topic, namespacePrefix) logger := c.m.cfg.Logger.With(loggerFields...) if c.m.cfg.TopicLogFieldFunc != nil { - logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName)...) 
+ logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName)) + } + if c.m.cfg.TopicLogFieldsFunc != nil { + logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...) } if err := response.Err; err != nil { if errors.Is(err, kerr.TopicAlreadyExists) { @@ -224,7 +227,10 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi topicName := strings.TrimPrefix(response.Topic, namespacePrefix) logger := c.m.cfg.Logger.With(loggerFields...) if c.m.cfg.TopicLogFieldFunc != nil { - logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName)...) + logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName)) + } + if c.m.cfg.TopicLogFieldsFunc != nil { + logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...) } if errors.Is(response.Err, kerr.InvalidRequest) { @@ -267,7 +273,10 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi topicName := strings.TrimPrefix(response.Name, namespacePrefix) logger := c.m.cfg.Logger.With(loggerFields...) if c.m.cfg.TopicLogFieldFunc != nil { - logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName)...) + logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName)) + } + if c.m.cfg.TopicLogFieldsFunc != nil { + logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...) } if err := response.Err; err != nil { From 53657b3b2132cf223abbf40090d1bc699e91e6d3 Mon Sep 17 00:00:00 2001 From: Constanca Manteigas Date: Wed, 27 Aug 2025 10:36:59 +0200 Subject: [PATCH 3/8] remove unused function --- kafka/common.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/kafka/common.go b/kafka/common.go index 2cc36831..3b5c991d 100644 --- a/kafka/common.go +++ b/kafka/common.go @@ -296,12 +296,6 @@ func (cfg *CommonConfig) meterProvider() metric.MeterProvider { return otel.GetMeterProvider() } -// newClient creates a new *kgo.Client. -// Deprecated: use newClientWithOpts instead. 
-func (cfg *CommonConfig) newClient(topicAttributeFunc TopicAttributeFunc, additionalOpts ...kgo.Opt) (*kgo.Client, error) { - return cfg.newClientWithOpts([]Opts{WithTopicAttributeFunc(topicAttributeFunc)}, additionalOpts...) -} - type Opts func(opts *ClientOpts) type ClientOpts struct { From 800cb59e464ce15aa3ab4bc3260430e0946d74eb Mon Sep 17 00:00:00 2001 From: Constanca Manteigas Date: Wed, 27 Aug 2025 17:11:39 +0200 Subject: [PATCH 4/8] address comments --- kafka/common.go | 61 +++++++++++++------ kafka/common_test.go | 103 +++++++++++++++++++++++++++++++- kafka/consumer.go | 14 +---- kafka/consumer_test.go | 2 - kafka/log_compacted_consumer.go | 2 +- kafka/manager.go | 16 ++--- kafka/metrics.go | 44 +++++--------- kafka/metrics_test.go | 22 +++---- kafka/producer.go | 2 +- 9 files changed, 182 insertions(+), 84 deletions(-) diff --git a/kafka/common.go b/kafka/common.go index 3b5c991d..7c1dd2ae 100644 --- a/kafka/common.go +++ b/kafka/common.go @@ -158,14 +158,14 @@ type CommonConfig struct { // topic for these metrics: // - producer.messages.count // - consumer.messages.fetched - // Deprecated: Use TopicMultipleAttributeFunc instead. + // Deprecated: Use TopicAttributesFunc instead. TopicAttributeFunc TopicAttributeFunc - // TopicMultipleAttributeFunc can be used to create multiple custom dimensions from a Kafka + // TopicAttributesFunc can be used to create multiple custom dimensions from a Kafka // topic for these metrics: // - producer.messages.count // - consumer.messages.fetched - TopicMultipleAttributeFunc TopicMultipleAttributeFunc + TopicAttributesFunc TopicAttributesFunc // TopicAttributeFunc can be used to create one custom dimension from a Kafka // topic for log messages. @@ -214,17 +214,6 @@ func (cfg *CommonConfig) finalize() error { errs = append(errs, err) } - // Wrap the cfg.TopicLogFieldFunc to ensure it never returns a field with - // an unknown type (like `zap.Field{}`). 
- if cfg.TopicLogFieldFunc != nil { - cfg.TopicLogFieldFunc = topicFieldFunc(cfg.TopicLogFieldFunc) - } - if cfg.TopicAttributeFunc == nil { - cfg.TopicAttributeFunc = func(topic string) attribute.KeyValue { - return attribute.KeyValue{} - } - } - return errors.Join(errs...) } @@ -300,7 +289,7 @@ type Opts func(opts *ClientOpts) type ClientOpts struct { topicAttributeFunc TopicAttributeFunc - topicMultipleAttributeFunc TopicMultipleAttributeFunc + topicMultipleAttributeFunc TopicAttributesFunc } // Deprecated: Use WithTopicMultipleAttributeFunc instead. @@ -310,7 +299,7 @@ func WithTopicAttributeFunc(topicAttributeFunc TopicAttributeFunc) func(opts *Cl } } -func WithTopicMultipleAttributeFunc(topicMultipleAttributeFunc TopicMultipleAttributeFunc) func(opts *ClientOpts) { +func WithTopicMultipleAttributeFunc(topicMultipleAttributeFunc TopicAttributesFunc) func(opts *ClientOpts) { return func(opts *ClientOpts) { opts.topicMultipleAttributeFunc = topicMultipleAttributeFunc } @@ -344,8 +333,9 @@ func (cfg *CommonConfig) newClientWithOpts(clientOpts []Opts, additionalOpts ... } opts = append(opts, additionalOpts...) if !cfg.DisableTelemetry { + multipleAttributesFn := mergeTopicAttributeFunctions(clOpts.topicAttributeFunc, clOpts.topicMultipleAttributeFunc) metricHooks, err := newKgoHooks( - cfg.meterProvider(), cfg.Namespace, cfg.namespacePrefix(), clOpts.topicAttributeFunc, clOpts.topicMultipleAttributeFunc, + cfg.meterProvider(), cfg.Namespace, cfg.namespacePrefix(), multipleAttributesFn, ) if err != nil { return nil, fmt.Errorf("kafka: failed creating kgo metrics hooks: %w", err) @@ -399,6 +389,43 @@ func topicFieldFunc(f TopicLogFieldFunc) TopicLogFieldFunc { } } +// mergeTopicLogFieldsFunctions merges the fields from TopicLogFieldFunc +// and TopicLogFieldsFunc, and returns a new TopicLogFieldsFunc with all +// log fields. 
+func mergeTopicLogFieldsFunctions(single TopicLogFieldFunc, multiple TopicLogFieldsFunc) TopicLogFieldsFunc { + if single == nil { + return multiple + } + if multiple == nil { + return func(topic string) []zap.Field { + // Wrap the cfg.TopicLogFieldFunc to ensure it never returns a field with + // an unknown type (like `zap.Field{}`). + fn := topicFieldFunc(single) + return []zap.Field{fn(topic)} + } + } + return func(topic string) []zap.Field { + return append(multiple(topic), single(topic)) + } +} + +// mergeTopicAttributeFunctions merges the attributes from TopicAttributeFunc +// and TopicAttributesFunc, and returns a new TopicAttributesFunc with all +// attributes. +func mergeTopicAttributeFunctions(single TopicAttributeFunc, multiple TopicAttributesFunc) TopicAttributesFunc { + if single == nil { + return multiple + } + if multiple == nil { + return func(topic string) []attribute.KeyValue { + return []attribute.KeyValue{single(topic)} + } + } + return func(topic string) []attribute.KeyValue { + return append(multiple(topic), single(topic)) + } +} + // newCertReloadingDialer returns a dialer that reloads the CA cert when the // file mod time changes. 
func newCertReloadingDialer(caPath, certPath, keyPath string, diff --git a/kafka/common_test.go b/kafka/common_test.go index cf858ab6..87de89ce 100644 --- a/kafka/common_test.go +++ b/kafka/common_test.go @@ -45,6 +45,7 @@ import ( "github.com/twmb/franz-go/pkg/kfake" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/sasl" + "go.opentelemetry.io/otel/attribute" "go.uber.org/zap" ) @@ -59,8 +60,6 @@ func TestCommonConfig(t *testing.T) { t.Helper() err := in.finalize() require.NoError(t, err) - in.TopicAttributeFunc = nil - in.TopicLogFieldFunc = nil in.hooks = nil assert.Equal(t, expected, in) } @@ -774,6 +773,106 @@ func TestCertificateHotReloadErrors(t *testing.T) { }) } +func TestMergeTopicFunctions(t *testing.T) { + t.Run("log fields", func(t *testing.T) { + tests := []struct { + name string + single TopicLogFieldFunc + multiple TopicLogFieldsFunc + want []zap.Field + }{ + { + name: "both nil functions", + }, + { + name: "nil single log field", + multiple: func(_ string) []zap.Field { + return []zap.Field{zap.String("test-1", "test-1"), zap.String("test-2", "test-2")} + }, + want: []zap.Field{zap.String("test-1", "test-1"), zap.String("test-2", "test-2")}, + }, + { + name: "nil multiple log field", + single: func(_ string) zap.Field { + return zap.String("test-1", "test-1") + }, + want: []zap.Field{zap.String("test-1", "test-1")}, + }, + { + name: "both functions exist", + multiple: func(_ string) []zap.Field { + return []zap.Field{zap.String("test-1", "test-1"), zap.String("test-2", "test-2")} + }, + single: func(_ string) zap.Field { + return zap.String("test-3", "test-3") + }, + want: []zap.Field{zap.String("test-1", "test-1"), zap.String("test-2", "test-2"), zap.String("test-3", "test-3")}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fn := mergeTopicLogFieldsFunctions(tt.single, tt.multiple) + if tt.want == nil { + require.Nil(t, fn) + return + } + fields := fn("test") + require.Equal(t, tt.want, fields) + 
}) + } + }) + t.Run("attributes", func(t *testing.T) { + tests := []struct { + name string + single TopicAttributeFunc + multiple TopicAttributesFunc + want []attribute.KeyValue + }{ + { + name: "both nil functions", + }, + { + name: "nil single attribute", + multiple: func(_ string) []attribute.KeyValue { + return []attribute.KeyValue{attribute.String("test-1", "test-1"), attribute.String("test-2", "test-2")} + }, + want: []attribute.KeyValue{attribute.String("test-1", "test-1"), attribute.String("test-2", "test-2")}, + }, + { + name: "nil multiple log field", + single: func(_ string) attribute.KeyValue { + return attribute.String("test-1", "test-1") + }, + want: []attribute.KeyValue{attribute.String("test-1", "test-1")}, + }, + { + name: "both functions exist", + multiple: func(_ string) []attribute.KeyValue { + return []attribute.KeyValue{attribute.String("test-1", "test-1"), attribute.String("test-2", "test-2")} + }, + single: func(_ string) attribute.KeyValue { + return attribute.String("test-3", "test-3") + }, + want: []attribute.KeyValue{attribute.String("test-1", "test-1"), attribute.String("test-2", "test-2"), attribute.String("test-3", "test-3")}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fn := mergeTopicAttributeFunctions(tt.single, tt.multiple) + if tt.want == nil { + require.Nil(t, fn) + return + } + fields := fn("test") + require.Equal(t, tt.want, fields) + }) + } + + }) +} + // Helper functions for certificate generation func generateCA(t testing.TB) (*rsa.PrivateKey, *x509.Certificate, []byte) { caKey, err := rsa.GenerateKey(rand.Reader, 2048) diff --git a/kafka/consumer.go b/kafka/consumer.go index 1c043cc4..48fd839d 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -212,8 +212,7 @@ func NewConsumer(cfg ConsumerConfig) (*Consumer, error) { namespacePrefix := cfg.namespacePrefix() consumer := &consumer{ topicPrefix: namespacePrefix, - logFieldFn: cfg.TopicLogFieldFunc, - logFieldsFn: 
cfg.TopicLogFieldsFunc, + logFieldsFn: mergeTopicLogFieldsFunctions(cfg.TopicLogFieldFunc, cfg.TopicLogFieldsFunc), assignments: make(map[topicPartition]*pc), processor: cfg.Processor, logger: cfg.Logger.Named("partition"), @@ -272,7 +271,7 @@ func NewConsumer(cfg ConsumerConfig) (*Consumer, error) { } client, err := cfg.newClientWithOpts( - []Opts{WithTopicAttributeFunc(cfg.TopicAttributeFunc), WithTopicMultipleAttributeFunc(cfg.TopicMultipleAttributeFunc)}, + []Opts{WithTopicAttributeFunc(cfg.TopicAttributeFunc), WithTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)}, opts..., ) if err != nil { @@ -438,9 +437,6 @@ type consumer struct { processor apmqueue.Processor logger *zap.Logger delivery apmqueue.DeliveryType - - // Deprecated: use logFieldsFn instead. - logFieldFn TopicLogFieldFunc logFieldsFn TopicLogFieldsFunc // ctx contains the graceful cancellation context that is passed to the @@ -465,9 +461,6 @@ func (c *consumer) assigned(_ context.Context, client *kgo.Client, assigned map[ zap.String("topic", t), zap.Int32("partition", partition), ) - if c.logFieldFn != nil { - logger = logger.With(c.logFieldFn(t)) - } if c.logFieldsFn != nil { logger = logger.With(c.logFieldsFn(t)...) } @@ -551,9 +544,6 @@ func (c *consumer) processFetch(fetches kgo.Fetches) { if c.delivery == apmqueue.AtMostOnceDeliveryType { topicName := strings.TrimPrefix(ftp.Topic, c.topicPrefix) logger := c.logger - if c.logFieldFn != nil { - logger = logger.With(c.logFieldFn(topicName)) - } if c.logFieldsFn != nil { logger = logger.With(c.logFieldsFn(topicName)...) 
} diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go index f1659190..e090101a 100644 --- a/kafka/consumer_test.go +++ b/kafka/consumer_test.go @@ -907,8 +907,6 @@ func assertNotNilOptions(t testing.TB, cfg *ConsumerConfig) { cfg.Processor = nil assert.NotNil(t, cfg.Logger) cfg.Logger = nil - assert.NotNil(t, cfg.TopicAttributeFunc) - cfg.TopicAttributeFunc = nil } func newConsumer(t testing.TB, cfg ConsumerConfig) *Consumer { diff --git a/kafka/log_compacted_consumer.go b/kafka/log_compacted_consumer.go index 4014ca19..b213303b 100644 --- a/kafka/log_compacted_consumer.go +++ b/kafka/log_compacted_consumer.go @@ -109,7 +109,7 @@ func NewLogCompactedConsumer(cfg LogCompactedConfig, } client, err := cfg.newClientWithOpts( - []Opts{WithTopicAttributeFunc(cfg.TopicAttributeFunc), WithTopicMultipleAttributeFunc(cfg.TopicMultipleAttributeFunc)}, + []Opts{WithTopicAttributeFunc(cfg.TopicAttributeFunc), WithTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)}, opts..., ) if err != nil { diff --git a/kafka/manager.go b/kafka/manager.go index 993200a3..dc695262 100644 --- a/kafka/manager.go +++ b/kafka/manager.go @@ -148,8 +148,8 @@ func (m *Manager) DeleteTopics(ctx context.Context, topics ...apmqueue.Topic) er if kv := m.cfg.TopicAttributeFunc(topic); kv != (attribute.KeyValue{}) { attrs = append(attrs, kv) } - if m.cfg.TopicMultipleAttributeFunc != nil { - attrs = append(attrs, m.cfg.TopicMultipleAttributeFunc(topic)...) + if m.cfg.TopicAttributesFunc != nil { + attrs = append(attrs, m.cfg.TopicAttributesFunc(topic)...) } m.deleted.Add(context.Background(), 1, metric.WithAttributeSet( attribute.NewSet(attrs...), @@ -165,8 +165,8 @@ func (m *Manager) DeleteTopics(ctx context.Context, topics ...apmqueue.Topic) er if kv := m.cfg.TopicAttributeFunc(topic); kv != (attribute.KeyValue{}) { attrs = append(attrs, kv) } - if m.cfg.TopicMultipleAttributeFunc != nil { - attrs = append(attrs, m.cfg.TopicMultipleAttributeFunc(topic)...) 
+ if m.cfg.TopicAttributesFunc != nil { + attrs = append(attrs, m.cfg.TopicAttributesFunc(topic)...) } m.deleted.Add(context.Background(), 1, metric.WithAttributeSet( attribute.NewSet(attrs...), @@ -313,8 +313,8 @@ func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (m if kv := m.cfg.TopicAttributeFunc(topic); kv != (attribute.KeyValue{}) { attrs = append(attrs, kv) } - if m.cfg.TopicMultipleAttributeFunc != nil { - attrs = append(attrs, m.cfg.TopicMultipleAttributeFunc(topic)...) + if m.cfg.TopicAttributesFunc != nil { + attrs = append(attrs, m.cfg.TopicAttributesFunc(topic)...) } o.ObserveInt64( consumerGroupLagMetric, lag.Lag, @@ -331,8 +331,8 @@ func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (m if kv := m.cfg.TopicAttributeFunc(key.topic); kv != (attribute.KeyValue{}) { attrs = append(attrs, kv) } - if m.cfg.TopicMultipleAttributeFunc != nil { - attrs = append(attrs, m.cfg.TopicMultipleAttributeFunc(key.topic)...) + if m.cfg.TopicAttributesFunc != nil { + attrs = append(attrs, m.cfg.TopicAttributesFunc(key.topic)...) } o.ObserveInt64(assignmentMetric, count, metric.WithAttributeSet( attribute.NewSet(attrs...), diff --git a/kafka/metrics.go b/kafka/metrics.go index 5442e6f5..d75bc1e7 100644 --- a/kafka/metrics.go +++ b/kafka/metrics.go @@ -69,14 +69,14 @@ var ( // TopicAttributeFunc run on `kgo.HookProduceBatchWritten` and // `kgo.HookFetchBatchRead` for each topic/partition. It can be // used to include one additional dimension for `consumer.messages.fetched` // and `producer.messages.count` metrics. // -// Deprecated: Please use TopicMultipleAttributeFunc instead. +// Deprecated: Please use TopicAttributesFunc instead. type TopicAttributeFunc func(topic string) attribute.KeyValue -// TopicMultipleAttributeFunc run on `kgo.HookProduceBatchWritten` and +// TopicAttributesFunc run on `kgo.HookProduceBatchWritten` and // `kgo.HookFetchBatchRead` for each topic/partition. It can be // used to include additional dimensions for `consumer.messages.fetched` // and `producer.messages.count` metrics.
-type TopicMultipleAttributeFunc func(topic string) []attribute.KeyValue +type TopicAttributesFunc func(topic string) []attribute.KeyValue type metricHooks struct { namespace string @@ -111,16 +111,13 @@ type metricHooks struct { messageDelay metric.Float64Histogram throttlingDuration metric.Float64Histogram - // Deprecated: use topicMultipleAttributeFunc instead. - topicAttributeFunc TopicAttributeFunc - topicMultipleAttributeFunc TopicMultipleAttributeFunc + topicAttributesFunc TopicAttributesFunc } func newKgoHooks( mp metric.MeterProvider, namespace, topicPrefix string, - topicAttributeFunc TopicAttributeFunc, - topicMultipleAttributeFunc TopicMultipleAttributeFunc, + topicMultipleAttributeFunc TopicAttributesFunc, ) (*metricHooks, error) { m := mp.Meter(instrumentName) @@ -346,8 +343,7 @@ func newKgoHooks( messageDelay: messageDelayHistogram, throttlingDuration: throttlingDurationHistogram, - topicAttributeFunc: topicAttributeFunc, - topicMultipleAttributeFunc: topicMultipleAttributeFunc, + topicAttributesFunc: topicMultipleAttributeFunc, }, nil } @@ -469,11 +465,8 @@ func (h *metricHooks) OnProduceBatchWritten(_ kgo.BrokerMetadata, attribute.String("outcome", "success"), attribute.String("compression.codec", compressionFromCodec(m.CompressionType)), ) - if kv := h.topicAttributeFunc(topic); kv != (attribute.KeyValue{}) { - attrs = append(attrs, kv) - } - if h.topicMultipleAttributeFunc != nil { - attrs = append(attrs, h.topicMultipleAttributeFunc(topic)...) + if h.topicAttributesFunc != nil { + attrs = append(attrs, h.topicAttributesFunc(topic)...) 
} if h.namespace != "" { attrs = append(attrs, attribute.String("namespace", h.namespace)) @@ -513,11 +506,8 @@ func (h *metricHooks) OnFetchBatchRead(_ kgo.BrokerMetadata, semconv.MessagingKafkaSourcePartition(int(partition)), attribute.String("compression.codec", compressionFromCodec(m.CompressionType)), ) - if kv := h.topicAttributeFunc(topic); kv != (attribute.KeyValue{}) { - attrs = append(attrs, kv) - } - if h.topicMultipleAttributeFunc != nil { - attrs = append(attrs, h.topicMultipleAttributeFunc(topic)...) + if h.topicAttributesFunc != nil { + attrs = append(attrs, h.topicAttributesFunc(topic)...) } if h.namespace != "" { attrs = append(attrs, attribute.String("namespace", h.namespace)) @@ -558,11 +548,8 @@ func (h *metricHooks) OnProduceRecordUnbuffered(r *kgo.Record, err error) { semconv.MessagingKafkaDestinationPartition(int(r.Partition)), attribute.String("outcome", "failure"), ) - if kv := h.topicAttributeFunc(r.Topic); kv != (attribute.KeyValue{}) { - attrs = append(attrs, kv) - } - if h.topicMultipleAttributeFunc != nil { - attrs = append(attrs, h.topicMultipleAttributeFunc(r.Topic)...) + if h.topicAttributesFunc != nil { + attrs = append(attrs, h.topicAttributesFunc(r.Topic)...) } if h.namespace != "" { attrs = append(attrs, attribute.String("namespace", h.namespace)) @@ -596,11 +583,8 @@ func (h *metricHooks) OnFetchRecordUnbuffered(r *kgo.Record, polled bool) { semconv.MessagingSourceName(strings.TrimPrefix(r.Topic, h.topicPrefix)), semconv.MessagingKafkaSourcePartition(int(r.Partition)), ) - if kv := h.topicAttributeFunc(r.Topic); kv != (attribute.KeyValue{}) { - attrs = append(attrs, kv) - } - if h.topicMultipleAttributeFunc != nil { - attrs = append(attrs, h.topicMultipleAttributeFunc(r.Topic)...) + if h.topicAttributesFunc != nil { + attrs = append(attrs, h.topicAttributesFunc(r.Topic)...) 
} if h.namespace != "" { attrs = append(attrs, attribute.String("namespace", h.namespace)) diff --git a/kafka/metrics_test.go b/kafka/metrics_test.go index 0e202f85..76ce10a4 100644 --- a/kafka/metrics_test.go +++ b/kafka/metrics_test.go @@ -573,7 +573,7 @@ func filterMetrics(t testing.TB, sm []metricdata.ScopeMetrics) []metricdata.Metr return []metricdata.Metrics{} } -func setupTestProducer(t testing.TB, tafunc TopicAttributeFunc, tmafunc TopicMultipleAttributeFunc) (*Producer, sdkmetric.Reader) { +func setupTestProducer(t testing.TB, tafunc TopicAttributeFunc, tmafunc TopicAttributesFunc) (*Producer, sdkmetric.Reader) { t.Helper() rdr := sdkmetric.NewManualReader() @@ -584,13 +584,13 @@ func setupTestProducer(t testing.TB, tafunc TopicAttributeFunc, tmafunc TopicMul }) producer := newProducer(t, ProducerConfig{ CommonConfig: CommonConfig{ - Brokers: brokers, - Logger: zap.NewNop(), - Namespace: "name_space", - TracerProvider: noop.NewTracerProvider(), - MeterProvider: mp, - TopicAttributeFunc: tafunc, - TopicMultipleAttributeFunc: tmafunc, + Brokers: brokers, + Logger: zap.NewNop(), + Namespace: "name_space", + TracerProvider: noop.NewTracerProvider(), + MeterProvider: mp, + TopicAttributeFunc: tafunc, + TopicAttributesFunc: tmafunc, }, Sync: true, }) @@ -607,7 +607,7 @@ func setupTestConsumer( t testing.TB, p apmqueue.Processor, tafunc TopicAttributeFunc, - tmafunc TopicMultipleAttributeFunc, + tmafunc TopicAttributesFunc, ) (mc testMetricConsumer) { t.Helper() @@ -623,8 +623,8 @@ func setupTestConsumer( MeterProvider: sdkmetric.NewMeterProvider( sdkmetric.WithReader(mc.reader), ), - TopicAttributeFunc: tafunc, - TopicMultipleAttributeFunc: tmafunc, + TopicAttributeFunc: tafunc, + TopicAttributesFunc: tmafunc, }, } mc.client, cfg.Brokers = newClusterWithTopics(t, 1, "name_space-"+t.Name()) diff --git a/kafka/producer.go b/kafka/producer.go index 8f0e62da..e0cd1a70 100644 --- a/kafka/producer.go +++ b/kafka/producer.go @@ -189,7 +189,7 @@ func NewProducer(cfg 
ProducerConfig) (*Producer, error) { opts = append(opts, kgo.AllowAutoTopicCreation()) } client, err := cfg.newClientWithOpts( - []Opts{WithTopicAttributeFunc(cfg.TopicAttributeFunc), WithTopicMultipleAttributeFunc(cfg.TopicMultipleAttributeFunc)}, + []Opts{WithTopicAttributeFunc(cfg.TopicAttributeFunc), WithTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)}, opts..., ) if err != nil { From 743993304b913ce0b8c8989c139ba27082c6bb9f Mon Sep 17 00:00:00 2001 From: Constanca Manteigas Date: Wed, 27 Aug 2025 17:35:12 +0200 Subject: [PATCH 5/8] use merge on finalize config --- kafka/common.go | 36 +++++++++++++++++---------------- kafka/consumer.go | 4 ++-- kafka/log_compacted_consumer.go | 2 +- kafka/logger_test.go | 8 ++++++-- kafka/producer.go | 2 +- 5 files changed, 29 insertions(+), 23 deletions(-) diff --git a/kafka/common.go b/kafka/common.go index 7c1dd2ae..514ff2a9 100644 --- a/kafka/common.go +++ b/kafka/common.go @@ -214,6 +214,9 @@ func (cfg *CommonConfig) finalize() error { errs = append(errs, err) } + cfg.TopicLogFieldsFunc = mergeTopicLogFieldsFunctions(cfg.TopicLogFieldFunc, cfg.TopicLogFieldsFunc) + cfg.TopicAttributesFunc = mergeTopicAttributeFunctions(cfg.TopicAttributeFunc, cfg.TopicAttributesFunc) + return errors.Join(errs...) } @@ -288,20 +291,12 @@ func (cfg *CommonConfig) meterProvider() metric.MeterProvider { type Opts func(opts *ClientOpts) type ClientOpts struct { - topicAttributeFunc TopicAttributeFunc - topicMultipleAttributeFunc TopicAttributesFunc -} - -// Deprecated: Use WithTopicMultipleAttributeFunc instead. 
-func WithTopicAttributeFunc(topicAttributeFunc TopicAttributeFunc) func(opts *ClientOpts) { - return func(opts *ClientOpts) { - opts.topicAttributeFunc = topicAttributeFunc - } + topicAttributesFunc TopicAttributesFunc } -func WithTopicMultipleAttributeFunc(topicMultipleAttributeFunc TopicAttributesFunc) func(opts *ClientOpts) { +func WithTopicMultipleAttributeFunc(topicAttributesFunc TopicAttributesFunc) func(opts *ClientOpts) { return func(opts *ClientOpts) { - opts.topicMultipleAttributeFunc = topicMultipleAttributeFunc + opts.topicAttributesFunc = topicAttributesFunc } } @@ -333,9 +328,8 @@ func (cfg *CommonConfig) newClientWithOpts(clientOpts []Opts, additionalOpts ... } opts = append(opts, additionalOpts...) if !cfg.DisableTelemetry { - multipleAttributesFn := mergeTopicAttributeFunctions(clOpts.topicAttributeFunc, clOpts.topicMultipleAttributeFunc) metricHooks, err := newKgoHooks( - cfg.meterProvider(), cfg.Namespace, cfg.namespacePrefix(), multipleAttributesFn, + cfg.meterProvider(), cfg.Namespace, cfg.namespacePrefix(), clOpts.topicAttributesFunc, ) if err != nil { return nil, fmt.Errorf("kafka: failed creating kgo metrics hooks: %w", err) @@ -398,14 +392,18 @@ func mergeTopicLogFieldsFunctions(single TopicLogFieldFunc, multiple TopicLogFie } if multiple == nil { return func(topic string) []zap.Field { - // Wrap the cfg.TopicLogFieldFunc to ensure it never returns a field with - // an unknown type (like `zap.Field{}`). 
fn := topicFieldFunc(single) return []zap.Field{fn(topic)} } } return func(topic string) []zap.Field { - return append(multiple(topic), single(topic)) + fields := multiple(topic) + for i := range fields { + if fields[i].Type <= zapcore.UnknownType { + fields[i] = zap.Skip() + } + } + return append(fields, single(topic)) } } @@ -418,7 +416,11 @@ func mergeTopicAttributeFunctions(single TopicAttributeFunc, multiple TopicAttri } if multiple == nil { return func(topic string) []attribute.KeyValue { - return []attribute.KeyValue{single(topic)} + v := single(topic) + if v == (attribute.KeyValue{}) { + return nil + } + return []attribute.KeyValue{v} } } return func(topic string) []attribute.KeyValue { diff --git a/kafka/consumer.go b/kafka/consumer.go index 48fd839d..279fd960 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -212,7 +212,7 @@ func NewConsumer(cfg ConsumerConfig) (*Consumer, error) { namespacePrefix := cfg.namespacePrefix() consumer := &consumer{ topicPrefix: namespacePrefix, - logFieldsFn: mergeTopicLogFieldsFunctions(cfg.TopicLogFieldFunc, cfg.TopicLogFieldsFunc), + logFieldsFn: cfg.TopicLogFieldsFunc, assignments: make(map[topicPartition]*pc), processor: cfg.Processor, logger: cfg.Logger.Named("partition"), @@ -271,7 +271,7 @@ func NewConsumer(cfg ConsumerConfig) (*Consumer, error) { } client, err := cfg.newClientWithOpts( - []Opts{WithTopicAttributeFunc(cfg.TopicAttributeFunc), WithTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)}, + []Opts{WithTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)}, opts..., ) if err != nil { diff --git a/kafka/log_compacted_consumer.go b/kafka/log_compacted_consumer.go index b213303b..316db1c3 100644 --- a/kafka/log_compacted_consumer.go +++ b/kafka/log_compacted_consumer.go @@ -109,7 +109,7 @@ func NewLogCompactedConsumer(cfg LogCompactedConfig, } client, err := cfg.newClientWithOpts( - []Opts{WithTopicAttributeFunc(cfg.TopicAttributeFunc), WithTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)}, + 
[]Opts{WithTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)}, opts..., ) if err != nil { diff --git a/kafka/logger_test.go b/kafka/logger_test.go index 94651478..fece1b6c 100644 --- a/kafka/logger_test.go +++ b/kafka/logger_test.go @@ -65,7 +65,9 @@ func TestHookLogsFailedDial(t *testing.T) { // Calling newClient triggers the metadata refresh, forcing a connection to the fake cluster // using the broken dialer. c, err := cfg.newClientWithOpts([]Opts{ - WithTopicAttributeFunc(func(string) attribute.KeyValue { return attribute.String("k", "v") }), + WithTopicMultipleAttributeFunc(func(string) []attribute.KeyValue { + return []attribute.KeyValue{attribute.String("k", "v")} + }), }) require.NoError(t, err) @@ -91,7 +93,9 @@ func TestHookLogsFailedDial(t *testing.T) { // Calling newClient triggers the metadata refresh, forcing a connection to the fake cluster // using the broken dialer. c, err := cfg.newClientWithOpts([]Opts{ - WithTopicAttributeFunc(func(string) attribute.KeyValue { return attribute.String("k", "v") }), + WithTopicMultipleAttributeFunc(func(string) []attribute.KeyValue { + return []attribute.KeyValue{attribute.String("k", "v")} + }), }) require.NoError(t, err) assert.Error(t, c.Ping(context.Background())) diff --git a/kafka/producer.go b/kafka/producer.go index e0cd1a70..35271d1f 100644 --- a/kafka/producer.go +++ b/kafka/producer.go @@ -189,7 +189,7 @@ func NewProducer(cfg ProducerConfig) (*Producer, error) { opts = append(opts, kgo.AllowAutoTopicCreation()) } client, err := cfg.newClientWithOpts( - []Opts{WithTopicAttributeFunc(cfg.TopicAttributeFunc), WithTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)}, + []Opts{WithTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)}, opts..., ) if err != nil { From e5a09d7dd7864f7ea0de74790f16fcd898fc8196 Mon Sep 17 00:00:00 2001 From: Constanca Manteigas Date: Wed, 27 Aug 2025 17:39:19 +0200 Subject: [PATCH 6/8] make opts non exportable --- kafka/common.go | 16 ++++++++-------- kafka/consumer.go | 2 +- 
kafka/log_compacted_consumer.go | 2 +- kafka/logger_test.go | 4 ++-- kafka/producer.go | 2 +- 5 files changed, 13 insertions(+), 13 deletions(-) diff --git a/kafka/common.go b/kafka/common.go index 514ff2a9..3b51c31f 100644 --- a/kafka/common.go +++ b/kafka/common.go @@ -288,21 +288,21 @@ func (cfg *CommonConfig) meterProvider() metric.MeterProvider { return otel.GetMeterProvider() } -type Opts func(opts *ClientOpts) +type clientOptsFn func(opts *clientOpts) -type ClientOpts struct { +type clientOpts struct { topicAttributesFunc TopicAttributesFunc } -func WithTopicMultipleAttributeFunc(topicAttributesFunc TopicAttributesFunc) func(opts *ClientOpts) { - return func(opts *ClientOpts) { - opts.topicAttributesFunc = topicAttributesFunc +func WithTopicMultipleAttributeFunc(topicAttributesFunc TopicAttributesFunc) func(clOpts *clientOpts) { + return func(clOpts *clientOpts) { + clOpts.topicAttributesFunc = topicAttributesFunc } } -func (cfg *CommonConfig) newClientWithOpts(clientOpts []Opts, additionalOpts ...kgo.Opt) (*kgo.Client, error) { - clOpts := &ClientOpts{} - for _, opt := range clientOpts { +func (cfg *CommonConfig) newClientWithOpts(clientOptsFn []clientOptsFn, additionalOpts ...kgo.Opt) (*kgo.Client, error) { + clOpts := &clientOpts{} + for _, opt := range clientOptsFn { opt(clOpts) } diff --git a/kafka/consumer.go b/kafka/consumer.go index 279fd960..ce50f8bb 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -271,7 +271,7 @@ func NewConsumer(cfg ConsumerConfig) (*Consumer, error) { } client, err := cfg.newClientWithOpts( - []Opts{WithTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)}, + []clientOptsFn{WithTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)}, opts..., ) if err != nil { diff --git a/kafka/log_compacted_consumer.go b/kafka/log_compacted_consumer.go index 316db1c3..0de64225 100644 --- a/kafka/log_compacted_consumer.go +++ b/kafka/log_compacted_consumer.go @@ -109,7 +109,7 @@ func NewLogCompactedConsumer(cfg LogCompactedConfig, } 
client, err := cfg.newClientWithOpts( - []Opts{WithTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)}, + []clientOptsFn{WithTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)}, opts..., ) if err != nil { diff --git a/kafka/logger_test.go b/kafka/logger_test.go index fece1b6c..1e5b6eb5 100644 --- a/kafka/logger_test.go +++ b/kafka/logger_test.go @@ -64,7 +64,7 @@ func TestHookLogsFailedDial(t *testing.T) { } // Calling newClient triggers the metadata refresh, forcing a connection to the fake cluster // using the broken dialer. - c, err := cfg.newClientWithOpts([]Opts{ + c, err := cfg.newClientWithOpts([]clientOptsFn{ WithTopicMultipleAttributeFunc(func(string) []attribute.KeyValue { return []attribute.KeyValue{attribute.String("k", "v")} }), @@ -92,7 +92,7 @@ func TestHookLogsFailedDial(t *testing.T) { // Calling newClient triggers the metadata refresh, forcing a connection to the fake cluster // using the broken dialer. - c, err := cfg.newClientWithOpts([]Opts{ + c, err := cfg.newClientWithOpts([]clientOptsFn{ WithTopicMultipleAttributeFunc(func(string) []attribute.KeyValue { return []attribute.KeyValue{attribute.String("k", "v")} }), diff --git a/kafka/producer.go b/kafka/producer.go index 35271d1f..4581a69d 100644 --- a/kafka/producer.go +++ b/kafka/producer.go @@ -189,7 +189,7 @@ func NewProducer(cfg ProducerConfig) (*Producer, error) { opts = append(opts, kgo.AllowAutoTopicCreation()) } client, err := cfg.newClientWithOpts( - []Opts{WithTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)}, + []clientOptsFn{WithTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)}, opts..., ) if err != nil { From b6e2881a9d5c3735ddaee716f67024faf8dd7def Mon Sep 17 00:00:00 2001 From: Constanca Manteigas Date: Fri, 29 Aug 2025 18:30:44 +0200 Subject: [PATCH 7/8] address comment --- kafka/common.go | 2 +- kafka/consumer.go | 2 +- kafka/log_compacted_consumer.go | 2 +- kafka/logger_test.go | 4 ++-- kafka/producer.go | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff 
--git a/kafka/common.go b/kafka/common.go index 3b51c31f..ac5da6c0 100644 --- a/kafka/common.go +++ b/kafka/common.go @@ -294,7 +294,7 @@ type clientOpts struct { topicAttributesFunc TopicAttributesFunc } -func WithTopicMultipleAttributeFunc(topicAttributesFunc TopicAttributesFunc) func(clOpts *clientOpts) { +func withTopicMultipleAttributeFunc(topicAttributesFunc TopicAttributesFunc) func(clOpts *clientOpts) { return func(clOpts *clientOpts) { clOpts.topicAttributesFunc = topicAttributesFunc } diff --git a/kafka/consumer.go b/kafka/consumer.go index ce50f8bb..e86cb333 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -271,7 +271,7 @@ func NewConsumer(cfg ConsumerConfig) (*Consumer, error) { } client, err := cfg.newClientWithOpts( - []clientOptsFn{WithTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)}, + []clientOptsFn{withTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)}, opts..., ) if err != nil { diff --git a/kafka/log_compacted_consumer.go b/kafka/log_compacted_consumer.go index 0de64225..4c1fa20c 100644 --- a/kafka/log_compacted_consumer.go +++ b/kafka/log_compacted_consumer.go @@ -109,7 +109,7 @@ func NewLogCompactedConsumer(cfg LogCompactedConfig, } client, err := cfg.newClientWithOpts( - []clientOptsFn{WithTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)}, + []clientOptsFn{withTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)}, opts..., ) if err != nil { diff --git a/kafka/logger_test.go b/kafka/logger_test.go index 1e5b6eb5..a9b77842 100644 --- a/kafka/logger_test.go +++ b/kafka/logger_test.go @@ -65,7 +65,7 @@ func TestHookLogsFailedDial(t *testing.T) { // Calling newClient triggers the metadata refresh, forcing a connection to the fake cluster // using the broken dialer. 
c, err := cfg.newClientWithOpts([]clientOptsFn{ - WithTopicMultipleAttributeFunc(func(string) []attribute.KeyValue { + withTopicMultipleAttributeFunc(func(string) []attribute.KeyValue { return []attribute.KeyValue{attribute.String("k", "v")} }), }) @@ -93,7 +93,7 @@ func TestHookLogsFailedDial(t *testing.T) { // Calling newClient triggers the metadata refresh, forcing a connection to the fake cluster // using the broken dialer. c, err := cfg.newClientWithOpts([]clientOptsFn{ - WithTopicMultipleAttributeFunc(func(string) []attribute.KeyValue { + withTopicMultipleAttributeFunc(func(string) []attribute.KeyValue { return []attribute.KeyValue{attribute.String("k", "v")} }), }) diff --git a/kafka/producer.go b/kafka/producer.go index 4581a69d..60a8d619 100644 --- a/kafka/producer.go +++ b/kafka/producer.go @@ -189,7 +189,7 @@ func NewProducer(cfg ProducerConfig) (*Producer, error) { opts = append(opts, kgo.AllowAutoTopicCreation()) } client, err := cfg.newClientWithOpts( - []clientOptsFn{WithTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)}, + []clientOptsFn{withTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)}, opts..., ) if err != nil { From 1d906bb7a8f51d6585fd40e4a19b7c4e3072cad2 Mon Sep 17 00:00:00 2001 From: Constanca Manteigas Date: Wed, 3 Sep 2025 09:54:13 +0200 Subject: [PATCH 8/8] remove all usages from deprecated functions --- kafka/consumer.go | 3 --- kafka/manager.go | 31 +++++++------------------------ kafka/manager_test.go | 42 ++++++++++++++++++++++++++++++++++++------ kafka/producer.go | 3 --- kafka/topiccreator.go | 9 --------- 5 files changed, 43 insertions(+), 45 deletions(-) diff --git a/kafka/consumer.go b/kafka/consumer.go index e86cb333..23f44088 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -400,9 +400,6 @@ func (c *Consumer) fetch(ctx context.Context) error { // the fake fetch can have an empty topic so we need to // account for that if topicName != "" { - if c.cfg.TopicLogFieldFunc != nil { - logger = 
logger.With(c.cfg.TopicLogFieldFunc(topicName)) - } if c.cfg.TopicLogFieldsFunc != nil { logger = logger.With(c.cfg.TopicLogFieldsFunc(topicName)...) } diff --git a/kafka/manager.go b/kafka/manager.go index dc695262..1f64cf9e 100644 --- a/kafka/manager.go +++ b/kafka/manager.go @@ -124,12 +124,13 @@ func (m *Manager) DeleteTopics(ctx context.Context, topics ...apmqueue.Topic) er for _, response := range responses.Sorted() { topic := strings.TrimPrefix(response.Topic, namespacePrefix) logger := m.cfg.Logger.With(zap.String("topic", topic)) - if m.cfg.TopicLogFieldFunc != nil { - logger = logger.With(m.cfg.TopicLogFieldFunc(topic)) - } if m.cfg.TopicLogFieldsFunc != nil { logger = logger.With(m.cfg.TopicLogFieldsFunc(topic)...) } + var commonAttrs []attribute.KeyValue + if m.cfg.TopicAttributesFunc != nil { + commonAttrs = m.cfg.TopicAttributesFunc(topic) + } if err := response.Err; err != nil { if errors.Is(err, kerr.UnknownTopicOrPartition) { @@ -145,12 +146,8 @@ func (m *Manager) DeleteTopics(ctx context.Context, topics ...apmqueue.Topic) er attribute.String("outcome", "failure"), attribute.String("topic", topic), } - if kv := m.cfg.TopicAttributeFunc(topic); kv != (attribute.KeyValue{}) { - attrs = append(attrs, kv) - } - if m.cfg.TopicAttributesFunc != nil { - attrs = append(attrs, m.cfg.TopicAttributesFunc(topic)...) - } + attrs = append(attrs, commonAttrs...) + m.deleted.Add(context.Background(), 1, metric.WithAttributeSet( attribute.NewSet(attrs...), )) @@ -162,12 +159,7 @@ func (m *Manager) DeleteTopics(ctx context.Context, topics ...apmqueue.Topic) er attribute.String("outcome", "success"), attribute.String("topic", topic), } - if kv := m.cfg.TopicAttributeFunc(topic); kv != (attribute.KeyValue{}) { - attrs = append(attrs, kv) - } - if m.cfg.TopicAttributesFunc != nil { - attrs = append(attrs, m.cfg.TopicAttributesFunc(topic)...) - } + attrs = append(attrs, commonAttrs...) 
m.deleted.Add(context.Background(), 1, metric.WithAttributeSet( attribute.NewSet(attrs...), )) @@ -265,9 +257,6 @@ func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (m topic = topic[len(namespacePrefix):] logger := m.cfg.Logger - if m.cfg.TopicLogFieldFunc != nil { - logger = logger.With(m.cfg.TopicLogFieldFunc(topic)) - } if m.cfg.TopicLogFieldsFunc != nil { logger = logger.With(m.cfg.TopicLogFieldsFunc(topic)...) } @@ -310,9 +299,6 @@ func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (m attribute.String("topic", topic), attribute.Int("partition", int(partition)), } - if kv := m.cfg.TopicAttributeFunc(topic); kv != (attribute.KeyValue{}) { - attrs = append(attrs, kv) - } if m.cfg.TopicAttributesFunc != nil { attrs = append(attrs, m.cfg.TopicAttributesFunc(topic)...) } @@ -328,9 +314,6 @@ func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (m attribute.String("topic", key.topic), attribute.String("client_id", key.clientID), } - if kv := m.cfg.TopicAttributeFunc(key.topic); kv != (attribute.KeyValue{}) { - attrs = append(attrs, kv) - } if m.cfg.TopicAttributesFunc != nil { attrs = append(attrs, m.cfg.TopicAttributesFunc(key.topic)...) 
} diff --git a/kafka/manager_test.go b/kafka/manager_test.go index 57061728..e6b24842 100644 --- a/kafka/manager_test.go +++ b/kafka/manager_test.go @@ -67,7 +67,18 @@ func TestManagerDeleteTopics(t *testing.T) { commonConfig.Logger = zap.New(core) commonConfig.TracerProvider = tp commonConfig.MeterProvider = mt.MeterProvider - commonConfig.TopicAttributeFunc = func(topic string) attribute.KeyValue { return attribute.KeyValue{} } + commonConfig.TopicLogFieldFunc = func(topic string) zap.Field { + return zap.String("from-1", "singular-topic-log-func") + } + commonConfig.TopicLogFieldsFunc = func(topic string) []zap.Field { + return []zap.Field{zap.String("from-2", "multiple-topic-log-func")} + } + commonConfig.TopicAttributeFunc = func(topic string) attribute.KeyValue { + return attribute.String("from-1", "singular-topic-attribute-func") + } + commonConfig.TopicAttributesFunc = func(topic string) []attribute.KeyValue { + return []attribute.KeyValue{attribute.String("from-2", "multiple-topic-attribute-func")} + } m, err := NewManager(ManagerConfig{CommonConfig: commonConfig}) require.NoError(t, err) t.Cleanup(func() { m.Close() }) @@ -117,6 +128,8 @@ func TestManagerDeleteTopics(t *testing.T) { Context: []zapcore.Field{ zap.String("namespace", "name_space"), zap.String("topic", "topic1"), + zap.String("from-2", "multiple-topic-log-func"), + zap.String("from-1", "singular-topic-log-func"), }, }, { Entry: zapcore.Entry{ @@ -127,6 +140,8 @@ func TestManagerDeleteTopics(t *testing.T) { Context: []zapcore.Field{ zap.String("namespace", "name_space"), zap.String("topic", "topic3"), + zap.String("from-2", "multiple-topic-log-func"), + zap.String("from-1", "singular-topic-log-func"), }, }}, matchingLogs.AllUntimed()) @@ -160,11 +175,13 @@ func TestManagerDeleteTopics(t *testing.T) { // Ensure only 1 topic was deleted, which also matches the number of spans. 
assert.Empty(t, cmp.Diff(metrictest.Int64Metrics{ {Name: "topics.deleted.count"}: { - {K: "topic", V: "topic2"}: 1, - {K: "messaging.system", V: "kafka"}: 2, - {K: "outcome", V: "failure"}: 1, - {K: "outcome", V: "success"}: 1, - {K: "topic", V: "topic3"}: 1, + {K: "topic", V: "topic2"}: 1, + {K: "messaging.system", V: "kafka"}: 2, + {K: "outcome", V: "failure"}: 1, + {K: "outcome", V: "success"}: 1, + {K: "topic", V: "topic3"}: 1, + {K: "from-1", V: "singular-topic-attribute-func"}: 2, + {K: "from-2", V: "multiple-topic-attribute-func"}: 2, }, }, gotMetrics)) } @@ -185,6 +202,9 @@ func TestManagerMetrics(t *testing.T) { commonConfig.TopicAttributeFunc = func(topic string) attribute.KeyValue { return attribute.Bool("foo", true) } + commonConfig.TopicAttributesFunc = func(topic string) []attribute.KeyValue { + return []attribute.KeyValue{attribute.Bool("bar", true)} + } m, err := NewManager(ManagerConfig{CommonConfig: commonConfig}) require.NoError(t, err) t.Cleanup(func() { m.Close() }) @@ -466,6 +486,7 @@ func TestManagerMetrics(t *testing.T) { attribute.String("topic", "topic1"), attribute.Int("partition", 1), attribute.Bool("foo", true), + attribute.Bool("bar", true), ), Value: 0, // end offset = 1, committed = 1 }, { @@ -474,6 +495,7 @@ func TestManagerMetrics(t *testing.T) { attribute.String("topic", "topic1"), attribute.Int("partition", 2), attribute.Bool("foo", true), + attribute.Bool("bar", true), ), Value: 1, // end offset = 2, committed = 1 }, { @@ -482,6 +504,7 @@ func TestManagerMetrics(t *testing.T) { attribute.String("topic", "topic2"), attribute.Int("partition", 3), attribute.Bool("foo", true), + attribute.Bool("bar", true), ), Value: 2, // end offset = 3, committed = 1 }, { @@ -490,6 +513,7 @@ func TestManagerMetrics(t *testing.T) { attribute.String("topic", "topic3"), attribute.Int("partition", 4), attribute.Bool("foo", true), + attribute.Bool("bar", true), ), Value: 0, // end offset = 4, nothing committed }, { @@ -498,6 +522,7 @@ func 
TestManagerMetrics(t *testing.T) { attribute.String("topic", "mytopic"), attribute.Int("partition", 1), attribute.Bool("foo", true), + attribute.Bool("bar", true), ), Value: 1, // end offset = 1, nothing committed }}, @@ -510,6 +535,7 @@ func TestManagerMetrics(t *testing.T) { attribute.String("group", "consumer1"), attribute.String("topic", "topic1"), attribute.Bool("foo", true), + attribute.Bool("bar", true), ), Value: 1, }, { @@ -518,6 +544,7 @@ func TestManagerMetrics(t *testing.T) { attribute.String("group", "consumer2"), attribute.String("topic", "topic2"), attribute.Bool("foo", true), + attribute.Bool("bar", true), ), Value: 1, }, { @@ -526,6 +553,7 @@ func TestManagerMetrics(t *testing.T) { attribute.String("group", "consumer2"), attribute.String("topic", "topic1"), attribute.Bool("foo", true), + attribute.Bool("bar", true), ), Value: 1, }, { @@ -534,6 +562,7 @@ func TestManagerMetrics(t *testing.T) { attribute.String("group", "consumer3"), attribute.String("topic", "topic3"), attribute.Bool("foo", true), + attribute.Bool("bar", true), ), Value: 1, }, { @@ -542,6 +571,7 @@ func TestManagerMetrics(t *testing.T) { attribute.String("group", "consumer3"), attribute.String("topic", "mytopic"), attribute.Bool("foo", true), + attribute.Bool("bar", true), ), Value: 1, }}, diff --git a/kafka/producer.go b/kafka/producer.go index 60a8d619..f11a79a4 100644 --- a/kafka/producer.go +++ b/kafka/producer.go @@ -272,9 +272,6 @@ func (p *Producer) Produce(ctx context.Context, rs ...apmqueue.Record) error { mu.Unlock() logger := p.cfg.Logger - if p.cfg.TopicLogFieldFunc != nil { - logger = logger.With(p.cfg.TopicLogFieldFunc(topicName)) - } if p.cfg.TopicLogFieldsFunc != nil { logger = logger.With(p.cfg.TopicLogFieldsFunc(topicName)...) 
} diff --git a/kafka/topiccreator.go b/kafka/topiccreator.go index 308fb91e..50a1ffce 100644 --- a/kafka/topiccreator.go +++ b/kafka/topiccreator.go @@ -168,9 +168,6 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi for _, response := range responses.Sorted() { topicName := strings.TrimPrefix(response.Topic, namespacePrefix) logger := c.m.cfg.Logger.With(loggerFields...) - if c.m.cfg.TopicLogFieldFunc != nil { - logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName)) - } if c.m.cfg.TopicLogFieldsFunc != nil { logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...) } @@ -226,9 +223,6 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi for _, response := range updateResp.Sorted() { topicName := strings.TrimPrefix(response.Topic, namespacePrefix) logger := c.m.cfg.Logger.With(loggerFields...) - if c.m.cfg.TopicLogFieldFunc != nil { - logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName)) - } if c.m.cfg.TopicLogFieldsFunc != nil { logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...) } @@ -272,9 +266,6 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi for _, response := range alterResp { topicName := strings.TrimPrefix(response.Name, namespacePrefix) logger := c.m.cfg.Logger.With(loggerFields...) - if c.m.cfg.TopicLogFieldFunc != nil { - logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName)) - } if c.m.cfg.TopicLogFieldsFunc != nil { logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...) }