[data streams] Test kafka confluent instrumentation
piochelepiotr committed Jan 18, 2024
1 parent 882cddb commit 0538d5d
Showing 8 changed files with 205 additions and 37 deletions.
16 changes: 16 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go
@@ -91,6 +91,7 @@ func (c *Consumer) traceEventsChannel(in chan kafka.Event) chan kafka.Event {
setConsumeCheckpoint(c.cfg.dataStreamsEnabled, c.cfg.groupID, msg)
} else if offset, ok := evt.(kafka.OffsetsCommitted); ok {
commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, offset.Offsets, offset.Error)
c.trackHighWatermark(c.cfg.dataStreamsEnabled, offset.Offsets)
}

out <- evt
@@ -176,10 +177,22 @@ func (c *Consumer) Poll(timeoutMS int) (event kafka.Event) {
c.prev = c.startSpan(msg)
} else if offset, ok := evt.(kafka.OffsetsCommitted); ok {
commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, offset.Offsets, offset.Error)
c.trackHighWatermark(c.cfg.dataStreamsEnabled, offset.Offsets)
}
return evt
}

func (c *Consumer) trackHighWatermark(dataStreamsEnabled bool, offsets []kafka.TopicPartition) {
if !dataStreamsEnabled {
return
}
for _, tp := range offsets {
if _, high, err := c.Consumer.GetWatermarkOffsets(*tp.Topic, tp.Partition); err == nil {
tracer.TrackKafkaHighWatermarkOffset("", *tp.Topic, tp.Partition, high)
}
}
}
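
The committed offsets and high watermark offsets tracked above are the two inputs the backend needs to derive consumer lag. A minimal sketch of the relationship; the `consumerLag` helper is illustrative and not part of this change:

```go
// consumerLag illustrates how the two tracked values relate: a consumer
// group's lag on a partition is the distance between the broker's high
// watermark and the group's committed offset.
func consumerLag(highWatermark, committedOffset int64) int64 {
	if lag := highWatermark - committedOffset; lag > 0 {
		return lag
	}
	return 0
}
```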

// ReadMessage polls the consumer for a message. Message will be traced.
func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
if c.prev != nil {
@@ -199,20 +212,23 @@ func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
func (c *Consumer) Commit() ([]kafka.TopicPartition, error) {
tps, err := c.Consumer.Commit()
commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err)
c.trackHighWatermark(c.cfg.dataStreamsEnabled, tps)
return tps, err
}

// CommitMessage commits a message and tracks the commit offsets if data streams is enabled.
func (c *Consumer) CommitMessage(msg *kafka.Message) ([]kafka.TopicPartition, error) {
tps, err := c.Consumer.CommitMessage(msg)
commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err)
c.trackHighWatermark(c.cfg.dataStreamsEnabled, tps)
return tps, err
}

// CommitOffsets commits provided offsets and tracks the commit offsets if data streams is enabled.
func (c *Consumer) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error) {
tps, err := c.Consumer.CommitOffsets(offsets)
commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err)
c.trackHighWatermark(c.cfg.dataStreamsEnabled, tps)
return tps, err
}

21 changes: 21 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go
@@ -9,6 +9,8 @@ import (
"context"
"errors"
"os"
"sort"
"strings"
"testing"
"time"

@@ -17,6 +19,7 @@ import (
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
internaldsm "gopkg.in/DataDog/dd-trace-go.v1/internal/datastreams"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/stretchr/testify/assert"
@@ -76,6 +79,8 @@ func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerO

msg2, err := consumerAction(c)
require.NoError(t, err)
commits, err := c.CommitMessage(msg2)
require.NoError(t, err)
assert.Equal(t, msg1.String(), msg2.String())
err = c.Close()
require.NoError(t, err)
@@ -84,6 +89,22 @@
require.Len(t, spans, 2)
// they should be linked via headers
assert.Equal(t, spans[0].TraceID(), spans[1].TraceID())

if c.cfg.dataStreamsEnabled {
backlogs := mt.SentDSMBacklogs()
sort.Slice(backlogs, func(i, j int) bool {
// sorting by the concatenated tags yields the order: commit, high watermark, produce
return strings.Join(backlogs[i].Tags, "") < strings.Join(backlogs[j].Tags, "")
})
require.Len(t, commits, 1)
highWatermark := int64(commits[0].Offset)
expectedBacklogs := []internaldsm.Backlog{
{Tags: []string{"consumer_group:" + testGroupID, "partition:0", "topic:" + testTopic, "type:kafka_commit"}, Value: highWatermark},
{Tags: []string{"partition:0", "topic:" + testTopic, "type:kafka_high_watermark"}, Value: highWatermark},
{Tags: []string{"partition:0", "topic:" + testTopic, "type:kafka_produce"}, Value: highWatermark - 1},
}
assert.Equal(t, expectedBacklogs, backlogs)
}
return spans, msg2
}
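
A note on the expected values asserted above: producing a single message at offset n advances the broker's high watermark to n+1, and committing that message records n+1 as the consumer position, so the produce backlog trails the other two by exactly one. A sketch of that bookkeeping, with an illustrative helper name:

```go
// expectedBacklogValues mirrors the arithmetic behind the assertion: for a
// message produced at offset n, the produce backlog reports n, while both
// the high watermark and the committed offset equal n+1.
func expectedBacklogValues(n int64) (produce, highWatermark, commit int64) {
	return n, n + 1, n + 1
}
```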

16 changes: 16 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka/kafka.go
@@ -91,6 +91,7 @@ func (c *Consumer) traceEventsChannel(in chan kafka.Event) chan kafka.Event {
setConsumeCheckpoint(c.cfg.dataStreamsEnabled, c.cfg.groupID, msg)
} else if offset, ok := evt.(kafka.OffsetsCommitted); ok {
commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, offset.Offsets, offset.Error)
c.trackHighWatermark(c.cfg.dataStreamsEnabled, offset.Offsets)
}

out <- evt
@@ -176,10 +177,22 @@ func (c *Consumer) Poll(timeoutMS int) (event kafka.Event) {
c.prev = c.startSpan(msg)
} else if offset, ok := evt.(kafka.OffsetsCommitted); ok {
commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, offset.Offsets, offset.Error)
c.trackHighWatermark(c.cfg.dataStreamsEnabled, offset.Offsets)
}
return evt
}

func (c *Consumer) trackHighWatermark(dataStreamsEnabled bool, offsets []kafka.TopicPartition) {
if !dataStreamsEnabled {
return
}
for _, tp := range offsets {
if _, high, err := c.Consumer.GetWatermarkOffsets(*tp.Topic, tp.Partition); err == nil {
tracer.TrackKafkaHighWatermarkOffset("", *tp.Topic, tp.Partition, high)
}
}
}

// ReadMessage polls the consumer for a message. Message will be traced.
func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
if c.prev != nil {
@@ -199,20 +212,23 @@ func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
func (c *Consumer) Commit() ([]kafka.TopicPartition, error) {
tps, err := c.Consumer.Commit()
commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err)
c.trackHighWatermark(c.cfg.dataStreamsEnabled, tps)
return tps, err
}

// CommitMessage commits a message and tracks the commit offsets if data streams is enabled.
func (c *Consumer) CommitMessage(msg *kafka.Message) ([]kafka.TopicPartition, error) {
tps, err := c.Consumer.CommitMessage(msg)
commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err)
c.trackHighWatermark(c.cfg.dataStreamsEnabled, tps)
return tps, err
}

// CommitOffsets commits provided offsets and tracks the commit offsets if data streams is enabled.
func (c *Consumer) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error) {
tps, err := c.Consumer.CommitOffsets(offsets)
commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err)
c.trackHighWatermark(c.cfg.dataStreamsEnabled, tps)
return tps, err
}

21 changes: 21 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go
@@ -9,6 +9,8 @@ import (
"context"
"errors"
"os"
"sort"
"strings"
"testing"
"time"

@@ -17,6 +19,7 @@ import (
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
internaldsm "gopkg.in/DataDog/dd-trace-go.v1/internal/datastreams"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/stretchr/testify/assert"
@@ -76,6 +79,8 @@ func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerO

msg2, err := consumerAction(c)
require.NoError(t, err)
commits, err := c.CommitMessage(msg2)
require.NoError(t, err)
assert.Equal(t, msg1.String(), msg2.String())
err = c.Close()
require.NoError(t, err)
@@ -84,6 +89,22 @@
require.Len(t, spans, 2)
// they should be linked via headers
assert.Equal(t, spans[0].TraceID(), spans[1].TraceID())

if c.cfg.dataStreamsEnabled {
backlogs := mt.SentDSMBacklogs()
sort.Slice(backlogs, func(i, j int) bool {
// sorting by the concatenated tags yields the order: commit, high watermark, produce
return strings.Join(backlogs[i].Tags, "") < strings.Join(backlogs[j].Tags, "")
})
require.Len(t, commits, 1)
highWatermark := int64(commits[0].Offset)
expectedBacklogs := []internaldsm.Backlog{
{Tags: []string{"consumer_group:" + testGroupID, "partition:0", "topic:" + testTopic, "type:kafka_commit"}, Value: highWatermark},
{Tags: []string{"partition:0", "topic:" + testTopic, "type:kafka_high_watermark"}, Value: highWatermark},
{Tags: []string{"partition:0", "topic:" + testTopic, "type:kafka_produce"}, Value: highWatermark - 1},
}
assert.Equal(t, expectedBacklogs, backlogs)
}
return spans, msg2
}

45 changes: 45 additions & 0 deletions ddtrace/mocktracer/data_streams.go
@@ -0,0 +1,45 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package mocktracer

import (
"compress/gzip"
"net/http"

"github.com/tinylib/msgp/msgp"

"gopkg.in/DataDog/dd-trace-go.v1/internal/datastreams"
)

type mockDSMTransport struct {
backlogs []datastreams.Backlog
}

// RoundTrip decodes the gzipped stats payload sent by the data streams
// processor, records the backlogs it contains, and returns a dummy response.
func (t *mockDSMTransport) RoundTrip(req *http.Request) (*http.Response, error) {
gzipReader, err := gzip.NewReader(req.Body)
if err != nil {
return nil, err
}
var p datastreams.StatsPayload
err = msgp.Decode(gzipReader, &p)
if err != nil {
return nil, err
}
for _, bucket := range p.Stats {
t.backlogs = append(t.backlogs, bucket.Backlogs...)
}
return &http.Response{
StatusCode: 200,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
Request: req,
ContentLength: -1,
Body: http.NoBody,
}, nil
}
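
As a rough end-to-end sketch of the new plumbing (the test name, topic, and offset are assumptions, and the flush inside SentDSMBacklogs is assumed to be synchronous): start the mock tracer, report an offset through the public tracer API, and read back the backlogs captured by this transport.

```go
package mocktracer_test

import (
	"testing"

	"github.com/stretchr/testify/require"

	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

func TestCapturesDSMBacklogs(t *testing.T) {
	mt := mocktracer.Start() // installs the mock tracer and its DSM processor
	defer mt.Stop()

	// The offset flows through the mock processor into mockDSMTransport.
	tracer.TrackKafkaHighWatermarkOffset("", "orders", 0, 42)

	// SentDSMBacklogs flushes the processor and returns what the transport saw.
	require.NotEmpty(t, mt.SentDSMBacklogs())
}
```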
36 changes: 16 additions & 20 deletions ddtrace/mocktracer/mocktracer.go
@@ -37,6 +37,7 @@ type Tracer interface {

// FinishedSpans returns the set of finished spans.
FinishedSpans() []Span

// SentDSMBacklogs returns the backlogs of the data streams payloads
// received by the mock transport.
SentDSMBacklogs() []datastreams.Backlog

// Reset resets the spans and services recorded in the tracer. This is
// especially useful when running tests in a loop, where a clean start
@@ -63,11 +64,25 @@ type mocktracer struct {
sync.RWMutex // guards below spans
finishedSpans []Span
openSpans map[uint64]Span
dsmTransport *mockDSMTransport
dsmProcessor *datastreams.Processor
}

// SentDSMBacklogs flushes the data streams processor and returns the
// backlogs received by the mock transport.
func (t *mocktracer) SentDSMBacklogs() []datastreams.Backlog {
t.dsmProcessor.Flush()
return t.dsmTransport.backlogs
}

func newMockTracer() *mocktracer {
var t mocktracer
t.openSpans = make(map[uint64]Span)
t.dsmTransport = &mockDSMTransport{}
client := &http.Client{
Transport: t.dsmTransport,
}
t.dsmProcessor = datastreams.NewProcessor(&statsd.NoOpClient{}, "env", "service", "v1", &url.URL{Scheme: "http", Host: "agent-address"}, client, func() bool { return true })
t.dsmProcessor.Start()
t.dsmProcessor.Flush()
return &t
}

@@ -91,27 +106,8 @@ func (t *mocktracer) StartSpan(operationName string, opts ...ddtrace.StartSpanOp
return span
}

type noOpTransport struct{}

// RoundTrip does nothing and returns a dummy response.
func (t *noOpTransport) RoundTrip(req *http.Request) (*http.Response, error) {
// You can customize the dummy response if needed.
return &http.Response{
StatusCode: 200,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
Request: req,
ContentLength: -1,
Body: http.NoBody,
}, nil
}

func (t *mocktracer) GetDataStreamsProcessor() *datastreams.Processor {
client := &http.Client{
Transport: &noOpTransport{},
}
return datastreams.NewProcessor(&statsd.NoOpClient{}, "env", "service", "v1", &url.URL{Scheme: "http", Host: "agent-address"}, client, func() bool { return true })
return t.dsmProcessor
}

func (t *mocktracer) OpenSpans() []Span {
10 changes: 10 additions & 0 deletions ddtrace/tracer/data_streams.go
@@ -62,3 +62,13 @@ func TrackKafkaProduceOffset(topic string, partition int32, offset int64) {
}
}
}

// TrackKafkaHighWatermarkOffset should be used in the consumer, to track the
// high watermark offset of each partition it reads from. If used together with
// TrackKafkaCommitOffset, it can generate a Kafka lag in seconds metric.
func TrackKafkaHighWatermarkOffset(cluster string, topic string, partition int32, offset int64) {
if t, ok := internal.GetGlobalTracer().(dataStreamsContainer); ok {
if p := t.GetDataStreamsProcessor(); p != nil {
p.TrackKafkaHighWatermarkOffset(cluster, topic, partition, offset)
}
}
}
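
Outside the contrib integrations, an application could report watermarks itself. A hedged sketch built on confluent-kafka-go's GetWatermarkOffsets; the reporter function, polling interval, and partition list are assumptions, not part of this change:

```go
import (
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"

	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

// reportHighWatermarks periodically polls the broker for watermark offsets
// and forwards each partition's high watermark to the tracer.
func reportHighWatermarks(c *kafka.Consumer, topic string, partitions []int32, stop <-chan struct{}) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			for _, p := range partitions {
				if _, high, err := c.GetWatermarkOffsets(topic, p); err == nil {
					tracer.TrackKafkaHighWatermarkOffset("", topic, p, high)
				}
			}
		}
	}
}
```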
