Skip to content

Commit

Permalink
datastreams: Port data-streams-go to dd-trace-go
Browse files Browse the repository at this point in the history
  • Loading branch information
piochelepiotr committed May 26, 2023
1 parent 638d6fd commit c1fac6c
Show file tree
Hide file tree
Showing 21 changed files with 3,097 additions and 0 deletions.
36 changes: 36 additions & 0 deletions contrib/Shopify/sarama/data_streams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package sarama

import (
"context"

"github.com/Shopify/sarama"

"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
)

// TraceKafkaProduce sets an outbound data-streams checkpoint on ctx for the
// given Kafka message and appends the encoded pathway to the message headers
// under datastreams.PropagationKey. It returns the context carrying the new
// pathway.
func TraceKafkaProduce(ctx context.Context, msg *sarama.ProducerMessage) context.Context {
	p, outCtx := datastreams.SetCheckpoint(ctx, "direction:out", "topic:"+msg.Topic, "type:kafka")
	header := sarama.RecordHeader{
		Key:   []byte(datastreams.PropagationKey),
		Value: p.Encode(),
	}
	msg.Headers = append(msg.Headers, header)
	return outCtx
}

// TraceKafkaConsume looks for a propagated data-streams pathway in the Kafka
// message headers; if one decodes successfully it is stored on the context.
// It then sets an inbound checkpoint and returns the resulting context.
// Only the first header matching datastreams.PropagationKey is considered.
func TraceKafkaConsume(ctx context.Context, msg *sarama.ConsumerMessage, group string) context.Context {
	for _, h := range msg.Headers {
		if h == nil || string(h.Key) != datastreams.PropagationKey {
			continue
		}
		if p, err := datastreams.Decode(h.Value); err == nil {
			ctx = datastreams.ContextWithPathway(ctx, p)
		}
		break
	}
	_, ctx = datastreams.SetCheckpoint(ctx, "direction:in", "group:"+group, "topic:"+msg.Topic, "type:kafka")
	return ctx
}
93 changes: 93 additions & 0 deletions contrib/Shopify/sarama/data_streams_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package sarama

import (
"context"
"testing"

"github.com/Shopify/sarama"
"github.com/stretchr/testify/assert"

"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
)

func TestTraceKafkaProduce(t *testing.T) {
	t.Run("Checkpoint should be created and pathway should be propagated to kafka headers", func(t *testing.T) {
		// Seed the context with a pre-existing pathway.
		initialPathway := datastreams.NewPathway()
		ctx := datastreams.ContextWithPathway(context.Background(), initialPathway)

		msg := sarama.ProducerMessage{Topic: "test"}
		ctx = TraceKafkaProduce(ctx, &msg)

		// A new checkpoint was set, so the pathway now in ctx must differ
		// from the one we started with.
		ctxPathway, _ := datastreams.PathwayFromContext(ctx)
		assertPathwayNotEqual(t, initialPathway, ctxPathway)

		// The pathway carried by the kafka headers must decode to the same
		// pathway stored in ctx.
		var encodedPathway []byte
		for _, h := range msg.Headers {
			if string(h.Key) == datastreams.PropagationKey {
				encodedPathway = h.Value
			}
		}
		headersPathway, _ := datastreams.Decode(encodedPathway)
		assertPathwayEqual(t, ctxPathway, headersPathway)
	})
}

