Skip to content

Commit

Permalink
contrib: peer.service precursors for confluentinc kafka integratio…
Browse files Browse the repository at this point in the history
…ns (#2008)
  • Loading branch information
zarirhamza committed May 26, 2023
1 parent e681ddd commit fbd37ea
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 54 deletions.
8 changes: 8 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func NewConsumer(conf *kafka.ConfigMap, opts ...Option) (*Consumer, error) {
if err != nil {
return nil, err
}
opts = append(opts, WithConfig(conf))
return WrapConsumer(c, opts...), nil
}

Expand All @@ -40,6 +41,7 @@ func NewProducer(conf *kafka.ConfigMap, opts ...Option) (*Producer, error) {
if err != nil {
return nil, err
}
opts = append(opts, WithConfig(conf))
return WrapProducer(p, opts...), nil
}

Expand Down Expand Up @@ -107,6 +109,9 @@ func (c *Consumer) startSpan(msg *kafka.Message) ddtrace.Span {
tracer.Tag(ext.MessagingSystem, "kafka"),
tracer.Measured(),
}
if c.cfg.bootstrapServers != "" {
opts = append(opts, tracer.Tag(ext.KafkaBootstrapServers, c.cfg.bootstrapServers))
}
if c.cfg.tagFns != nil {
for key, tagFn := range c.cfg.tagFns {
opts = append(opts, tracer.Tag(key, tagFn(msg)))
Expand Down Expand Up @@ -217,6 +222,9 @@ func (p *Producer) startSpan(msg *kafka.Message) ddtrace.Span {
tracer.Tag(ext.MessagingSystem, "kafka"),
tracer.Tag(ext.MessagingKafkaPartition, msg.TopicPartition.Partition),
}
if p.cfg.bootstrapServers != "" {
opts = append(opts, tracer.Tag(ext.KafkaBootstrapServers, p.cfg.bootstrapServers))
}
if !math.IsNaN(p.cfg.analyticsRate) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, p.cfg.analyticsRate))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ func TestConsumerFunctional(t *testing.T) {
assert.Equal(t, int32(0), s0.Tag(ext.MessagingKafkaPartition))
assert.Equal(t, "confluentinc/confluent-kafka-go/kafka.v2", s0.Tag(ext.Component))
assert.Equal(t, ext.SpanKindProducer, s0.Tag(ext.SpanKind))
assert.Equal(t, "127.0.0.1", s0.Tag(ext.KafkaBootstrapServers))
assert.Equal(t, "kafka", s0.Tag(ext.MessagingSystem))

s1 := spans[1] // consume
Expand All @@ -221,6 +222,7 @@ func TestConsumerFunctional(t *testing.T) {
assert.Equal(t, int32(0), s1.Tag(ext.MessagingKafkaPartition))
assert.Equal(t, "confluentinc/confluent-kafka-go/kafka.v2", s1.Tag(ext.Component))
assert.Equal(t, ext.SpanKindConsumer, s1.Tag(ext.SpanKind))
assert.Equal(t, "127.0.0.1", s1.Tag(ext.KafkaBootstrapServers))
assert.Equal(t, "kafka", s1.Tag(ext.MessagingSystem))
})
}
Expand Down
18 changes: 18 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka.v2/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ package kafka
import (
"context"
"math"
"net"
"strings"

"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema"
Expand All @@ -22,6 +24,7 @@ type config struct {
consumerOperationName string
producerOperationName string
analyticsRate float64
bootstrapServers string
tagFns map[string]func(msg *kafka.Message) interface{}
}

Expand Down Expand Up @@ -104,3 +107,18 @@ func WithCustomTag(tag string, tagFn func(msg *kafka.Message) interface{}) Optio
cfg.tagFns[tag] = tagFn
}
}

