From 19d2c2209f5306c5b0ab0e3630309f85690867bb Mon Sep 17 00:00:00 2001
From: Piotr Wolski
Date: Thu, 18 Jan 2024 16:52:12 -0500
Subject: [PATCH] [data streams] Test kafka confluent instrumentation

---
 .../confluent-kafka-go/kafka.v2/kafka.go      | 16 ++++
 .../confluent-kafka-go/kafka.v2/kafka_test.go | 21 ++++++
 .../confluent-kafka-go/kafka/kafka.go         | 16 ++++
 .../confluent-kafka-go/kafka/kafka_test.go    | 21 ++++++
 ddtrace/mocktracer/data_streams.go            | 40 ++++++++++
 ddtrace/mocktracer/mocktracer.go              | 36 ++++-----
 ddtrace/tracer/data_streams.go                | 10 +++
 internal/datastreams/processor.go             | 75 ++++++++++++++-----
 8 files changed, 198 insertions(+), 37 deletions(-)
 create mode 100644 ddtrace/mocktracer/data_streams.go

diff --git a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go
index 8c7ae41542..21b89369f2 100644
--- a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go
+++ b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go
@@ -91,6 +91,7 @@ func (c *Consumer) traceEventsChannel(in chan kafka.Event) chan kafka.Event {
 				setConsumeCheckpoint(c.cfg.dataStreamsEnabled, c.cfg.groupID, msg)
 			} else if offset, ok := evt.(kafka.OffsetsCommitted); ok {
 				commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, offset.Offsets, offset.Error)
+				c.trackHighWatermark(c.cfg.dataStreamsEnabled, offset.Offsets)
 			}
 
 			out <- evt
@@ -176,10 +177,22 @@ func (c *Consumer) Poll(timeoutMS int) (event kafka.Event) {
 		c.prev = c.startSpan(msg)
 	} else if offset, ok := evt.(kafka.OffsetsCommitted); ok {
 		commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, offset.Offsets, offset.Error)
+		c.trackHighWatermark(c.cfg.dataStreamsEnabled, offset.Offsets)
 	}
 	return evt
 }
 
+func (c *Consumer) trackHighWatermark(dataStreamsEnabled bool, offsets []kafka.TopicPartition) {
+	if !dataStreamsEnabled {
+		return
+	}
+	for _, tp := range offsets {
+		if _, high, err := c.Consumer.GetWatermarkOffsets(*tp.Topic, tp.Partition); err == nil {
+			tracer.TrackKafkaHighWatermarkOffset("", *tp.Topic, tp.Partition, high)
+		}
+	}
+}
+
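A note on the helper added above: librdkafka's `GetWatermarkOffsets` returns the locally cached low and high watermarks for a partition, where the high watermark is the offset that will be assigned to the next produced message (`QueryWatermarkOffsets` asks the broker directly instead of using the cache). A minimal standalone sketch of the same call, with placeholder broker, topic, and group names:

```go
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // placeholder broker
		"group.id":          "example-group",  // placeholder group
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	// low is the first available offset; high is the offset of the
	// next message to be written (last message offset + 1).
	low, high, err := c.GetWatermarkOffsets("my-topic", 0)
	if err != nil {
		panic(err)
	}
	fmt.Printf("partition 0 holds %d messages\n", high-low)
}
```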
 // ReadMessage polls the consumer for a message. Message will be traced.
 func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
 	if c.prev != nil {
@@ -199,6 +212,7 @@ func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
 func (c *Consumer) Commit() ([]kafka.TopicPartition, error) {
 	tps, err := c.Consumer.Commit()
 	commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err)
+	c.trackHighWatermark(c.cfg.dataStreamsEnabled, tps)
 	return tps, err
 }
 
@@ -206,6 +220,7 @@ func (c *Consumer) Commit() ([]kafka.TopicPartition, error) {
 func (c *Consumer) CommitMessage(msg *kafka.Message) ([]kafka.TopicPartition, error) {
 	tps, err := c.Consumer.CommitMessage(msg)
 	commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err)
+	c.trackHighWatermark(c.cfg.dataStreamsEnabled, tps)
 	return tps, err
 }
 
@@ -213,6 +228,7 @@ func (c *Consumer) CommitMessage(msg *kafka.Message) ([]kafka.TopicPartition, er
 func (c *Consumer) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error) {
 	tps, err := c.Consumer.CommitOffsets(offsets)
 	commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err)
+	c.trackHighWatermark(c.cfg.dataStreamsEnabled, tps)
 	return tps, err
 }
 
diff --git a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go
index 32dfa1e9eb..5057ad2418 100644
--- a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go
+++ b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go
@@ -9,6 +9,8 @@ import (
 	"context"
 	"errors"
 	"os"
+	"sort"
+	"strings"
 	"testing"
 	"time"
 
@@ -17,6 +19,7 @@ import (
 	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
 	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
 	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
+	internaldsm "gopkg.in/DataDog/dd-trace-go.v1/internal/datastreams"
 
 	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
 	"github.com/stretchr/testify/assert"
@@ -76,6 +79,8 @@ func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerO
 	msg2, err := consumerAction(c)
 	require.NoError(t, err)
+	commits, err := c.CommitMessage(msg2)
+	require.NoError(t, err)
 	assert.Equal(t, msg1.String(), msg2.String())
 	err = c.Close()
 	require.NoError(t, err)
@@ -84,6 +89,22 @@ func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerO
 	require.Len(t, spans, 2)
 	// they should be linked via headers
 	assert.Equal(t, spans[0].TraceID(), spans[1].TraceID())
+
+	if c.cfg.dataStreamsEnabled {
+		backlogs := mt.SentDSMBacklogs()
+		sort.Slice(backlogs, func(i, j int) bool {
+			// sort by concatenated tags so the order is deterministic: commit, high watermark, produce
+			return strings.Join(backlogs[i].Tags, "") < strings.Join(backlogs[j].Tags, "")
+		})
+		require.Len(t, commits, 1)
+		highWatermark := int64(commits[0].Offset)
+		expectedBacklogs := []internaldsm.Backlog{
+			{Tags: []string{"consumer_group:" + testGroupID, "partition:0", "topic:" + testTopic, "type:kafka_commit"}, Value: highWatermark},
+			{Tags: []string{"partition:0", "topic:" + testTopic, "type:kafka_high_watermark"}, Value: highWatermark},
+			{Tags: []string{"partition:0", "topic:" + testTopic, "type:kafka_produce"}, Value: highWatermark - 1},
+		}
+		assert.Equal(t, expectedBacklogs, backlogs)
+	}
 	return spans, msg2
 }
 
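Why these expected values hold: the consumed message is the last one in the partition, so (assuming no concurrent producers) the high watermark equals the message's offset plus one. `CommitMessage` commits `msg.Offset + 1`, which is why the `kafka_commit` and `kafka_high_watermark` backlogs report the same value, while `kafka_produce` reports the offset of the last produced message, i.e. `highWatermark - 1`.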
diff --git a/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go b/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go
index c0c9a91c29..4de425ca56 100644
--- a/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go
+++ b/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go
@@ -91,6 +91,7 @@ func (c *Consumer) traceEventsChannel(in chan kafka.Event) chan kafka.Event {
 				setConsumeCheckpoint(c.cfg.dataStreamsEnabled, c.cfg.groupID, msg)
 			} else if offset, ok := evt.(kafka.OffsetsCommitted); ok {
 				commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, offset.Offsets, offset.Error)
+				c.trackHighWatermark(c.cfg.dataStreamsEnabled, offset.Offsets)
 			}
 
 			out <- evt
@@ -176,10 +177,22 @@ func (c *Consumer) Poll(timeoutMS int) (event kafka.Event) {
 		c.prev = c.startSpan(msg)
 	} else if offset, ok := evt.(kafka.OffsetsCommitted); ok {
 		commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, offset.Offsets, offset.Error)
+		c.trackHighWatermark(c.cfg.dataStreamsEnabled, offset.Offsets)
 	}
 	return evt
 }
 
+func (c *Consumer) trackHighWatermark(dataStreamsEnabled bool, offsets []kafka.TopicPartition) {
+	if !dataStreamsEnabled {
+		return
+	}
+	for _, tp := range offsets {
+		if _, high, err := c.Consumer.GetWatermarkOffsets(*tp.Topic, tp.Partition); err == nil {
+			tracer.TrackKafkaHighWatermarkOffset("", *tp.Topic, tp.Partition, high)
+		}
+	}
+}
+
 // ReadMessage polls the consumer for a message. Message will be traced.
 func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
 	if c.prev != nil {
@@ -199,6 +212,7 @@ func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
 func (c *Consumer) Commit() ([]kafka.TopicPartition, error) {
 	tps, err := c.Consumer.Commit()
 	commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err)
+	c.trackHighWatermark(c.cfg.dataStreamsEnabled, tps)
 	return tps, err
 }
 
@@ -206,6 +220,7 @@ func (c *Consumer) Commit() ([]kafka.TopicPartition, error) {
 func (c *Consumer) CommitMessage(msg *kafka.Message) ([]kafka.TopicPartition, error) {
 	tps, err := c.Consumer.CommitMessage(msg)
 	commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err)
+	c.trackHighWatermark(c.cfg.dataStreamsEnabled, tps)
 	return tps, err
 }
 
@@ -213,6 +228,7 @@ func (c *Consumer) CommitMessage(msg *kafka.Message) ([]kafka.TopicPartition, er
 func (c *Consumer) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error) {
 	tps, err := c.Consumer.CommitOffsets(offsets)
 	commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err)
+	c.trackHighWatermark(c.cfg.dataStreamsEnabled, tps)
 	return tps, err
 }
 
diff --git a/contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go b/contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go
index 2196beda41..69ab3ac5fb 100644
--- a/contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go
+++ b/contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go
@@ -9,6 +9,8 @@ import (
 	"context"
 	"errors"
 	"os"
+	"sort"
+	"strings"
 	"testing"
 	"time"
 
@@ -17,6 +19,7 @@ import (
 	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
 	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
 	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
+	internaldsm "gopkg.in/DataDog/dd-trace-go.v1/internal/datastreams"
 
 	"github.com/confluentinc/confluent-kafka-go/kafka"
 	"github.com/stretchr/testify/assert"
@@ -76,6 +79,8 @@ func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerO
 	msg2, err := consumerAction(c)
 	require.NoError(t, err)
+	commits, err := c.CommitMessage(msg2)
+	require.NoError(t, err)
 	assert.Equal(t, msg1.String(), msg2.String())
 	err = c.Close()
 	require.NoError(t, err)
@@ -84,6 +89,22 @@ func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerO
 	require.Len(t, spans, 2)
 	// they should be linked via headers
 	assert.Equal(t, spans[0].TraceID(), spans[1].TraceID())
+
+	if c.cfg.dataStreamsEnabled {
+		backlogs := mt.SentDSMBacklogs()
+		sort.Slice(backlogs, func(i, j int) bool {
+			// sort by concatenated tags so the order is deterministic: commit, high watermark, produce
+			return strings.Join(backlogs[i].Tags, "") < strings.Join(backlogs[j].Tags, "")
+		})
+		require.Len(t, commits, 1)
+		highWatermark := int64(commits[0].Offset)
+		expectedBacklogs := []internaldsm.Backlog{
+			{Tags: []string{"consumer_group:" + testGroupID, "partition:0", "topic:" + testTopic, "type:kafka_commit"}, Value: highWatermark},
+			{Tags: []string{"partition:0", "topic:" + testTopic, "type:kafka_high_watermark"}, Value: highWatermark},
+			{Tags: []string{"partition:0", "topic:" + testTopic, "type:kafka_produce"}, Value: highWatermark - 1},
+		}
+		assert.Equal(t, expectedBacklogs, backlogs)
+	}
 	return spans, msg2
 }
 
diff --git a/ddtrace/mocktracer/data_streams.go b/ddtrace/mocktracer/data_streams.go
new file mode 100644
index 0000000000..8d0314f37b
--- /dev/null
+++ b/ddtrace/mocktracer/data_streams.go
@@ -0,0 +1,40 @@
+package mocktracer
+
+import (
+	"compress/gzip"
+	"net/http"
+
+	"github.com/tinylib/msgp/msgp"
+
+	"gopkg.in/DataDog/dd-trace-go.v1/internal/datastreams"
+)
+
+type mockDSMTransport struct {
+	backlogs []datastreams.Backlog
+}
+
+// RoundTrip decodes the DSM stats payload from the request, records its backlogs, and returns a dummy response.
+func (t *mockDSMTransport) RoundTrip(req *http.Request) (*http.Response, error) {
+	// the processor sends stats as a gzipped msgpack payload
+	gzipReader, err := gzip.NewReader(req.Body)
+	if err != nil {
+		return nil, err
+	}
+	var p datastreams.StatsPayload
+	err = msgp.Decode(gzipReader, &p)
+	if err != nil {
+		return nil, err
+	}
+	for _, bucket := range p.Stats {
+		t.backlogs = append(t.backlogs, bucket.Backlogs...)
+	}
+	return &http.Response{
+		StatusCode:    200,
+		Proto:         "HTTP/1.1",
+		ProtoMajor:    1,
+		ProtoMinor:    1,
+		Request:       req,
+		ContentLength: -1,
+		Body:          http.NoBody,
+	}, nil
+}
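This mock transport is what makes backlog assertions possible from tests. A minimal sketch of how a test inside the dd-trace-go module might use it (the instrumented produce/consume step is elided; `SentDSMBacklogs` flushes the processor before returning the backlogs decoded so far, and its return type lives in an internal package, so this only compiles within the module):

```go
package example

import (
	"testing"

	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
)

func TestBacklogsAreRecorded(t *testing.T) {
	mt := mocktracer.Start()
	defer mt.Stop()

	// ... run instrumented producer/consumer code here ...

	// Flushes the DSM processor and returns every backlog the mock
	// transport has decoded since Start.
	for _, b := range mt.SentDSMBacklogs() {
		t.Logf("tags=%v value=%d", b.Tags, b.Value)
	}
}
```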
diff --git a/ddtrace/mocktracer/mocktracer.go b/ddtrace/mocktracer/mocktracer.go
index 3b748f5c65..0a4252bfd1 100644
--- a/ddtrace/mocktracer/mocktracer.go
+++ b/ddtrace/mocktracer/mocktracer.go
@@ -37,6 +37,7 @@ type Tracer interface {
 	// FinishedSpans returns the set of finished spans.
 	FinishedSpans() []Span
+	SentDSMBacklogs() []datastreams.Backlog
 
 	// Reset resets the spans and services recorded in the tracer. This is
 	// especially useful when running tests in a loop, where a clean start
@@ -63,11 +64,25 @@ type mocktracer struct {
 	sync.RWMutex // guards below spans
 	finishedSpans []Span
 	openSpans     map[uint64]Span
+	dsmTransport  *mockDSMTransport
+	dsmProcessor  *datastreams.Processor
+}
+
+func (t *mocktracer) SentDSMBacklogs() []datastreams.Backlog {
+	t.dsmProcessor.Flush()
+	return t.dsmTransport.backlogs
 }
 
 func newMockTracer() *mocktracer {
 	var t mocktracer
 	t.openSpans = make(map[uint64]Span)
+	t.dsmTransport = &mockDSMTransport{}
+	client := &http.Client{
+		Transport: t.dsmTransport,
+	}
+	t.dsmProcessor = datastreams.NewProcessor(&statsd.NoOpClient{}, "env", "service", "v1", &url.URL{Scheme: "http", Host: "agent-address"}, client, func() bool { return true })
+	t.dsmProcessor.Start()
+	t.dsmProcessor.Flush()
 	return &t
 }
 
@@ -91,27 +106,8 @@ func (t *mocktracer) StartSpan(operationName string, opts ...ddtrace.StartSpanOp
 	return span
 }
 
-type noOpTransport struct{}
-
-// RoundTrip does nothing and returns a dummy response.
-func (t *noOpTransport) RoundTrip(req *http.Request) (*http.Response, error) {
-	// You can customize the dummy response if needed.
-	return &http.Response{
-		StatusCode:    200,
-		Proto:         "HTTP/1.1",
-		ProtoMajor:    1,
-		ProtoMinor:    1,
-		Request:       req,
-		ContentLength: -1,
-		Body:          http.NoBody,
-	}, nil
-}
-
 func (t *mocktracer) GetDataStreamsProcessor() *datastreams.Processor {
-	client := &http.Client{
-		Transport: &noOpTransport{},
-	}
-	return datastreams.NewProcessor(&statsd.NoOpClient{}, "env", "service", "v1", &url.URL{Scheme: "http", Host: "agent-address"}, client, func() bool { return true })
+	return t.dsmProcessor
 }
 
 func (t *mocktracer) OpenSpans() []Span {

diff --git a/ddtrace/tracer/data_streams.go b/ddtrace/tracer/data_streams.go
index 32585cd41a..92f59f1c4f 100644
--- a/ddtrace/tracer/data_streams.go
+++ b/ddtrace/tracer/data_streams.go
@@ -62,3 +62,13 @@ func TrackKafkaProduceOffset(topic string, partition int32, offset int64) {
 		}
 	}
 }
+
+// TrackKafkaHighWatermarkOffset should be used to track the high watermark offset of a Kafka partition, i.e. the
+// offset assigned to the next produced message. With TrackKafkaCommitOffset, it allows measuring consumer lag.
+func TrackKafkaHighWatermarkOffset(cluster string, topic string, partition int32, offset int64) {
+	if t, ok := internal.GetGlobalTracer().(dataStreamsContainer); ok {
+		if p := t.GetDataStreamsProcessor(); p != nil {
+			p.TrackKafkaHighWatermarkOffset(cluster, topic, partition, offset)
+		}
+	}
+}
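With both `kafka_commit` and `kafka_high_watermark` reported, per-partition consumer lag in messages is simply the high watermark minus the committed offset. A sketch of that arithmetic over exported backlogs (the local `backlog` struct mirrors `internal/datastreams.Backlog`, which is not importable outside this module; the tag parsing assumes the formats emitted above):

```go
package example

import "strings"

// backlog mirrors internal/datastreams.Backlog for illustration.
type backlog struct {
	Tags  []string
	Value int64
}

// lagByPartition subtracts each partition's committed offset from its high
// watermark, keying both by the shared "partition:..." and "topic:..." tags.
func lagByPartition(backlogs []backlog) map[string]int64 {
	high := map[string]int64{}
	commit := map[string]int64{}
	for _, b := range backlogs {
		var typ string
		var keyParts []string
		for _, tag := range b.Tags {
			switch {
			case strings.HasPrefix(tag, "type:"):
				typ = strings.TrimPrefix(tag, "type:")
			case strings.HasPrefix(tag, "partition:"), strings.HasPrefix(tag, "topic:"):
				keyParts = append(keyParts, tag)
			}
		}
		key := strings.Join(keyParts, ",")
		switch typ {
		case "kafka_high_watermark":
			high[key] = b.Value
		case "kafka_commit":
			commit[key] = b.Value
		}
	}
	lag := map[string]int64{}
	for key, hw := range high {
		if committed, ok := commit[key]; ok {
			lag[key] = hw - committed
		}
	}
	return lag
}
```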
diff --git a/internal/datastreams/processor.go b/internal/datastreams/processor.go
index c330d8e908..de0d738eb3 100644
--- a/internal/datastreams/processor.go
+++ b/internal/datastreams/processor.go
@@ -55,20 +55,22 @@ type statsGroup struct {
 }
 
 type bucket struct {
-	points               map[uint64]statsGroup
-	latestCommitOffsets  map[partitionConsumerKey]int64
-	latestProduceOffsets map[partitionKey]int64
-	start                uint64
-	duration             uint64
+	points                     map[uint64]statsGroup
+	latestCommitOffsets        map[partitionConsumerKey]int64
+	latestProduceOffsets       map[partitionKey]int64
+	latestHighWatermarkOffsets map[partitionKey]int64
+	start                      uint64
+	duration                   uint64
 }
 
 func newBucket(start, duration uint64) bucket {
 	return bucket{
-		points:               make(map[uint64]statsGroup),
-		latestCommitOffsets:  make(map[partitionConsumerKey]int64),
-		latestProduceOffsets: make(map[partitionKey]int64),
-		start:                start,
-		duration:             duration,
+		points:                     make(map[uint64]statsGroup),
+		latestCommitOffsets:        make(map[partitionConsumerKey]int64),
+		latestProduceOffsets:       make(map[partitionKey]int64),
+		latestHighWatermarkOffsets: make(map[partitionKey]int64),
+		start:                      start,
+		duration:                   duration,
 	}
 }
 
@@ -105,7 +107,7 @@ func (b bucket) export(timestampType TimestampType) StatsBucket {
 		Start:    b.start,
 		Duration: b.duration,
 		Stats:    stats,
-		Backlogs: make([]Backlog, 0, len(b.latestCommitOffsets)+len(b.latestProduceOffsets)),
+		Backlogs: make([]Backlog, 0, len(b.latestCommitOffsets)+len(b.latestProduceOffsets)+len(b.latestHighWatermarkOffsets)),
 	}
 	for key, offset := range b.latestProduceOffsets {
 		exported.Backlogs = append(exported.Backlogs, Backlog{Tags: []string{fmt.Sprintf("partition:%d", key.partition), fmt.Sprintf("topic:%s", key.topic), "type:kafka_produce"}, Value: offset})
@@ -113,6 +115,9 @@ func (b bucket) export(timestampType TimestampType) StatsBucket {
 	for key, offset := range b.latestCommitOffsets {
 		exported.Backlogs = append(exported.Backlogs, Backlog{Tags: []string{fmt.Sprintf("consumer_group:%s", key.group), fmt.Sprintf("partition:%d", key.partition), fmt.Sprintf("topic:%s", key.topic), "type:kafka_commit"}, Value: offset})
 	}
+	for key, offset := range b.latestHighWatermarkOffsets {
+		exported.Backlogs = append(exported.Backlogs, Backlog{Tags: []string{fmt.Sprintf("partition:%d", key.partition), fmt.Sprintf("topic:%s", key.topic), "type:kafka_high_watermark"}, Value: offset})
+	}
 	return exported
 }
 
@@ -154,6 +159,7 @@ type offsetType int
 const (
 	produceOffset offsetType = iota
 	commitOffset
+	highWatermarkOffset
 )
 
 type kafkaOffset struct {
@@ -272,6 +278,13 @@ func (p *Processor) addKafkaOffset(o kafkaOffset) {
 		}] = o.offset
 		return
 	}
+	if o.offsetType == highWatermarkOffset {
+		b.latestHighWatermarkOffsets[partitionKey{
+			partition: o.partition,
+			topic:     o.topic,
+		}] = o.offset
+		return
+	}
 	b.latestCommitOffsets[partitionConsumerKey{
 		partition: o.partition,
 		group:     o.group,
@@ -279,12 +292,32 @@
 	}] = o.offset
 }
 
+func (p *Processor) processInput(in *processorInput) {
+	atomic.AddInt64(&p.stats.payloadsIn, 1)
+	if in.typ == pointTypeStats {
+		p.add(in.point)
+	} else if in.typ == pointTypeKafkaOffset {
+		p.addKafkaOffset(in.kafkaOffset)
+	}
+}
+
+func (p *Processor) flushInput() {
+	for {
+		in := p.in.pop()
+		if in == nil {
+			return
+		}
+		p.processInput(in)
+	}
+}
+
 func (p *Processor) run(tick <-chan time.Time) {
 	for {
 		select {
 		case now := <-tick:
 			p.sendToAgent(p.flush(now))
 		case done := <-p.flushRequest:
+			p.flushInput()
 			p.sendToAgent(p.flush(time.Now().Add(bucketDuration * 10)))
 			close(done)
 		case <-p.stop:
@@ -297,12 +330,7 @@ func (p *Processor) run(tick <-chan time.Time) {
 				time.Sleep(time.Millisecond * 10)
 				continue
 			}
-			atomic.AddInt64(&p.stats.payloadsIn, 1)
-			if s.typ == pointTypeStats {
-				p.add(s.point)
-			} else if s.typ == pointTypeKafkaOffset {
-				p.addKafkaOffset(s.kafkaOffset)
-			}
+			p.processInput(s)
 		}
 	}
 }
@@ -464,6 +492,19 @@ func (p *Processor) TrackKafkaProduceOffset(topic string, partition int32, offse
 	}
 }
 
+func (p *Processor) TrackKafkaHighWatermarkOffset(cluster string, topic string, partition int32, offset int64) {
+	dropped := p.in.push(&processorInput{typ: pointTypeKafkaOffset, kafkaOffset: kafkaOffset{
+		offset:     offset,
+		topic:      topic,
+		partition:  partition,
+		offsetType: highWatermarkOffset,
+		timestamp:  p.time().UnixNano(),
+	}})
+	if dropped {
+		atomic.AddInt64(&p.stats.dropped, 1)
+	}
+}
+
 func (p *Processor) runLoadAgentFeatures(tick <-chan time.Time) {
 	for {
 		select {
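One behavioral detail worth calling out: the run loop now drains the input queue (`flushInput`) before serving a flush request, so offsets pushed just before `Flush()` are guaranteed to land in the exported payload; `SentDSMBacklogs` in the mock tracer relies on that ordering. A condensed model of the change, with a plain channel standing in for the processor's real lock-free input queue:

```go
package example

// processor is a toy stand-in for datastreams.Processor, kept only to show
// the drain-before-export ordering introduced by this patch.
type processor struct {
	in      chan int64         // stands in for the real input queue
	flushCh chan chan struct{} // a flush request carries a done channel
	buf     []int64
}

// drain consumes everything already queued without blocking.
func (p *processor) drain() {
	for {
		select {
		case v := <-p.in:
			p.buf = append(p.buf, v)
		default:
			return
		}
	}
}

func (p *processor) run() {
	for {
		select {
		case done := <-p.flushCh:
			p.drain()  // process pending inputs first, so nothing pushed
			p.export() // before Flush() is missing from the export
			close(done)
		case v := <-p.in:
			p.buf = append(p.buf, v)
		}
	}
}

func (p *processor) export() {
	// send buffered stats to the agent, then reset (elided)
	p.buf = p.buf[:0]
}
```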