datastreams: Port data-streams-go to dd-trace-go (#2006)
Co-authored-by: Andrew Glaude <andrew.glaude@datadoghq.com>
piochelepiotr and ajgajg1134 committed Sep 6, 2023
1 parent b49b27c commit 3ebb83f
Showing 35 changed files with 3,008 additions and 81 deletions.
4 changes: 4 additions & 0 deletions CODEOWNERS
@@ -17,6 +17,10 @@
/contrib/**/*appsec*.go @DataDog/asm-go
/.github/workflows/appsec.yml @DataDog/asm-go

# datastreams
/datastreams @Datadog/data-streams-monitoring
/internal/datastreams @Datadog/data-streams-monitoring

# telemetry
/internal/telemetry @DataDog/apm-go

16 changes: 16 additions & 0 deletions contrib/Shopify/sarama/option.go
@@ -20,6 +20,8 @@ type config struct {
consumerSpanName string
producerSpanName string
analyticsRate float64
dataStreamsEnabled bool
groupID string
}

func defaults(cfg *config) {
@@ -51,6 +53,20 @@ func WithServiceName(name string) Option {
}
}

// WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/
func WithDataStreams() Option {
return func(cfg *config) {
cfg.dataStreamsEnabled = true
}
}

// WithGroupID tags the produced data streams metrics with the given groupID (aka consumer group)
func WithGroupID(groupID string) Option {
return func(cfg *config) {
cfg.groupID = groupID
}
}

// WithAnalytics enables Trace Analytics for all started spans.
func WithAnalytics(on bool) Option {
return func(cfg *config) {
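For reference, a minimal sketch of how the two new options might be used together when wrapping a consumer (the broker address, topic, and group ID below are hypothetical, not part of this diff):

package main

import (
	"github.com/Shopify/sarama"

	saramatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

func main() {
	tracer.Start()
	defer tracer.Stop()

	cfg := sarama.NewConfig()
	cfg.Version = sarama.V0_11_0_0 // record headers, used for pathway propagation, require Kafka 0.11+

	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, cfg) // hypothetical broker
	if err != nil {
		panic(err)
	}

	// Enable Data Streams Monitoring and tag its metrics with the consumer group.
	consumer = saramatrace.WrapConsumer(consumer,
		saramatrace.WithDataStreams(),
		saramatrace.WithGroupID("my-group"), // hypothetical consumer group
	)
	defer consumer.Close()

	// Messages delivered through the wrapped partition consumer now carry a
	// consume checkpoint; because a group ID is set, read offsets are also
	// reported for Kafka lag tracking.
	pc, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetNewest)
	if err != nil {
		panic(err)
	}
	defer pc.Close()
}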
74 changes: 74 additions & 0 deletions contrib/Shopify/sarama/sarama.go
@@ -7,8 +7,11 @@
package sarama // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama"

import (
"context"
"math"

"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
"gopkg.in/DataDog/dd-trace-go.v1/datastreams/options"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
@@ -76,6 +79,7 @@ func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer {
next := tracer.StartSpan(cfg.consumerSpanName, opts...)
// reinject the span context so consumers can pick it up
tracer.Inject(next.Context(), carrier)
setConsumeCheckpoint(cfg.dataStreamsEnabled, cfg.groupID, msg)

wrapped.messages <- msg

@@ -127,8 +131,12 @@ type syncProducer struct {
// SendMessage calls sarama.SyncProducer.SendMessage and traces the request.
func (p *syncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
span := startProducerSpan(p.cfg, p.version, msg)
setProduceCheckpoint(p.cfg.dataStreamsEnabled, msg, p.version)
partition, offset, err = p.SyncProducer.SendMessage(msg)
finishProducerSpan(span, partition, offset, err)
if err == nil && p.cfg.dataStreamsEnabled {
tracer.TrackKafkaProduceOffset(msg.Topic, partition, offset)
}
return partition, offset, err
}

@@ -138,12 +146,19 @@ func (p *syncProducer) SendMessages(msgs []*sarama.ProducerMessage) error {
// treated individually, so we create a span for each one
spans := make([]ddtrace.Span, len(msgs))
for i, msg := range msgs {
setProduceCheckpoint(p.cfg.dataStreamsEnabled, msg, p.version)
spans[i] = startProducerSpan(p.cfg, p.version, msg)
}
err := p.SyncProducer.SendMessages(msgs)
for i, span := range spans {
finishProducerSpan(span, msgs[i].Partition, msgs[i].Offset, err)
}
if err == nil && p.cfg.dataStreamsEnabled {
// we only track Kafka lag if messages have been sent successfully. Otherwise, we have no way of knowing which partition the data was sent to.
for _, msg := range msgs {
tracer.TrackKafkaProduceOffset(msg.Topic, msg.Partition, msg.Offset)
}
}
return err
}

@@ -221,6 +236,7 @@ func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts ...Option) sarama.AsyncProducer {
select {
case msg := <-wrapped.input:
span := startProducerSpan(cfg, saramaConfig.Version, msg)
setProduceCheckpoint(cfg.dataStreamsEnabled, msg, saramaConfig.Version)
p.Input() <- msg
if saramaConfig.Producer.Return.Successes {
spanID := span.Context().SpanID()
@@ -236,6 +252,10 @@ func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts ...Option) sarama.AsyncProducer {
// producer was closed, so exit
return
}
if cfg.dataStreamsEnabled {
// we only track Kafka lag if returning successes is enabled. Otherwise, we have no way of knowing which partition the data was sent to.
tracer.TrackKafkaProduceOffset(msg.Topic, msg.Partition, msg.Offset)
}
if spanctx, spanFound := getSpanContext(msg); spanFound {
spanID := spanctx.SpanID()
if span, ok := spans[spanID]; ok {
@@ -303,3 +323,57 @@ func getSpanContext(msg *sarama.ProducerMessage) (ddtrace.SpanContext, bool) {

return spanctx, true
}

func setProduceCheckpoint(enabled bool, msg *sarama.ProducerMessage, version sarama.KafkaVersion) {
if !enabled || msg == nil {
return
}
edges := []string{"direction:out", "topic:" + msg.Topic, "type:kafka"}
carrier := NewProducerMessageCarrier(msg)
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)}, edges...)
if !ok || !version.IsAtLeast(sarama.V0_11_0_0) {
return
}
datastreams.InjectToBase64Carrier(ctx, carrier)
}

func setConsumeCheckpoint(enabled bool, groupID string, msg *sarama.ConsumerMessage) {
if !enabled || msg == nil {
return
}
edges := []string{"direction:in", "topic:" + msg.Topic, "type:kafka"}
if groupID != "" {
edges = append(edges, "group:"+groupID)
}
carrier := NewConsumerMessageCarrier(msg)
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getConsumerMsgSize(msg)}, edges...)
if !ok {
return
}
datastreams.InjectToBase64Carrier(ctx, carrier)
if groupID != "" {
// only track Kafka lag if a consumer group is set.
// since there is no ack mechanism, we consider messages committed as soon as they are read.
tracer.TrackKafkaCommitOffset(groupID, msg.Topic, msg.Partition, msg.Offset)
}
}

func getProducerMsgSize(msg *sarama.ProducerMessage) (size int64) {
for _, header := range msg.Headers {
size += int64(len(header.Key) + len(header.Value))
}
if msg.Value != nil {
size += int64(msg.Value.Length())
}
if msg.Key != nil {
size += int64(msg.Key.Length())
}
return size
}

func getConsumerMsgSize(msg *sarama.ConsumerMessage) (size int64) {
for _, header := range msg.Headers {
size += int64(len(header.Key) + len(header.Value))
}
return size + int64(len(msg.Value)+len(msg.Key))
}
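Put together, the producer-side helpers above mean a wrapped producer checkpoints and reports offsets without extra caller code. A minimal sketch, assuming a local broker and a hypothetical topic:

package main

import (
	"github.com/Shopify/sarama"

	saramatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

func main() {
	tracer.Start()
	defer tracer.Stop()

	cfg := sarama.NewConfig()
	cfg.Version = sarama.V0_11_0_0       // setProduceCheckpoint only injects the pathway when headers are supported (Kafka 0.11+)
	cfg.Producer.Return.Successes = true // required by sarama for a SyncProducer

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg) // hypothetical broker
	if err != nil {
		panic(err)
	}
	producer = saramatrace.WrapSyncProducer(cfg, producer, saramatrace.WithDataStreams())
	defer producer.Close()

	// SendMessage sets a produce checkpoint, injects the pathway into the message
	// headers and, on success, reports the offset via tracer.TrackKafkaProduceOffset.
	if _, _, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "my_topic", // hypothetical topic
		Value: sarama.StringEncoder("hello"),
	}); err != nil {
		panic(err)
	}
}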
27 changes: 24 additions & 3 deletions contrib/Shopify/sarama/sarama_test.go
@@ -11,6 +11,7 @@ import (
"time"

"gopkg.in/DataDog/dd-trace-go.v1/contrib/internal/namingschematest"
"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
@@ -115,7 +116,7 @@ func TestConsumer(t *testing.T) {
}
defer consumer.Close()

consumer = WrapConsumer(consumer)
consumer = WrapConsumer(consumer, WithDataStreams())

partitionConsumer, err := consumer.ConsumePartition("test-topic", 0, 0)
if err != nil {
@@ -145,6 +146,12 @@
assert.Equal(t, "Shopify/sarama", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewConsumerMessageCarrier(msg1)))
assert.True(t, ok)
expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:in", "topic:test-topic", "type:kafka")
expected, _ := datastreams.PathwayFromContext(expectedCtx)
assert.NotEqual(t, expected.GetHash(), 0)
assert.Equal(t, expected.GetHash(), p.GetHash())
}
{
s := spans[1]
@@ -162,6 +169,12 @@
assert.Equal(t, "Shopify/sarama", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewConsumerMessageCarrier(msg1)))
assert.True(t, ok)
expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:in", "topic:test-topic", "type:kafka")
expected, _ := datastreams.PathwayFromContext(expectedCtx)
assert.NotEqual(t, expected.GetHash(), 0)
assert.Equal(t, expected.GetHash(), p.GetHash())
}
}

@@ -176,23 +189,25 @@ func TestSyncProducer(t *testing.T) {
defer leader.Close()

metadataResponse := new(sarama.MetadataResponse)
metadataResponse.Version = 1
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError)
seedBroker.Returns(metadataResponse)

prodSuccess := new(sarama.ProduceResponse)
prodSuccess.Version = 2
prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError)
leader.Returns(prodSuccess)

cfg := sarama.NewConfig()
cfg.Version = sarama.MinVersion
cfg.Version = sarama.V0_11_0_0 // first version that supports headers
cfg.Producer.Return.Successes = true

producer, err := sarama.NewSyncProducer([]string{seedBroker.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}
producer = WrapSyncProducer(cfg, producer)
producer = WrapSyncProducer(cfg, producer, WithDataStreams())

msg1 := &sarama.ProducerMessage{
Topic: "my_topic",
@@ -214,6 +229,12 @@
assert.Equal(t, "Shopify/sarama", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewProducerMessageCarrier(msg1)))
assert.True(t, ok)
expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:my_topic", "type:kafka")
expected, _ := datastreams.PathwayFromContext(expectedCtx)
assert.NotEqual(t, expected.GetHash(), 0)
assert.Equal(t, expected.GetHash(), p.GetHash())
}
}

@@ -21,6 +21,7 @@ var (

// This example shows how a span context can be passed from a producer to a consumer.
func Example() {

tracer.Start()
defer tracer.Stop()

@@ -31,6 +32,7 @@
"session.timeout.ms": 10,
"enable.auto.offset.store": false,
})

err = c.Subscribe(testTopic, nil)
if err != nil {
panic(err)
@@ -56,6 +58,7 @@
tracer.Inject(parentSpan.Context(), carrier)

c.Consumer.Events() <- msg

}()

msg := (<-c.Events()).(*kafka.Message)
@@ -66,6 +69,7 @@
if err != nil {
panic(err)
}

parentContext := parentSpan.Context()

// Validate that the context passed is the context sent via the message