// WithConfig extracts the config information for the client to be tagged
func WithConfig(cg *kafka.ConfigMap) Option {
return func(cfg *config) {
if bs, err := cg.Get("bootstrap.servers", ""); err == nil && bs != "" {
for _, addr := range strings.Split(bs.(string), ",") {
host, _, err := net.SplitHostPort(addr)
if err == nil {
cfg.bootstrapServers = host
return
}
}
}
}
}
64 changes: 12 additions & 52 deletions contrib/confluentinc/confluent-kafka-go/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ func NewConsumer(conf *kafka.ConfigMap, opts ...Option) (*Consumer, error) {
if err != nil {
return nil, err
}
return WrapConsumerWithConfig(c, conf, opts...), nil

opts = append(opts, WithConfig(conf))
return WrapConsumer(c, opts...), nil
}

// NewProducer calls kafka.NewProducer and wraps the resulting Producer.
Expand All @@ -40,52 +42,29 @@ func NewProducer(conf *kafka.ConfigMap, opts ...Option) (*Producer, error) {
if err != nil {
return nil, err
}
return WrapProducerWithConfig(p, conf, opts...), nil
}

// A kafkaConfig struct holds information from the kafka config for span tags
type kafkaConfig struct {
bootstrapServers string
opts = append(opts, WithConfig(conf))
return WrapProducer(p, opts...), nil
}

// A Consumer wraps a kafka.Consumer.
type Consumer struct {
*kafka.Consumer
kafkaCfg *kafkaConfig
cfg *config
events chan kafka.Event
prev ddtrace.Span
cfg *config
events chan kafka.Event
prev ddtrace.Span
}

// WrapConsumer wraps a kafka.Consumer so that any consumed events are traced.
// Deprecated: Replaced with WrapConsumerWithConfig such that config information can be used in span tags.
func WrapConsumer(c *kafka.Consumer, opts ...Option) *Consumer {
wrapped := &Consumer{
Consumer: c,
kafkaCfg: &kafkaConfig{},
cfg: newConfig(opts...),
}
log.Debug("contrib/confluentinc/confluent-kafka-go/kafka: Wrapping Consumer: %#v", wrapped.cfg)
wrapped.events = wrapped.traceEventsChannel(c.Events())
return wrapped
}

// WrapConsumerWithConfig wraps a kafka.Consumer with its Config such that future information can be tagged.
func WrapConsumerWithConfig(c *kafka.Consumer, cg *kafka.ConfigMap, opts ...Option) *Consumer {
wrapped := &Consumer{
Consumer: c,
kafkaCfg: &kafkaConfig{},
cfg: newConfig(opts...),
}

if bs, err := cg.Get("bootstrap.servers", ""); err == nil {
wrapped.kafkaCfg.bootstrapServers = bs.(string)
}
log.Debug("contrib/confluentinc/confluent-kafka-go/kafka: Wrapping Consumer: %#v", wrapped.cfg)
wrapped.events = wrapped.traceEventsChannel(c.Events())
return wrapped
}