func TestTraceKafkaConsume(t *testing.T) {
	t.Run("Checkpoint should be created and pathway should be extracted from kafka headers into context", func(t *testing.T) {
		// Produce a message first so its headers carry an encoded pathway,
		// exactly as a real producer would have left them.
		initialPathway := datastreams.NewPathway()
		producerCtx := datastreams.ContextWithPathway(context.Background(), initialPathway)

		topic := "my-topic"
		produceMessage := sarama.ProducerMessage{Topic: topic}
		producerCtx = TraceKafkaProduce(producerCtx, &produceMessage)

		// Mirror the produced headers onto a consumer-side message.
		consumeMessage := sarama.ConsumerMessage{Topic: topic}
		for _, h := range produceMessage.Headers {
			consumeMessage.Headers = append(consumeMessage.Headers, &sarama.RecordHeader{Key: h.Key, Value: h.Value})
		}

		// Calls TraceKafkaConsume
		group := "my-consumer-group"
		consumerCtx := TraceKafkaConsume(context.Background(), &consumeMessage, group)

		// The consumer context must hold the same pathway as a checkpoint set
		// directly on the producer context with identical edge tags.
		consumerCtxPathway, _ := datastreams.PathwayFromContext(consumerCtx)
		_, expectedCtx := datastreams.SetCheckpoint(producerCtx, "direction:in", "group:my-consumer-group", "topic:my-topic", "type:kafka")
		expectedCtxPathway, _ := datastreams.PathwayFromContext(expectedCtx)
		assertPathwayEqual(t, expectedCtxPathway, consumerCtxPathway)
	})
}

// assertPathwayNotEqual checks that both pathways survive an encode/decode
// round trip and that the decoded values differ.
func assertPathwayNotEqual(t *testing.T, p1 datastreams.Pathway, p2 datastreams.Pathway) {
	decoded1, err := datastreams.Decode(p1.Encode())
	assert.Nil(t, err)
	decoded2, err := datastreams.Decode(p2.Encode())
	assert.Nil(t, err)
	assert.NotEqual(t, decoded1, decoded2)
}

// assertPathwayEqual checks that both pathways survive an encode/decode round
// trip and that the decoded values are equal.
func assertPathwayEqual(t *testing.T, p1 datastreams.Pathway, p2 datastreams.Pathway) {
	decoded1, err := datastreams.Decode(p1.Encode())
	assert.Nil(t, err)
	decoded2, err := datastreams.Decode(p2.Encode())
	assert.Nil(t, err)
	assert.Equal(t, decoded1, decoded2)
}
44 changes: 44 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka/data_streams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package kafka

import (
"context"

"github.com/confluentinc/confluent-kafka-go/kafka"

"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
)

// TraceKafkaProduce sets an outbound data-streams checkpoint on ctx and
// appends the encoded pathway to the message headers under
// datastreams.PropagationKey. The topic edge is only added when the message
// has a topic set. It returns the context carrying the new pathway.
func TraceKafkaProduce(ctx context.Context, msg *kafka.Message) context.Context {
	edges := make([]string, 0, 3)
	edges = append(edges, "direction:out")
	if topic := msg.TopicPartition.Topic; topic != nil {
		edges = append(edges, "topic:"+*topic)
	}
	edges = append(edges, "type:kafka")
	p, outCtx := datastreams.SetCheckpoint(ctx, edges...)
	msg.Headers = append(msg.Headers, kafka.Header{Key: datastreams.PropagationKey, Value: p.Encode()})
	return outCtx
}

// TraceKafkaConsume extracts a propagated data-streams pathway from the Kafka
// message headers (if one decodes successfully it is stored on the context),
// then sets an inbound checkpoint and returns the resulting context.
// Unlike the sarama integration, the loop scans every header; if several
// headers carry the propagation key, the last decodable one wins.
func TraceKafkaConsume(ctx context.Context, msg *kafka.Message, group string) context.Context {
	for _, header := range msg.Headers {
		if header.Key == datastreams.PropagationKey {
			p, err := datastreams.Decode(header.Value)
			if err == nil {
				ctx = datastreams.ContextWithPathway(ctx, p)
			}
		}
	}
	edges := []string{"direction:in", "group:" + group}
	if msg.TopicPartition.Topic != nil {
		edges = append(edges, "topic:"+*msg.TopicPartition.Topic)
	}
	edges = append(edges, "type:kafka")
	// The original code had a no-op `edges = append(edges)` here; removed.
	_, ctx = datastreams.SetCheckpoint(ctx, edges...)
	return ctx
}
94 changes: 94 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka/data_streams_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package kafka

