diff --git a/kafka/common.go b/kafka/common.go index b2301e6d..ac5da6c0 100644 --- a/kafka/common.go +++ b/kafka/common.go @@ -45,8 +45,12 @@ import ( type SASLMechanism = sasl.Mechanism // TopicLogFieldFunc is a function that returns a zap.Field for a given topic. +// Deprecated: use TopicLogFieldsFunc instead. type TopicLogFieldFunc func(topic string) zap.Field +// TopicLogFieldsFunc is a function that returns a list of zap.Field for a given topic. +type TopicLogFieldsFunc func(topic string) []zap.Field + // CommonConfig defines common configuration for Kafka consumers, producers, // and managers. type CommonConfig struct { @@ -150,16 +154,28 @@ type CommonConfig struct { // Defaults to the global one. MeterProvider metric.MeterProvider - // TopicAttributeFunc can be used to create custom dimensions from a Kafka + // TopicAttributeFunc can be used to create one custom dimension from a Kafka // topic for these metrics: // - producer.messages.count // - consumer.messages.fetched + // Deprecated: Use TopicAttributesFunc instead. TopicAttributeFunc TopicAttributeFunc - // TopicAttributeFunc can be used to create custom dimensions from a Kafka - // topic for log messages + // TopicAttributesFunc can be used to create multiple custom dimensions from a Kafka + // topic for these metrics: + // - producer.messages.count + // - consumer.messages.fetched + TopicAttributesFunc TopicAttributesFunc + + // TopicAttributeFunc can be used to create one custom dimension from a Kafka + // topic for log messages. + // Deprecated: use TopicLogFieldsFunc instead. TopicLogFieldFunc TopicLogFieldFunc + // TopicLogFieldsFunc can be used to create custom dimensions from a Kafka + // // topic for log messages. + TopicLogFieldsFunc TopicLogFieldsFunc + // MetadataMaxAge is the maximum age of metadata before it is refreshed. // The lower the value the more frequently new topics will be discovered. // If zero, the default value of 5 minutes is used. @@ -198,23 +214,14 @@ func (cfg *CommonConfig) finalize() error { errs = append(errs, err) } - // Wrap the cfg.TopicLogFieldFunc to ensure it never returns a field with - // an unknown type (like `zap.Field{}`). - if cfg.TopicLogFieldFunc != nil { - cfg.TopicLogFieldFunc = topicFieldFunc(cfg.TopicLogFieldFunc) - } - if cfg.TopicAttributeFunc == nil { - cfg.TopicAttributeFunc = func(topic string) attribute.KeyValue { - return attribute.KeyValue{} - } - } + cfg.TopicLogFieldsFunc = mergeTopicLogFieldsFunctions(cfg.TopicLogFieldFunc, cfg.TopicLogFieldsFunc) + cfg.TopicAttributesFunc = mergeTopicAttributeFunctions(cfg.TopicAttributeFunc, cfg.TopicAttributesFunc) return errors.Join(errs...) } // Merge the configs generated from env vars and/or files. func (cfg *CommonConfig) flatten(envCfg *envConfig, fileCfg *fileConfig) error { - // Config file was set with env vars. if cfg.ConfigFile == "" { cfg.ConfigFile = envCfg.configFile @@ -281,7 +288,24 @@ func (cfg *CommonConfig) meterProvider() metric.MeterProvider { return otel.GetMeterProvider() } -func (cfg *CommonConfig) newClient(topicAttributeFunc TopicAttributeFunc, additionalOpts ...kgo.Opt) (*kgo.Client, error) { +type clientOptsFn func(opts *clientOpts) + +type clientOpts struct { + topicAttributesFunc TopicAttributesFunc +} + +func withTopicMultipleAttributeFunc(topicAttributesFunc TopicAttributesFunc) func(clOpts *clientOpts) { + return func(clOpts *clientOpts) { + clOpts.topicAttributesFunc = topicAttributesFunc + } +} + +func (cfg *CommonConfig) newClientWithOpts(clientOptsFn []clientOptsFn, additionalOpts ...kgo.Opt) (*kgo.Client, error) { + clOpts := &clientOpts{} + for _, opt := range clientOptsFn { + opt(clOpts) + } + opts := []kgo.Opt{ kgo.WithLogger(kzap.New(cfg.Logger.Named("kafka"))), kgo.SeedBrokers(cfg.Brokers...), @@ -304,8 +328,8 @@ func (cfg *CommonConfig) newClient(topicAttributeFunc TopicAttributeFunc, additi } opts = append(opts, additionalOpts...) if !cfg.DisableTelemetry { - metricHooks, err := newKgoHooks(cfg.meterProvider(), - cfg.Namespace, cfg.namespacePrefix(), topicAttributeFunc, + metricHooks, err := newKgoHooks( + cfg.meterProvider(), cfg.Namespace, cfg.namespacePrefix(), clOpts.topicAttributesFunc, ) if err != nil { return nil, fmt.Errorf("kafka: failed creating kgo metrics hooks: %w", err) @@ -359,6 +383,51 @@ func topicFieldFunc(f TopicLogFieldFunc) TopicLogFieldFunc { } } +// mergeTopicLogFieldsFunctions merges the fields from TopicLogFieldFunc +// and TopicLogFieldsFunc, and returns a new TopicLogFieldsFunc with all +// log fields. +func mergeTopicLogFieldsFunctions(single TopicLogFieldFunc, multiple TopicLogFieldsFunc) TopicLogFieldsFunc { + if single == nil { + return multiple + } + if multiple == nil { + return func(topic string) []zap.Field { + fn := topicFieldFunc(single) + return []zap.Field{fn(topic)} + } + } + return func(topic string) []zap.Field { + fields := multiple(topic) + for i := range fields { + if fields[i].Type <= zapcore.UnknownType { + fields[i] = zap.Skip() + } + } + return append(fields, single(topic)) + } +} + +// mergeTopicAttributeFunctions merges the attributes from TopicAttributeFunc +// and TopicAttributesFunc, and returns a new TopicAttributesFunc with all +// attributes. +func mergeTopicAttributeFunctions(single TopicAttributeFunc, multiple TopicAttributesFunc) TopicAttributesFunc { + if single == nil { + return multiple + } + if multiple == nil { + return func(topic string) []attribute.KeyValue { + v := single(topic) + if v == (attribute.KeyValue{}) { + return nil + } + return []attribute.KeyValue{v} + } + } + return func(topic string) []attribute.KeyValue { + return append(multiple(topic), single(topic)) + } +} + // newCertReloadingDialer returns a dialer that reloads the CA cert when the // file mod time changes. func newCertReloadingDialer(caPath, certPath, keyPath string, diff --git a/kafka/common_test.go b/kafka/common_test.go index e80443bd..87de89ce 100644 --- a/kafka/common_test.go +++ b/kafka/common_test.go @@ -45,6 +45,7 @@ import ( "github.com/twmb/franz-go/pkg/kfake" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/sasl" + "go.opentelemetry.io/otel/attribute" "go.uber.org/zap" ) @@ -59,8 +60,6 @@ func TestCommonConfig(t *testing.T) { t.Helper() err := in.finalize() require.NoError(t, err) - in.TopicAttributeFunc = nil - in.TopicLogFieldFunc = nil in.hooks = nil assert.Equal(t, expected, in) } @@ -340,7 +339,7 @@ func TestCommonConfigFileHook(t *testing.T) { require.NoError(t, cfg.finalize()) assert.Equal(t, []string{"testing.invalid"}, cfg.Brokers) - client, err := cfg.newClient(nil) + client, err := cfg.newClientWithOpts(nil) require.NoError(t, err) defer client.Close() @@ -774,6 +773,106 @@ func TestCertificateHotReloadErrors(t *testing.T) { }) } +func TestMergeTopicFunctions(t *testing.T) { + t.Run("log fields", func(t *testing.T) { + tests := []struct { + name string + single TopicLogFieldFunc + multiple TopicLogFieldsFunc + want []zap.Field + }{ + { + name: "both nil functions", + }, + { + name: "nil single log field", + multiple: func(_ string) []zap.Field { + return []zap.Field{zap.String("test-1", "test-1"), zap.String("test-2", "test-2")} + }, + want: []zap.Field{zap.String("test-1", "test-1"), zap.String("test-2", "test-2")}, + }, + { + name: "nil multiple log field", + single: func(_ string) zap.Field { + return zap.String("test-1", "test-1") + }, + want: []zap.Field{zap.String("test-1", "test-1")}, + }, + { + name: "both functions exist", + multiple: func(_ string) []zap.Field { + return []zap.Field{zap.String("test-1", "test-1"), zap.String("test-2", "test-2")} + }, + single: func(_ string) zap.Field { + return zap.String("test-3", "test-3") + }, + want: []zap.Field{zap.String("test-1", "test-1"), zap.String("test-2", "test-2"), zap.String("test-3", "test-3")}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fn := mergeTopicLogFieldsFunctions(tt.single, tt.multiple) + if tt.want == nil { + require.Nil(t, fn) + return + } + fields := fn("test") + require.Equal(t, tt.want, fields) + }) + } + }) + t.Run("attributes", func(t *testing.T) { + tests := []struct { + name string + single TopicAttributeFunc + multiple TopicAttributesFunc + want []attribute.KeyValue + }{ + { + name: "both nil functions", + }, + { + name: "nil single attribute", + multiple: func(_ string) []attribute.KeyValue { + return []attribute.KeyValue{attribute.String("test-1", "test-1"), attribute.String("test-2", "test-2")} + }, + want: []attribute.KeyValue{attribute.String("test-1", "test-1"), attribute.String("test-2", "test-2")}, + }, + { + name: "nil multiple log field", + single: func(_ string) attribute.KeyValue { + return attribute.String("test-1", "test-1") + }, + want: []attribute.KeyValue{attribute.String("test-1", "test-1")}, + }, + { + name: "both functions exist", + multiple: func(_ string) []attribute.KeyValue { + return []attribute.KeyValue{attribute.String("test-1", "test-1"), attribute.String("test-2", "test-2")} + }, + single: func(_ string) attribute.KeyValue { + return attribute.String("test-3", "test-3") + }, + want: []attribute.KeyValue{attribute.String("test-1", "test-1"), attribute.String("test-2", "test-2"), attribute.String("test-3", "test-3")}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fn := mergeTopicAttributeFunctions(tt.single, tt.multiple) + if tt.want == nil { + require.Nil(t, fn) + return + } + fields := fn("test") + require.Equal(t, tt.want, fields) + }) + } + + }) +} + // Helper functions for certificate generation func generateCA(t testing.TB) (*rsa.PrivateKey, *x509.Certificate, []byte) { caKey, err := rsa.GenerateKey(rand.Reader, 2048) diff --git a/kafka/consumer.go b/kafka/consumer.go index 2d5c6a2a..23f44088 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -212,7 +212,7 @@ func NewConsumer(cfg ConsumerConfig) (*Consumer, error) { namespacePrefix := cfg.namespacePrefix() consumer := &consumer{ topicPrefix: namespacePrefix, - logFieldFn: cfg.TopicLogFieldFunc, + logFieldsFn: cfg.TopicLogFieldsFunc, assignments: make(map[topicPartition]*pc), processor: cfg.Processor, logger: cfg.Logger.Named("partition"), @@ -230,7 +230,7 @@ func NewConsumer(cfg ConsumerConfig) (*Consumer, error) { kgo.ConsumeTopics(topics...), // If a rebalance happens while the client is polling, the consumed // records may belong to a partition which has been reassigned to a - // different consumer int he group. To avoid this scenario, Polls will + // different consumer in the group. To avoid this scenario, Polls will // block rebalances of partitions which would be lost, and the consumer // MUST manually call `AllowRebalance`. kgo.BlockRebalanceOnPoll(), @@ -270,7 +270,10 @@ func NewConsumer(cfg ConsumerConfig) (*Consumer, error) { opts = append(opts, kgo.BrokerMaxReadBytes(cfg.BrokerMaxReadBytes)) } - client, err := cfg.newClient(cfg.TopicAttributeFunc, opts...) + client, err := cfg.newClientWithOpts( + []clientOptsFn{withTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)}, + opts..., + ) if err != nil { return nil, fmt.Errorf("kafka: failed creating kafka consumer: %w", err) } @@ -396,8 +399,10 @@ func (c *Consumer) fetch(ctx context.Context) error { // franz-go can inject fake fetches in case of errors. // the fake fetch can have an empty topic so we need to // account for that - if c.cfg.TopicLogFieldFunc != nil && topicName != "" { - logger = logger.With(c.cfg.TopicLogFieldFunc(topicName)) + if topicName != "" { + if c.cfg.TopicLogFieldsFunc != nil { + logger = logger.With(c.cfg.TopicLogFieldsFunc(topicName)...) + } } logger.Error( @@ -429,7 +434,8 @@ type consumer struct { processor apmqueue.Processor logger *zap.Logger delivery apmqueue.DeliveryType - logFieldFn TopicLogFieldFunc + logFieldsFn TopicLogFieldsFunc + // ctx contains the graceful cancellation context that is passed to the // partition consumers. ctx context.Context @@ -452,8 +458,8 @@ func (c *consumer) assigned(_ context.Context, client *kgo.Client, assigned map[ zap.String("topic", t), zap.Int32("partition", partition), ) - if c.logFieldFn != nil { - logger = logger.With(c.logFieldFn(t)) + if c.logFieldsFn != nil { + logger = logger.With(c.logFieldsFn(t)...) } pc := newPartitionConsumer(c.ctx, client, c.processor, @@ -535,8 +541,8 @@ func (c *consumer) processFetch(fetches kgo.Fetches) { if c.delivery == apmqueue.AtMostOnceDeliveryType { topicName := strings.TrimPrefix(ftp.Topic, c.topicPrefix) logger := c.logger - if c.logFieldFn != nil { - logger = logger.With(c.logFieldFn(topicName)) + if c.logFieldsFn != nil { + logger = logger.With(c.logFieldsFn(topicName)...) } logger.Warn( "data loss: failed to send records to process after commit", diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go index f1659190..e090101a 100644 --- a/kafka/consumer_test.go +++ b/kafka/consumer_test.go @@ -907,8 +907,6 @@ func assertNotNilOptions(t testing.TB, cfg *ConsumerConfig) { cfg.Processor = nil assert.NotNil(t, cfg.Logger) cfg.Logger = nil - assert.NotNil(t, cfg.TopicAttributeFunc) - cfg.TopicAttributeFunc = nil } func newConsumer(t testing.TB, cfg ConsumerConfig) *Consumer { diff --git a/kafka/log_compacted_consumer.go b/kafka/log_compacted_consumer.go index eac0a35d..4c1fa20c 100644 --- a/kafka/log_compacted_consumer.go +++ b/kafka/log_compacted_consumer.go @@ -108,7 +108,10 @@ func NewLogCompactedConsumer(cfg LogCompactedConfig, opts = append(opts, kgo.FetchMinBytes(cfg.MinFetchSize)) } - client, err := cfg.newClient(cfg.TopicAttributeFunc, opts...) + client, err := cfg.newClientWithOpts( + []clientOptsFn{withTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)}, + opts..., + ) if err != nil { return nil, err } diff --git a/kafka/logger_test.go b/kafka/logger_test.go index 6f70bb6b..a9b77842 100644 --- a/kafka/logger_test.go +++ b/kafka/logger_test.go @@ -64,7 +64,11 @@ func TestHookLogsFailedDial(t *testing.T) { } // Calling newClient triggers the metadata refresh, forcing a connection to the fake cluster // using the broken dialer. - c, err := cfg.newClient(func(string) attribute.KeyValue { return attribute.String("k", "v") }) + c, err := cfg.newClientWithOpts([]clientOptsFn{ + withTopicMultipleAttributeFunc(func(string) []attribute.KeyValue { + return []attribute.KeyValue{attribute.String("k", "v")} + }), + }) require.NoError(t, err) <-time.After(time.Millisecond) @@ -88,7 +92,11 @@ func TestHookLogsFailedDial(t *testing.T) { // Calling newClient triggers the metadata refresh, forcing a connection to the fake cluster // using the broken dialer. - c, err := cfg.newClient(func(string) attribute.KeyValue { return attribute.String("k", "v") }) + c, err := cfg.newClientWithOpts([]clientOptsFn{ + withTopicMultipleAttributeFunc(func(string) []attribute.KeyValue { + return []attribute.KeyValue{attribute.String("k", "v")} + }), + }) require.NoError(t, err) assert.Error(t, c.Ping(context.Background())) diff --git a/kafka/manager.go b/kafka/manager.go index ec26b188..1f64cf9e 100644 --- a/kafka/manager.go +++ b/kafka/manager.go @@ -69,7 +69,7 @@ func NewManager(cfg ManagerConfig) (*Manager, error) { if err := cfg.finalize(); err != nil { return nil, fmt.Errorf("kafka: invalid manager config: %w", err) } - client, err := cfg.newClient(nil) + client, err := cfg.newClientWithOpts(nil) if err != nil { return nil, fmt.Errorf("kafka: failed creating kafka client: %w", err) } @@ -124,8 +124,12 @@ func (m *Manager) DeleteTopics(ctx context.Context, topics ...apmqueue.Topic) er for _, response := range responses.Sorted() { topic := strings.TrimPrefix(response.Topic, namespacePrefix) logger := m.cfg.Logger.With(zap.String("topic", topic)) - if m.cfg.TopicLogFieldFunc != nil { - logger = logger.With(m.cfg.TopicLogFieldFunc(topic)) + if m.cfg.TopicLogFieldsFunc != nil { + logger = logger.With(m.cfg.TopicLogFieldsFunc(topic)...) + } + var commonAttrs []attribute.KeyValue + if m.cfg.TopicAttributesFunc != nil { + commonAttrs = m.cfg.TopicAttributesFunc(topic) } if err := response.Err; err != nil { @@ -142,9 +146,8 @@ func (m *Manager) DeleteTopics(ctx context.Context, topics ...apmqueue.Topic) er attribute.String("outcome", "failure"), attribute.String("topic", topic), } - if kv := m.cfg.TopicAttributeFunc(topic); kv != (attribute.KeyValue{}) { - attrs = append(attrs, kv) - } + attrs = append(attrs, commonAttrs...) + m.deleted.Add(context.Background(), 1, metric.WithAttributeSet( attribute.NewSet(attrs...), )) @@ -156,9 +159,7 @@ func (m *Manager) DeleteTopics(ctx context.Context, topics ...apmqueue.Topic) er attribute.String("outcome", "success"), attribute.String("topic", topic), } - if kv := m.cfg.TopicAttributeFunc(topic); kv != (attribute.KeyValue{}) { - attrs = append(attrs, kv) - } + attrs = append(attrs, commonAttrs...) m.deleted.Add(context.Background(), 1, metric.WithAttributeSet( attribute.NewSet(attrs...), )) @@ -256,8 +257,8 @@ func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (m topic = topic[len(namespacePrefix):] logger := m.cfg.Logger - if m.cfg.TopicLogFieldFunc != nil { - logger = logger.With(m.cfg.TopicLogFieldFunc(topic)) + if m.cfg.TopicLogFieldsFunc != nil { + logger = logger.With(m.cfg.TopicLogFieldsFunc(topic)...) } var matchesRegex bool @@ -298,8 +299,8 @@ func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (m attribute.String("topic", topic), attribute.Int("partition", int(partition)), } - if kv := m.cfg.TopicAttributeFunc(topic); kv != (attribute.KeyValue{}) { - attrs = append(attrs, kv) + if m.cfg.TopicAttributesFunc != nil { + attrs = append(attrs, m.cfg.TopicAttributesFunc(topic)...) } o.ObserveInt64( consumerGroupLagMetric, lag.Lag, @@ -313,8 +314,8 @@ func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (m attribute.String("topic", key.topic), attribute.String("client_id", key.clientID), } - if kv := m.cfg.TopicAttributeFunc(key.topic); kv != (attribute.KeyValue{}) { - attrs = append(attrs, kv) + if m.cfg.TopicAttributesFunc != nil { + attrs = append(attrs, m.cfg.TopicAttributesFunc(key.topic)...) } o.ObserveInt64(assignmentMetric, count, metric.WithAttributeSet( attribute.NewSet(attrs...), diff --git a/kafka/manager_test.go b/kafka/manager_test.go index 57061728..e6b24842 100644 --- a/kafka/manager_test.go +++ b/kafka/manager_test.go @@ -67,7 +67,18 @@ func TestManagerDeleteTopics(t *testing.T) { commonConfig.Logger = zap.New(core) commonConfig.TracerProvider = tp commonConfig.MeterProvider = mt.MeterProvider - commonConfig.TopicAttributeFunc = func(topic string) attribute.KeyValue { return attribute.KeyValue{} } + commonConfig.TopicLogFieldFunc = func(topic string) zap.Field { + return zap.String("from-1", "singular-topic-log-func") + } + commonConfig.TopicLogFieldsFunc = func(topic string) []zap.Field { + return []zap.Field{zap.String("from-2", "multiple-topic-log-func")} + } + commonConfig.TopicAttributeFunc = func(topic string) attribute.KeyValue { + return attribute.String("from-1", "singular-topic-attribute-func") + } + commonConfig.TopicAttributesFunc = func(topic string) []attribute.KeyValue { + return []attribute.KeyValue{attribute.String("from-2", "multiple-topic-attribute-func")} + } m, err := NewManager(ManagerConfig{CommonConfig: commonConfig}) require.NoError(t, err) t.Cleanup(func() { m.Close() }) @@ -117,6 +128,8 @@ func TestManagerDeleteTopics(t *testing.T) { Context: []zapcore.Field{ zap.String("namespace", "name_space"), zap.String("topic", "topic1"), + zap.String("from-2", "multiple-topic-log-func"), + zap.String("from-1", "singular-topic-log-func"), }, }, { Entry: zapcore.Entry{ @@ -127,6 +140,8 @@ func TestManagerDeleteTopics(t *testing.T) { Context: []zapcore.Field{ zap.String("namespace", "name_space"), zap.String("topic", "topic3"), + zap.String("from-2", "multiple-topic-log-func"), + zap.String("from-1", "singular-topic-log-func"), }, }}, matchingLogs.AllUntimed()) @@ -160,11 +175,13 @@ func TestManagerDeleteTopics(t *testing.T) { // Ensure only 1 topic was deleted, which also matches the number of spans. assert.Empty(t, cmp.Diff(metrictest.Int64Metrics{ {Name: "topics.deleted.count"}: { - {K: "topic", V: "topic2"}: 1, - {K: "messaging.system", V: "kafka"}: 2, - {K: "outcome", V: "failure"}: 1, - {K: "outcome", V: "success"}: 1, - {K: "topic", V: "topic3"}: 1, + {K: "topic", V: "topic2"}: 1, + {K: "messaging.system", V: "kafka"}: 2, + {K: "outcome", V: "failure"}: 1, + {K: "outcome", V: "success"}: 1, + {K: "topic", V: "topic3"}: 1, + {K: "from-1", V: "singular-topic-attribute-func"}: 2, + {K: "from-2", V: "multiple-topic-attribute-func"}: 2, }, }, gotMetrics)) } @@ -185,6 +202,9 @@ func TestManagerMetrics(t *testing.T) { commonConfig.TopicAttributeFunc = func(topic string) attribute.KeyValue { return attribute.Bool("foo", true) } + commonConfig.TopicAttributesFunc = func(topic string) []attribute.KeyValue { + return []attribute.KeyValue{attribute.Bool("bar", true)} + } m, err := NewManager(ManagerConfig{CommonConfig: commonConfig}) require.NoError(t, err) t.Cleanup(func() { m.Close() }) @@ -466,6 +486,7 @@ func TestManagerMetrics(t *testing.T) { attribute.String("topic", "topic1"), attribute.Int("partition", 1), attribute.Bool("foo", true), + attribute.Bool("bar", true), ), Value: 0, // end offset = 1, committed = 1 }, { @@ -474,6 +495,7 @@ func TestManagerMetrics(t *testing.T) { attribute.String("topic", "topic1"), attribute.Int("partition", 2), attribute.Bool("foo", true), + attribute.Bool("bar", true), ), Value: 1, // end offset = 2, committed = 1 }, { @@ -482,6 +504,7 @@ func TestManagerMetrics(t *testing.T) { attribute.String("topic", "topic2"), attribute.Int("partition", 3), attribute.Bool("foo", true), + attribute.Bool("bar", true), ), Value: 2, // end offset = 3, committed = 1 }, { @@ -490,6 +513,7 @@ func TestManagerMetrics(t *testing.T) { attribute.String("topic", "topic3"), attribute.Int("partition", 4), attribute.Bool("foo", true), + attribute.Bool("bar", true), ), Value: 0, // end offset = 4, nothing committed }, { @@ -498,6 +522,7 @@ func TestManagerMetrics(t *testing.T) { attribute.String("topic", "mytopic"), attribute.Int("partition", 1), attribute.Bool("foo", true), + attribute.Bool("bar", true), ), Value: 1, // end offset = 1, nothing committed }}, @@ -510,6 +535,7 @@ func TestManagerMetrics(t *testing.T) { attribute.String("group", "consumer1"), attribute.String("topic", "topic1"), attribute.Bool("foo", true), + attribute.Bool("bar", true), ), Value: 1, }, { @@ -518,6 +544,7 @@ func TestManagerMetrics(t *testing.T) { attribute.String("group", "consumer2"), attribute.String("topic", "topic2"), attribute.Bool("foo", true), + attribute.Bool("bar", true), ), Value: 1, }, { @@ -526,6 +553,7 @@ func TestManagerMetrics(t *testing.T) { attribute.String("group", "consumer2"), attribute.String("topic", "topic1"), attribute.Bool("foo", true), + attribute.Bool("bar", true), ), Value: 1, }, { @@ -534,6 +562,7 @@ func TestManagerMetrics(t *testing.T) { attribute.String("group", "consumer3"), attribute.String("topic", "topic3"), attribute.Bool("foo", true), + attribute.Bool("bar", true), ), Value: 1, }, { @@ -542,6 +571,7 @@ func TestManagerMetrics(t *testing.T) { attribute.String("group", "consumer3"), attribute.String("topic", "mytopic"), attribute.Bool("foo", true), + attribute.Bool("bar", true), ), Value: 1, }}, diff --git a/kafka/metrics.go b/kafka/metrics.go index 09f4fb89..d75bc1e7 100644 --- a/kafka/metrics.go +++ b/kafka/metrics.go @@ -66,10 +66,18 @@ var ( // TopicAttributeFunc run on `kgo.HookProduceBatchWritten` and // `kgo.HookFetchBatchRead` for each topic/partition. It can be -// used include additionaly dimensions for `consumer.messages.fetched` +// used to include one additional dimension for `consumer.messages.fetched` // and `producer.messages.count` metrics. +// +// Deprecated: Please use TopicAttributesFunc instead. type TopicAttributeFunc func(topic string) attribute.KeyValue +// TopicAttributesFunc run on `kgo.HookProduceBatchWritten` and +// `kgo.HookFetchBatchRead` for each topic/partition. It can be +// used to include aditional dimensions for `consumer.messages.fetched` +// and `producer.messages.count` metrics. +type TopicAttributesFunc func(topic string) []attribute.KeyValue + type metricHooks struct { namespace string topicPrefix string @@ -103,11 +111,13 @@ type metricHooks struct { messageDelay metric.Float64Histogram throttlingDuration metric.Float64Histogram - topicAttributeFunc TopicAttributeFunc + topicAttributesFunc TopicAttributesFunc } -func newKgoHooks(mp metric.MeterProvider, namespace, topicPrefix string, - topicAttributeFunc TopicAttributeFunc, +func newKgoHooks( + mp metric.MeterProvider, + namespace, topicPrefix string, + topicMultipleAttributeFunc TopicAttributesFunc, ) (*metricHooks, error) { m := mp.Meter(instrumentName) @@ -333,7 +343,7 @@ func newKgoHooks(mp metric.MeterProvider, namespace, topicPrefix string, messageDelay: messageDelayHistogram, throttlingDuration: throttlingDurationHistogram, - topicAttributeFunc: topicAttributeFunc, + topicAttributesFunc: topicMultipleAttributeFunc, }, nil } @@ -455,8 +465,8 @@ func (h *metricHooks) OnProduceBatchWritten(_ kgo.BrokerMetadata, attribute.String("outcome", "success"), attribute.String("compression.codec", compressionFromCodec(m.CompressionType)), ) - if kv := h.topicAttributeFunc(topic); kv != (attribute.KeyValue{}) { - attrs = append(attrs, kv) + if h.topicAttributesFunc != nil { + attrs = append(attrs, h.topicAttributesFunc(topic)...) } if h.namespace != "" { attrs = append(attrs, attribute.String("namespace", h.namespace)) @@ -496,8 +506,8 @@ func (h *metricHooks) OnFetchBatchRead(_ kgo.BrokerMetadata, semconv.MessagingKafkaSourcePartition(int(partition)), attribute.String("compression.codec", compressionFromCodec(m.CompressionType)), ) - if kv := h.topicAttributeFunc(topic); kv != (attribute.KeyValue{}) { - attrs = append(attrs, kv) + if h.topicAttributesFunc != nil { + attrs = append(attrs, h.topicAttributesFunc(topic)...) } if h.namespace != "" { attrs = append(attrs, attribute.String("namespace", h.namespace)) @@ -538,8 +548,8 @@ func (h *metricHooks) OnProduceRecordUnbuffered(r *kgo.Record, err error) { semconv.MessagingKafkaDestinationPartition(int(r.Partition)), attribute.String("outcome", "failure"), ) - if kv := h.topicAttributeFunc(r.Topic); kv != (attribute.KeyValue{}) { - attrs = append(attrs, kv) + if h.topicAttributesFunc != nil { + attrs = append(attrs, h.topicAttributesFunc(r.Topic)...) } if h.namespace != "" { attrs = append(attrs, attribute.String("namespace", h.namespace)) @@ -573,8 +583,8 @@ func (h *metricHooks) OnFetchRecordUnbuffered(r *kgo.Record, polled bool) { semconv.MessagingSourceName(strings.TrimPrefix(r.Topic, h.topicPrefix)), semconv.MessagingKafkaSourcePartition(int(r.Partition)), ) - if kv := h.topicAttributeFunc(r.Topic); kv != (attribute.KeyValue{}) { - attrs = append(attrs, kv) + if h.topicAttributesFunc != nil { + attrs = append(attrs, h.topicAttributesFunc(r.Topic)...) } if h.namespace != "" { attrs = append(attrs, attribute.String("namespace", h.namespace)) diff --git a/kafka/metrics_test.go b/kafka/metrics_test.go index cbb8e2ab..76ce10a4 100644 --- a/kafka/metrics_test.go +++ b/kafka/metrics_test.go @@ -82,7 +82,7 @@ func TestProducerMetrics(t *testing.T) { } } t.Run("DeadlineExceeded", func(t *testing.T) { - producer, rdr := setupTestProducer(t, nil) + producer, rdr := setupTestProducer(t, nil, nil) want := []metricdata.Metrics{ { Name: "producer.messages.count", @@ -113,7 +113,7 @@ func TestProducerMetrics(t *testing.T) { test(ctx, t, producer, rdr, "default-topic", want) }) t.Run("ContextCanceled", func(t *testing.T) { - producer, rdr := setupTestProducer(t, nil) + producer, rdr := setupTestProducer(t, nil, nil) want := []metricdata.Metrics{ { Name: "producer.messages.count", @@ -143,7 +143,7 @@ func TestProducerMetrics(t *testing.T) { test(ctx, t, producer, rdr, "default-topic", want) }) t.Run("Unknown error reason", func(t *testing.T) { - producer, rdr := setupTestProducer(t, nil) + producer, rdr := setupTestProducer(t, nil, nil) want := []metricdata.Metrics{{ Name: "producer.messages.count", Description: "The number of messages produced", @@ -171,7 +171,7 @@ func TestProducerMetrics(t *testing.T) { test(context.Background(), t, producer, rdr, "default-topic", want) }) t.Run("unknown topic", func(t *testing.T) { - producer, rdr := setupTestProducer(t, nil) + producer, rdr := setupTestProducer(t, nil, nil) want := []metricdata.Metrics{{ Name: "producer.messages.count", Description: "The number of messages produced", @@ -201,7 +201,9 @@ func TestProducerMetrics(t *testing.T) { }) t.Run("Produced", func(t *testing.T) { producer, rdr := setupTestProducer(t, func(topic string) attribute.KeyValue { - return attribute.String("test", "test") + return attribute.String("test-1", "test-1") + }, func(topic string) []attribute.KeyValue { + return []attribute.KeyValue{attribute.String("test-2", "test-2"), attribute.String("test-3", "test-3")} }) want := []metricdata.Metrics{ { @@ -221,7 +223,9 @@ func TestProducerMetrics(t *testing.T) { semconv.MessagingSystem("kafka"), semconv.MessagingDestinationName("default-topic"), semconv.MessagingKafkaDestinationPartition(0), - attribute.String("test", "test"), + attribute.String("test-1", "test-1"), + attribute.String("test-2", "test-2"), + attribute.String("test-3", "test-3"), attribute.String("compression.codec", "none"), ), }, @@ -245,7 +249,9 @@ func TestProducerMetrics(t *testing.T) { semconv.MessagingSystem("kafka"), semconv.MessagingDestinationName("default-topic"), semconv.MessagingKafkaDestinationPartition(0), - attribute.String("test", "test"), + attribute.String("test-1", "test-1"), + attribute.String("test-2", "test-2"), + attribute.String("test-3", "test-3"), attribute.String("compression.codec", "none"), ), }, @@ -269,7 +275,9 @@ func TestProducerMetrics(t *testing.T) { semconv.MessagingSystem("kafka"), semconv.MessagingDestinationName("default-topic"), semconv.MessagingKafkaDestinationPartition(0), - attribute.String("test", "test"), + attribute.String("test-1", "test-1"), + attribute.String("test-2", "test-2"), + attribute.String("test-3", "test-3"), attribute.String("compression.codec", "none"), ), }, @@ -317,6 +325,8 @@ func TestProducerMetrics(t *testing.T) { t.Run("ProducedWithHeaders", func(t *testing.T) { producer, rdr := setupTestProducer(t, func(topic string) attribute.KeyValue { return attribute.String("some key", "some value") + }, func(topic string) []attribute.KeyValue { + return []attribute.KeyValue{attribute.String("test-2", "test-2"), attribute.String("test-3", "test-3")} }) want := []metricdata.Metrics{ { @@ -338,6 +348,8 @@ func TestProducerMetrics(t *testing.T) { semconv.MessagingKafkaDestinationPartition(0), attribute.String("some key", "some value"), attribute.String("compression.codec", "snappy"), + attribute.String("test-2", "test-2"), + attribute.String("test-3", "test-3"), ), }, }, @@ -362,6 +374,8 @@ func TestProducerMetrics(t *testing.T) { semconv.MessagingKafkaDestinationPartition(0), attribute.String("some key", "some value"), attribute.String("compression.codec", "snappy"), + attribute.String("test-2", "test-2"), + attribute.String("test-3", "test-3"), ), }, }, @@ -386,6 +400,8 @@ func TestProducerMetrics(t *testing.T) { semconv.MessagingKafkaDestinationPartition(0), attribute.String("some key", "some value"), attribute.String("compression.codec", "snappy"), + attribute.String("test-2", "test-2"), + attribute.String("test-3", "test-3"), ), }, }, @@ -414,6 +430,8 @@ func TestConsumerMetrics(t *testing.T) { }) tc := setupTestConsumer(t, proc, func(topic string) attribute.KeyValue { return attribute.String("header", "included") + }, func(topic string) []attribute.KeyValue { + return []attribute.KeyValue{attribute.String("test-1", "test-1"), attribute.String("test-2", "test-2")} }) ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -427,6 +445,8 @@ func TestConsumerMetrics(t *testing.T) { Headers: []kgo.RecordHeader{ {Key: "header", Value: []byte("included")}, {Key: "traceparent", Value: []byte("excluded")}, + {Key: "test-1", Value: []byte("test-1")}, + {Key: "test-2", Value: []byte("test-2")}, }, }) } @@ -455,6 +475,8 @@ func TestConsumerMetrics(t *testing.T) { Attributes: attribute.NewSet( attribute.String("compression.codec", "none"), attribute.String("header", "included"), + attribute.String("test-1", "test-1"), + attribute.String("test-2", "test-2"), semconv.MessagingKafkaSourcePartition(0), semconv.MessagingSourceName(t.Name()), semconv.MessagingSystem("kafka"), @@ -479,6 +501,8 @@ func TestConsumerMetrics(t *testing.T) { semconv.MessagingSystem("kafka"), attribute.String("namespace", "name_space"), attribute.String("topic", "name_space-TestConsumerMetrics"), + attribute.String("test-1", "test-1"), + attribute.String("test-2", "test-2"), ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, @@ -549,7 +573,7 @@ func filterMetrics(t testing.TB, sm []metricdata.ScopeMetrics) []metricdata.Metr return []metricdata.Metrics{} } -func setupTestProducer(t testing.TB, tafunc TopicAttributeFunc) (*Producer, sdkmetric.Reader) { +func setupTestProducer(t testing.TB, tafunc TopicAttributeFunc, tmafunc TopicAttributesFunc) (*Producer, sdkmetric.Reader) { t.Helper() rdr := sdkmetric.NewManualReader() @@ -560,12 +584,13 @@ func setupTestProducer(t testing.TB, tafunc TopicAttributeFunc) (*Producer, sdkm }) producer := newProducer(t, ProducerConfig{ CommonConfig: CommonConfig{ - Brokers: brokers, - Logger: zap.NewNop(), - Namespace: "name_space", - TracerProvider: noop.NewTracerProvider(), - MeterProvider: mp, - TopicAttributeFunc: tafunc, + Brokers: brokers, + Logger: zap.NewNop(), + Namespace: "name_space", + TracerProvider: noop.NewTracerProvider(), + MeterProvider: mp, + TopicAttributeFunc: tafunc, + TopicAttributesFunc: tmafunc, }, Sync: true, }) @@ -578,7 +603,12 @@ type testMetricConsumer struct { reader sdkmetric.Reader } -func setupTestConsumer(t testing.TB, p apmqueue.Processor, tafunc TopicAttributeFunc) (mc testMetricConsumer) { +func setupTestConsumer( + t testing.TB, + p apmqueue.Processor, + tafunc TopicAttributeFunc, + tmafunc TopicAttributesFunc, +) (mc testMetricConsumer) { t.Helper() mc.reader = sdkmetric.NewManualReader() @@ -593,7 +623,8 @@ func setupTestConsumer(t testing.TB, p apmqueue.Processor, tafunc TopicAttribute MeterProvider: sdkmetric.NewMeterProvider( sdkmetric.WithReader(mc.reader), ), - TopicAttributeFunc: tafunc, + TopicAttributeFunc: tafunc, + TopicAttributesFunc: tmafunc, }, } mc.client, cfg.Brokers = newClusterWithTopics(t, 1, "name_space-"+t.Name()) diff --git a/kafka/producer.go b/kafka/producer.go index 03a0dcd6..f11a79a4 100644 --- a/kafka/producer.go +++ b/kafka/producer.go @@ -188,7 +188,10 @@ func NewProducer(cfg ProducerConfig) (*Producer, error) { if cfg.AllowAutoTopicCreation { opts = append(opts, kgo.AllowAutoTopicCreation()) } - client, err := cfg.newClient(cfg.TopicAttributeFunc, opts...) + client, err := cfg.newClientWithOpts( + []clientOptsFn{withTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)}, + opts..., + ) if err != nil { return nil, fmt.Errorf("kafka: failed creating producer: %w", err) } @@ -269,8 +272,8 @@ func (p *Producer) Produce(ctx context.Context, rs ...apmqueue.Record) error { mu.Unlock() logger := p.cfg.Logger - if p.cfg.TopicLogFieldFunc != nil { - logger = logger.With(p.cfg.TopicLogFieldFunc(topicName)) + if p.cfg.TopicLogFieldsFunc != nil { + logger = logger.With(p.cfg.TopicLogFieldsFunc(topicName)...) } logger.Error("failed producing message", diff --git a/kafka/topiccreator.go b/kafka/topiccreator.go index d52cb5d9..50a1ffce 100644 --- a/kafka/topiccreator.go +++ b/kafka/topiccreator.go @@ -168,8 +168,8 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi for _, response := range responses.Sorted() { topicName := strings.TrimPrefix(response.Topic, namespacePrefix) logger := c.m.cfg.Logger.With(loggerFields...) - if c.m.cfg.TopicLogFieldFunc != nil { - logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName)) + if c.m.cfg.TopicLogFieldsFunc != nil { + logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...) } if err := response.Err; err != nil { if errors.Is(err, kerr.TopicAlreadyExists) { @@ -223,8 +223,8 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi for _, response := range updateResp.Sorted() { topicName := strings.TrimPrefix(response.Topic, namespacePrefix) logger := c.m.cfg.Logger.With(loggerFields...) - if c.m.cfg.TopicLogFieldFunc != nil { - logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName)) + if c.m.cfg.TopicLogFieldsFunc != nil { + logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...) } if errors.Is(response.Err, kerr.InvalidRequest) { @@ -266,8 +266,8 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi for _, response := range alterResp { topicName := strings.TrimPrefix(response.Name, namespacePrefix) logger := c.m.cfg.Logger.With(loggerFields...) - if c.m.cfg.TopicLogFieldFunc != nil { - logger = logger.With(c.m.cfg.TopicLogFieldFunc(topicName)) + if c.m.cfg.TopicLogFieldsFunc != nil { + logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...) } if err := response.Err; err != nil {