Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 86 additions & 17 deletions kafka/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,12 @@ import (
type SASLMechanism = sasl.Mechanism

// TopicLogFieldFunc is a function that returns a zap.Field for a given topic.
// Deprecated: use TopicLogFieldsFunc instead.
type TopicLogFieldFunc func(topic string) zap.Field

// TopicLogFieldsFunc is a function that returns a list of zap.Field for a given topic.
type TopicLogFieldsFunc func(topic string) []zap.Field

// CommonConfig defines common configuration for Kafka consumers, producers,
// and managers.
type CommonConfig struct {
Expand Down Expand Up @@ -150,16 +154,28 @@ type CommonConfig struct {
// Defaults to the global one.
MeterProvider metric.MeterProvider

// TopicAttributeFunc can be used to create custom dimensions from a Kafka
// TopicAttributeFunc can be used to create one custom dimension from a Kafka
// topic for these metrics:
// - producer.messages.count
// - consumer.messages.fetched
// Deprecated: Use TopicAttributesFunc instead.
TopicAttributeFunc TopicAttributeFunc

// TopicAttributeFunc can be used to create custom dimensions from a Kafka
// topic for log messages
// TopicAttributesFunc can be used to create multiple custom dimensions from a Kafka
// topic for these metrics:
// - producer.messages.count
// - consumer.messages.fetched
TopicAttributesFunc TopicAttributesFunc

// TopicLogFieldFunc can be used to create one custom dimension from a Kafka
// topic for log messages.
// Deprecated: use TopicLogFieldsFunc instead.
TopicLogFieldFunc TopicLogFieldFunc

// TopicLogFieldsFunc can be used to create custom dimensions from a Kafka
// topic for log messages.
TopicLogFieldsFunc TopicLogFieldsFunc

// MetadataMaxAge is the maximum age of metadata before it is refreshed.
// The lower the value the more frequently new topics will be discovered.
// If zero, the default value of 5 minutes is used.
Expand Down Expand Up @@ -198,23 +214,14 @@ func (cfg *CommonConfig) finalize() error {
errs = append(errs, err)
}

// Wrap the cfg.TopicLogFieldFunc to ensure it never returns a field with
// an unknown type (like `zap.Field{}`).
if cfg.TopicLogFieldFunc != nil {
cfg.TopicLogFieldFunc = topicFieldFunc(cfg.TopicLogFieldFunc)
}
if cfg.TopicAttributeFunc == nil {
cfg.TopicAttributeFunc = func(topic string) attribute.KeyValue {
return attribute.KeyValue{}
}
}
cfg.TopicLogFieldsFunc = mergeTopicLogFieldsFunctions(cfg.TopicLogFieldFunc, cfg.TopicLogFieldsFunc)
cfg.TopicAttributesFunc = mergeTopicAttributeFunctions(cfg.TopicAttributeFunc, cfg.TopicAttributesFunc)

return errors.Join(errs...)
}

// Merge the configs generated from env vars and/or files.
func (cfg *CommonConfig) flatten(envCfg *envConfig, fileCfg *fileConfig) error {

// Config file was set with env vars.
if cfg.ConfigFile == "" {
cfg.ConfigFile = envCfg.configFile
Expand Down Expand Up @@ -281,7 +288,24 @@ func (cfg *CommonConfig) meterProvider() metric.MeterProvider {
return otel.GetMeterProvider()
}

func (cfg *CommonConfig) newClient(topicAttributeFunc TopicAttributeFunc, additionalOpts ...kgo.Opt) (*kgo.Client, error) {
// clientOptsFn is a functional option that mutates clientOpts before a
// kgo client is built.
type clientOptsFn func(opts *clientOpts)

// clientOpts holds per-client settings that are resolved at client
// construction time rather than stored on CommonConfig.
type clientOpts struct {
	// topicAttributesFunc produces the metric attributes for a topic;
	// may be nil, in which case no custom attributes are added.
	topicAttributesFunc TopicAttributesFunc
}

// withTopicMultipleAttributeFunc returns an option that sets the
// TopicAttributesFunc used by the client's metric hooks.
func withTopicMultipleAttributeFunc(topicAttributesFunc TopicAttributesFunc) clientOptsFn {
	return func(clOpts *clientOpts) {
		clOpts.topicAttributesFunc = topicAttributesFunc
	}
}

func (cfg *CommonConfig) newClientWithOpts(clientOptsFn []clientOptsFn, additionalOpts ...kgo.Opt) (*kgo.Client, error) {
clOpts := &clientOpts{}
for _, opt := range clientOptsFn {
opt(clOpts)
}

opts := []kgo.Opt{
kgo.WithLogger(kzap.New(cfg.Logger.Named("kafka"))),
kgo.SeedBrokers(cfg.Brokers...),
Expand All @@ -304,8 +328,8 @@ func (cfg *CommonConfig) newClient(topicAttributeFunc TopicAttributeFunc, additi
}
opts = append(opts, additionalOpts...)
if !cfg.DisableTelemetry {
metricHooks, err := newKgoHooks(cfg.meterProvider(),
cfg.Namespace, cfg.namespacePrefix(), topicAttributeFunc,
metricHooks, err := newKgoHooks(
cfg.meterProvider(), cfg.Namespace, cfg.namespacePrefix(), clOpts.topicAttributesFunc,
)
if err != nil {
return nil, fmt.Errorf("kafka: failed creating kgo metrics hooks: %w", err)
Expand Down Expand Up @@ -359,6 +383,51 @@ func topicFieldFunc(f TopicLogFieldFunc) TopicLogFieldFunc {
}
}

// mergeTopicLogFieldsFunctions merges the deprecated TopicLogFieldFunc
// (single) and TopicLogFieldsFunc (multiple) into one TopicLogFieldsFunc.
// It returns nil when both inputs are nil. Fields whose type is unknown
// (e.g. the zero zap.Field{}) are replaced with zap.Skip() so the logger
// never receives an invalid field.
func mergeTopicLogFieldsFunctions(single TopicLogFieldFunc, multiple TopicLogFieldsFunc) TopicLogFieldsFunc {
	if single == nil {
		return multiple
	}
	if multiple == nil {
		// Wrap once, outside the closure, rather than on every call.
		fn := topicFieldFunc(single)
		return func(topic string) []zap.Field {
			return []zap.Field{fn(topic)}
		}
	}
	return func(topic string) []zap.Field {
		// Append first, then sanitize everything: previously the field
		// produced by the deprecated single func was appended
		// unsanitized, so an unknown-typed field could reach the
		// logger while the fields from multiple were sanitized.
		fields := append(multiple(topic), single(topic))
		for i := range fields {
			if fields[i].Type <= zapcore.UnknownType {
				fields[i] = zap.Skip()
			}
		}
		return fields
	}
}

// mergeTopicAttributeFunctions merges the deprecated TopicAttributeFunc
// (single) and TopicAttributesFunc (multiple) into one
// TopicAttributesFunc. It returns nil when both inputs are nil. The zero
// attribute.KeyValue produced by single is dropped in every branch,
// matching the multiple==nil behavior.
func mergeTopicAttributeFunctions(single TopicAttributeFunc, multiple TopicAttributesFunc) TopicAttributesFunc {
	if single == nil {
		return multiple
	}
	if multiple == nil {
		return func(topic string) []attribute.KeyValue {
			v := single(topic)
			if v == (attribute.KeyValue{}) {
				return nil
			}
			return []attribute.KeyValue{v}
		}
	}
	return func(topic string) []attribute.KeyValue {
		attrs := multiple(topic)
		// Skip the zero KeyValue instead of appending it blindly:
		// NOTE(review): finalize appears to default single to a func
		// that always returns the zero value — confirm; appending it
		// would add an empty dimension to every metric.
		if v := single(topic); v != (attribute.KeyValue{}) {
			attrs = append(attrs, v)
		}
		return attrs
	}
}

// newCertReloadingDialer returns a dialer that reloads the CA cert when the
// file mod time changes.
func newCertReloadingDialer(caPath, certPath, keyPath string,
Expand Down
105 changes: 102 additions & 3 deletions kafka/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/twmb/franz-go/pkg/kfake"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl"
"go.opentelemetry.io/otel/attribute"
"go.uber.org/zap"
)

Expand All @@ -59,8 +60,6 @@ func TestCommonConfig(t *testing.T) {
t.Helper()
err := in.finalize()
require.NoError(t, err)
in.TopicAttributeFunc = nil
in.TopicLogFieldFunc = nil
in.hooks = nil
assert.Equal(t, expected, in)
}
Expand Down Expand Up @@ -340,7 +339,7 @@ func TestCommonConfigFileHook(t *testing.T) {
require.NoError(t, cfg.finalize())
assert.Equal(t, []string{"testing.invalid"}, cfg.Brokers)

client, err := cfg.newClient(nil)
client, err := cfg.newClientWithOpts(nil)
require.NoError(t, err)
defer client.Close()

Expand Down Expand Up @@ -774,6 +773,106 @@ func TestCertificateHotReloadErrors(t *testing.T) {
})
}

// TestMergeTopicFunctions verifies that the deprecated single-value
// topic functions and their multi-value replacements are merged
// correctly for both log fields and metric attributes: nil+nil yields a
// nil function, a lone function is passed through (wrapped for the
// single variants), and when both are set the single value is appended
// after the multiple values.
func TestMergeTopicFunctions(t *testing.T) {
	t.Run("log fields", func(t *testing.T) {
		tests := []struct {
			name     string
			single   TopicLogFieldFunc
			multiple TopicLogFieldsFunc
			want     []zap.Field
		}{
			{
				name: "both nil functions",
			},
			{
				name: "nil single log field",
				multiple: func(_ string) []zap.Field {
					return []zap.Field{zap.String("test-1", "test-1"), zap.String("test-2", "test-2")}
				},
				want: []zap.Field{zap.String("test-1", "test-1"), zap.String("test-2", "test-2")},
			},
			{
				name: "nil multiple log field",
				single: func(_ string) zap.Field {
					return zap.String("test-1", "test-1")
				},
				want: []zap.Field{zap.String("test-1", "test-1")},
			},
			{
				name: "both functions exist",
				multiple: func(_ string) []zap.Field {
					return []zap.Field{zap.String("test-1", "test-1"), zap.String("test-2", "test-2")}
				},
				single: func(_ string) zap.Field {
					return zap.String("test-3", "test-3")
				},
				want: []zap.Field{zap.String("test-1", "test-1"), zap.String("test-2", "test-2"), zap.String("test-3", "test-3")},
			},
		}

		for _, tt := range tests {
			t.Run(tt.name, func(t *testing.T) {
				fn := mergeTopicLogFieldsFunctions(tt.single, tt.multiple)
				if tt.want == nil {
					require.Nil(t, fn)
					return
				}
				fields := fn("test")
				require.Equal(t, tt.want, fields)
			})
		}
	})
	t.Run("attributes", func(t *testing.T) {
		tests := []struct {
			name     string
			single   TopicAttributeFunc
			multiple TopicAttributesFunc
			want     []attribute.KeyValue
		}{
			{
				name: "both nil functions",
			},
			{
				name: "nil single attribute",
				multiple: func(_ string) []attribute.KeyValue {
					return []attribute.KeyValue{attribute.String("test-1", "test-1"), attribute.String("test-2", "test-2")}
				},
				want: []attribute.KeyValue{attribute.String("test-1", "test-1"), attribute.String("test-2", "test-2")},
			},
			{
				// Name fixed: this case covers a nil multiple
				// attribute function, not log fields.
				name: "nil multiple attribute",
				single: func(_ string) attribute.KeyValue {
					return attribute.String("test-1", "test-1")
				},
				want: []attribute.KeyValue{attribute.String("test-1", "test-1")},
			},
			{
				name: "both functions exist",
				multiple: func(_ string) []attribute.KeyValue {
					return []attribute.KeyValue{attribute.String("test-1", "test-1"), attribute.String("test-2", "test-2")}
				},
				single: func(_ string) attribute.KeyValue {
					return attribute.String("test-3", "test-3")
				},
				want: []attribute.KeyValue{attribute.String("test-1", "test-1"), attribute.String("test-2", "test-2"), attribute.String("test-3", "test-3")},
			},
		}

		for _, tt := range tests {
			t.Run(tt.name, func(t *testing.T) {
				fn := mergeTopicAttributeFunctions(tt.single, tt.multiple)
				if tt.want == nil {
					require.Nil(t, fn)
					return
				}
				fields := fn("test")
				require.Equal(t, tt.want, fields)
			})
		}
	})
}

// Helper functions for certificate generation
func generateCA(t testing.TB) (*rsa.PrivateKey, *x509.Certificate, []byte) {
caKey, err := rsa.GenerateKey(rand.Reader, 2048)
Expand Down
26 changes: 16 additions & 10 deletions kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func NewConsumer(cfg ConsumerConfig) (*Consumer, error) {
namespacePrefix := cfg.namespacePrefix()
consumer := &consumer{
topicPrefix: namespacePrefix,
logFieldFn: cfg.TopicLogFieldFunc,
logFieldsFn: cfg.TopicLogFieldsFunc,
assignments: make(map[topicPartition]*pc),
processor: cfg.Processor,
logger: cfg.Logger.Named("partition"),
Expand All @@ -230,7 +230,7 @@ func NewConsumer(cfg ConsumerConfig) (*Consumer, error) {
kgo.ConsumeTopics(topics...),
// If a rebalance happens while the client is polling, the consumed
// records may belong to a partition which has been reassigned to a
// different consumer int he group. To avoid this scenario, Polls will
// different consumer in the group. To avoid this scenario, Polls will
// block rebalances of partitions which would be lost, and the consumer
// MUST manually call `AllowRebalance`.
kgo.BlockRebalanceOnPoll(),
Expand Down Expand Up @@ -270,7 +270,10 @@ func NewConsumer(cfg ConsumerConfig) (*Consumer, error) {
opts = append(opts, kgo.BrokerMaxReadBytes(cfg.BrokerMaxReadBytes))
}

client, err := cfg.newClient(cfg.TopicAttributeFunc, opts...)
client, err := cfg.newClientWithOpts(
[]clientOptsFn{withTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)},
opts...,
)
if err != nil {
return nil, fmt.Errorf("kafka: failed creating kafka consumer: %w", err)
}
Expand Down Expand Up @@ -396,8 +399,10 @@ func (c *Consumer) fetch(ctx context.Context) error {
// franz-go can inject fake fetches in case of errors.
// the fake fetch can have an empty topic so we need to
// account for that
if c.cfg.TopicLogFieldFunc != nil && topicName != "" {
logger = logger.With(c.cfg.TopicLogFieldFunc(topicName))
if topicName != "" {
if c.cfg.TopicLogFieldsFunc != nil {
logger = logger.With(c.cfg.TopicLogFieldsFunc(topicName)...)
}
}

logger.Error(
Expand Down Expand Up @@ -429,7 +434,8 @@ type consumer struct {
processor apmqueue.Processor
logger *zap.Logger
delivery apmqueue.DeliveryType
logFieldFn TopicLogFieldFunc
logFieldsFn TopicLogFieldsFunc

// ctx contains the graceful cancellation context that is passed to the
// partition consumers.
ctx context.Context
Expand All @@ -452,8 +458,8 @@ func (c *consumer) assigned(_ context.Context, client *kgo.Client, assigned map[
zap.String("topic", t),
zap.Int32("partition", partition),
)
if c.logFieldFn != nil {
logger = logger.With(c.logFieldFn(t))
if c.logFieldsFn != nil {
logger = logger.With(c.logFieldsFn(t)...)
}

pc := newPartitionConsumer(c.ctx, client, c.processor,
Expand Down Expand Up @@ -535,8 +541,8 @@ func (c *consumer) processFetch(fetches kgo.Fetches) {
if c.delivery == apmqueue.AtMostOnceDeliveryType {
topicName := strings.TrimPrefix(ftp.Topic, c.topicPrefix)
logger := c.logger
if c.logFieldFn != nil {
logger = logger.With(c.logFieldFn(topicName))
if c.logFieldsFn != nil {
logger = logger.With(c.logFieldsFn(topicName)...)
}
logger.Warn(
"data loss: failed to send records to process after commit",
Expand Down
2 changes: 0 additions & 2 deletions kafka/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,8 +907,6 @@ func assertNotNilOptions(t testing.TB, cfg *ConsumerConfig) {
cfg.Processor = nil
assert.NotNil(t, cfg.Logger)
cfg.Logger = nil
assert.NotNil(t, cfg.TopicAttributeFunc)
cfg.TopicAttributeFunc = nil
Comment on lines -910 to -911
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we wrap this function with multiple attributes, this is no longer valid, so I removed the check

}

func newConsumer(t testing.TB, cfg ConsumerConfig) *Consumer {
Expand Down
5 changes: 4 additions & 1 deletion kafka/log_compacted_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,10 @@ func NewLogCompactedConsumer(cfg LogCompactedConfig,
opts = append(opts, kgo.FetchMinBytes(cfg.MinFetchSize))
}

client, err := cfg.newClient(cfg.TopicAttributeFunc, opts...)
client, err := cfg.newClientWithOpts(
[]clientOptsFn{withTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)},
opts...,
)
if err != nil {
return nil, err
}
Expand Down
Loading