import (
"context"
"fmt"
"testing"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/stretchr/testify/assert"

"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
)

func TestTraceKafkaConsume(t *testing.T) {
	t.Run("Checkpoint should be created and pathway should be extracted from kafka headers into context", func(t *testing.T) {
		// First, set up pathway and context as it would have been from the producer view.
		producerCtx := context.Background()
		initialPathway := datastreams.NewPathway()
		producerCtx = datastreams.ContextWithPathway(producerCtx, initialPathway)

		topic := "my-topic"
		msg := kafka.Message{
			TopicPartition: kafka.TopicPartition{
				Topic: &topic,
			},
		}
		producerCtx = TraceKafkaProduce(producerCtx, &msg)

		// Calls TraceKafkaConsume
		group := "my-consumer-group"
		consumerCtx := context.Background()
		consumerCtx = TraceKafkaConsume(consumerCtx, &msg, group)

		// Check that the resulting consumerCtx contains an expected pathway.
		// NOTE(review): two leftover debug fmt.Println calls were removed from
		// this test; the "fmt" import in this file is now unused and should be
		// dropped from the import block as well.
		consumerCtxPathway, _ := datastreams.PathwayFromContext(consumerCtx)
		_, expectedCtx := datastreams.SetCheckpoint(producerCtx, "direction:in", "group:my-consumer-group", "topic:my-topic", "type:kafka")
		expectedCtxPathway, _ := datastreams.PathwayFromContext(expectedCtx)
		assertPathwayEqual(t, expectedCtxPathway, consumerCtxPathway)
	})
}

func TestTraceKafkaProduce(t *testing.T) {
	t.Run("Checkpoint should be created and pathway should be propagated to kafka headers", func(t *testing.T) {
		// Seed the context with a pre-existing pathway.
		initialPathway := datastreams.NewPathway()
		ctx := datastreams.ContextWithPathway(context.Background(), initialPathway)

		msg := kafka.Message{
			TopicPartition: kafka.TopicPartition{},
			Value:          []byte{},
		}
		ctx = TraceKafkaProduce(ctx, &msg)

		// A new checkpoint was set, so the pathway now in ctx must differ
		// from the one we started with.
		ctxPathway, _ := datastreams.PathwayFromContext(ctx)
		assertPathwayNotEqual(t, initialPathway, ctxPathway)

		// The pathway carried by the kafka headers must decode to the same
		// pathway stored in ctx.
		var encodedPathway []byte
		for _, h := range msg.Headers {
			if h.Key == datastreams.PropagationKey {
				encodedPathway = h.Value
			}
		}
		headersPathway, _ := datastreams.Decode(encodedPathway)
		assertPathwayEqual(t, ctxPathway, headersPathway)
	})
}

// assertPathwayNotEqual checks that both pathways survive an encode/decode
// round trip and that the decoded values differ.
func assertPathwayNotEqual(t *testing.T, p1 datastreams.Pathway, p2 datastreams.Pathway) {
	decoded1, err := datastreams.Decode(p1.Encode())
	assert.Nil(t, err)
	decoded2, err := datastreams.Decode(p2.Encode())
	assert.Nil(t, err)
	assert.NotEqual(t, decoded1, decoded2)
}

// assertPathwayEqual checks that both pathways survive an encode/decode round
// trip and that the decoded values are equal.
func assertPathwayEqual(t *testing.T, p1 datastreams.Pathway, p2 datastreams.Pathway) {
	decoded1, err := datastreams.Decode(p1.Encode())
	assert.Nil(t, err)
	decoded2, err := datastreams.Decode(p2.Encode())
	assert.Nil(t, err)
	assert.Equal(t, decoded1, decoded2)
}
Loading

0 comments on commit c1fac6c

Please sign in to comment.