func (c *Consumer) traceEventsChannel(in chan kafka.Event) chan kafka.Event {
// in will be nil when consuming via the events channel is not enabled
if in == nil {
Expand Down Expand Up @@ -133,8 +112,8 @@ func (c *Consumer) startSpan(msg *kafka.Message) ddtrace.Span {
tracer.Measured(),
}

if c.kafkaCfg.bootstrapServers != "" {
opts = append(opts, tracer.Tag(ext.KafkaBootstrapServers, c.kafkaCfg.bootstrapServers))
if c.cfg.bootstrapServers != "" {
opts = append(opts, tracer.Tag(ext.KafkaBootstrapServers, c.cfg.bootstrapServers))
}
if c.cfg.tagFns != nil {
for key, tagFn := range c.cfg.tagFns {
Expand Down Expand Up @@ -206,35 +185,16 @@ func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
// A Producer wraps a kafka.Producer.
type Producer struct {
*kafka.Producer
kafkaCfg *kafkaConfig
cfg *config
produceChannel chan *kafka.Message
}

// WrapProducer wraps a kafka.Producer so requests are traced.
// Deprecated: Replaced with WrapProducerWithConfig such that config information can be used in span tags.
func WrapProducer(p *kafka.Producer, opts ...Option) *Producer {
wrapped := &Producer{
Producer: p,
kafkaCfg: &kafkaConfig{},
cfg: newConfig(opts...),
}
log.Debug("contrib/confluentinc/confluent-kafka-go/kafka: Wrapping Producer: %#v", wrapped.cfg)
wrapped.produceChannel = wrapped.traceProduceChannel(p.ProduceChannel())
return wrapped
}

// WrapProducerWithConfig wraps a kafka.Producer with its Config such that future information can be tagged.
func WrapProducerWithConfig(p *kafka.Producer, cg *kafka.ConfigMap, opts ...Option) *Producer {
wrapped := &Producer{
Producer: p,
kafkaCfg: &kafkaConfig{},
cfg: newConfig(opts...),
}

if bs, err := cg.Get("bootstrap.servers", ""); err == nil {
wrapped.kafkaCfg.bootstrapServers = bs.(string)
}
log.Debug("contrib/confluentinc/confluent-kafka-go/kafka: Wrapping Producer: %#v", wrapped.cfg)
wrapped.produceChannel = wrapped.traceProduceChannel(p.ProduceChannel())
return wrapped
Expand Down Expand Up @@ -268,8 +228,8 @@ func (p *Producer) startSpan(msg *kafka.Message) ddtrace.Span {
tracer.Tag(ext.MessagingKafkaPartition, msg.TopicPartition.Partition),
}

if p.kafkaCfg.bootstrapServers != "" {
opts = append(opts, tracer.Tag(ext.KafkaBootstrapServers, p.kafkaCfg.bootstrapServers))
if p.cfg.bootstrapServers != "" {
opts = append(opts, tracer.Tag(ext.KafkaBootstrapServers, p.cfg.bootstrapServers))
}
if !math.IsNaN(p.cfg.analyticsRate) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, p.cfg.analyticsRate))
Expand Down
4 changes: 2 additions & 2 deletions contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func TestConsumerFunctional(t *testing.T) {
assert.Equal(t, "confluentinc/confluent-kafka-go/kafka", s0.Tag(ext.Component))
assert.Equal(t, ext.SpanKindProducer, s0.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s0.Tag(ext.MessagingSystem))
assert.Equal(t, "127.0.0.1:9092", s0.Tag(ext.KafkaBootstrapServers))
assert.Equal(t, "127.0.0.1", s0.Tag(ext.KafkaBootstrapServers))

s1 := spans[1] // consume
assert.Equal(t, "kafka.consume", s1.OperationName())
Expand All @@ -223,7 +223,7 @@ func TestConsumerFunctional(t *testing.T) {
assert.Equal(t, "confluentinc/confluent-kafka-go/kafka", s1.Tag(ext.Component))
assert.Equal(t, ext.SpanKindConsumer, s1.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s1.Tag(ext.MessagingSystem))
assert.Equal(t, "127.0.0.1:9092", s1.Tag(ext.KafkaBootstrapServers))
assert.Equal(t, "127.0.0.1", s1.Tag(ext.KafkaBootstrapServers))
})
}
}
Expand Down
18 changes: 18 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ package kafka
import (
"context"
"math"
"net"
"strings"

"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema"
Expand All @@ -22,6 +24,7 @@ type config struct {
consumerOperationName string
producerOperationName string
analyticsRate float64
bootstrapServers string
tagFns map[string]func(msg *kafka.Message) interface{}
}

Expand Down Expand Up @@ -104,3 +107,18 @@ func WithCustomTag(tag string, tagFn func(msg *kafka.Message) interface{}) Optio
cfg.tagFns[tag] = tagFn
}
}

// WithConfig extracts the config information for the client to be tagged
func WithConfig(cg *kafka.ConfigMap) Option {
return func(cfg *config) {
if bs, err := cg.Get("bootstrap.servers", ""); err == nil && bs != "" {
for _, addr := range strings.Split(bs.(string), ",") {
host, _, err := net.SplitHostPort(addr)
if err == nil {
cfg.bootstrapServers = host
return
}
}
}
}
}

0 comments on commit fbd37ea

Please sign in to comment.