Skip to content

Commit

Permalink
contrib: Adding WithCustomTag to various integrations (#1359)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajgajg1134 authored Aug 12, 2022
1 parent 8a68d59 commit 7ac0936
Show file tree
Hide file tree
Showing 14 changed files with 303 additions and 11 deletions.
5 changes: 5 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ func (c *Consumer) startSpan(msg *kafka.Message) ddtrace.Span {
tracer.Tag("offset", msg.TopicPartition.Offset),
tracer.Measured(),
}
if c.cfg.tagFns != nil {
for key, tagFn := range c.cfg.tagFns {
opts = append(opts, tracer.Tag(key, tagFn(msg)))
}
}
if !math.IsNaN(c.cfg.analyticsRate) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, c.cfg.analyticsRate))
}
Expand Down
46 changes: 46 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,3 +277,49 @@ func TestDeprecatedContext(t *testing.T) {
<-c.Events()

}

func TestCustomTags(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()

c, err := NewConsumer(&kafka.ConfigMap{
"go.events.channel.enable": true, // required for the events channel to be turned on
"group.id": testGroupID,
"socket.timeout.ms": 10,
"session.timeout.ms": 10,
"enable.auto.offset.store": false,
}, WithCustomTag("foo", func(msg *kafka.Message) interface{} {
return "bar"
}), WithCustomTag("key", func(msg *kafka.Message) interface{} {
return msg.Key
}))
assert.NoError(t, err)

err = c.Subscribe(testTopic, nil)
assert.NoError(t, err)

go func() {
c.Consumer.Events() <- &kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &testTopic,
Partition: 1,
Offset: 1,
},
Key: []byte("key1"),
Value: []byte("value1"),
}
}()

<-c.Events()

c.Close()
// wait for the events channel to be closed
<-c.Events()

spans := mt.FinishedSpans()
assert.Len(t, spans, 1)
s := spans[0]

assert.Equal(t, "bar", s.Tag("foo"))
assert.Equal(t, []byte("key1"), s.Tag("key"))
}
14 changes: 14 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ import (

"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"

"github.com/confluentinc/confluent-kafka-go/kafka"
)

type config struct {
ctx context.Context
consumerServiceName string
producerServiceName string
analyticsRate float64
tagFns map[string]func(msg *kafka.Message) interface{}
}

// An Option customizes the config.
Expand Down Expand Up @@ -82,3 +85,14 @@ func WithAnalyticsRate(rate float64) Option {
}
}
}

// WithCustomTag will cause the given tagFn to be evaluated after executing
// a query and attach the result to the span tagged by the key.
func WithCustomTag(tag string, tagFn func(msg *kafka.Message) interface{}) Option {
return func(cfg *config) {
if cfg.tagFns == nil {
cfg.tagFns = make(map[string]func(msg *kafka.Message) interface{})
}
cfg.tagFns[tag] = tagFn
}
}
5 changes: 5 additions & 0 deletions contrib/database/sql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,11 @@ func (tp *traceParams) tryTrace(ctx context.Context, qtype queryType, query stri
tracer.SpanType(ext.SpanTypeSQL),
tracer.StartTime(startTime),
)
if tp.cfg.tags != nil {
for key, tag := range tp.cfg.tags {
opts = append(opts, tracer.Tag(key, tag))
}
}
if !math.IsNaN(tp.cfg.analyticsRate) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, tp.cfg.analyticsRate))
}
Expand Down
91 changes: 91 additions & 0 deletions contrib/database/sql/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,94 @@ func TestWithChildSpansOnly(t *testing.T) {
})
}
}

func TestWithCustomTag(t *testing.T) {
type sqlRegister struct {
name string
dsn string
driver driver.Driver
}
type want struct {
opName string
customTags map[string]interface{}
}
testcases := []struct {
name string
sqlRegister sqlRegister
want want
options []Option
}{
{
name: "mysql",
sqlRegister: sqlRegister{
name: "mysql",
dsn: "test:test@tcp(127.0.0.1:3306)/test",
driver: &mysql.MySQLDriver{},
},
want: want{
opName: "mysql.query",
customTags: map[string]interface{}{
"foo": "bar",
"baz": 123,
},
},
options: []Option{
WithCustomTag("foo", "bar"),
WithCustomTag("baz", 123),
},
},
{
name: "postgres",
sqlRegister: sqlRegister{
name: "postgres",
dsn: "postgres://postgres:postgres@127.0.0.1:5432/postgres?sslmode=disable",
driver: &pq.Driver{},
},
want: want{
opName: "postgres.query",
customTags: map[string]interface{}{
"foo": "bar",
"baz": 123,
},
},
options: []Option{
WithCustomTag("foo", "bar"),
WithCustomTag("baz", 123),
},
},
}
mt := mocktracer.Start()
defer mt.Stop()
for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
Register(tt.sqlRegister.name, tt.sqlRegister.driver, tt.options...)
defer unregister(tt.sqlRegister.name)
db, err := Open(tt.sqlRegister.name, tt.sqlRegister.dsn, tt.options...)
if err != nil {
log.Fatal(err)
}
defer db.Close()
mt.Reset()

rows, err := db.QueryContext(context.Background(), "SELECT 1")
assert.NoError(t, err)
rows.Close()

spans := mt.FinishedSpans()
assert.Len(t, spans, 2)

connectSpan := spans[0]
assert.Equal(t, tt.want.opName, connectSpan.OperationName())
assert.Equal(t, "Connect", connectSpan.Tag("sql.query_type"))
for k, v := range tt.want.customTags {
assert.Equal(t, v, connectSpan.Tag(k), "Value mismatch on tag %s", k)
}

span := spans[1]
assert.Equal(t, tt.want.opName, span.OperationName())
for k, v := range tt.want.customTags {
assert.Equal(t, v, span.Tag(k), "Value mismatch on tag %s", k)
}
})
}
}
11 changes: 11 additions & 0 deletions contrib/database/sql/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type config struct {
analyticsRate float64
dsn string
childSpansOnly bool
tags map[string]interface{}
commentInjectionMode tracer.SQLCommentInjectionMode
}

Expand Down Expand Up @@ -88,6 +89,16 @@ func WithChildSpansOnly() Option {
}
}

// WithCustomTag will attach the value to the span tagged by the key
func WithCustomTag(key string, value interface{}) Option {
return func(cfg *config) {
if cfg.tags == nil {
cfg.tags = make(map[string]interface{})
}
cfg.tags[key] = value
}
}

// WithSQLCommentInjection enables injection of tags as sql comments on traced queries.
// This includes dynamic values like span id, trace id and sampling priority which can make queries
// unique for some cache implementations. Use WithStaticTagsCommentInjection if this is a concern.
Expand Down
6 changes: 3 additions & 3 deletions contrib/google.golang.org/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
cs.method,
"grpc.message",
cs.cfg.clientServiceName(),
tracer.AnalyticsRate(cs.cfg.analyticsRate),
cs.cfg.startSpanOptions()...,
)
if p, ok := peer.FromContext(cs.Context()); ok {
setSpanTargetFromPeer(span, *p)
Expand All @@ -57,7 +57,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
cs.method,
"grpc.message",
cs.cfg.clientServiceName(),
tracer.AnalyticsRate(cs.cfg.analyticsRate),
cs.cfg.startSpanOptions()...,
)
if p, ok := peer.FromContext(cs.Context()); ok {
setSpanTargetFromPeer(span, *p)
Expand Down Expand Up @@ -170,7 +170,7 @@ func doClientRequest(
method,
"grpc.client",
cfg.clientServiceName(),
tracer.AnalyticsRate(cfg.analyticsRate),
cfg.startSpanOptions()...,
)
if methodKind != "" {
span.SetTag(tagMethodKind, methodKind)
Expand Down
17 changes: 17 additions & 0 deletions contrib/google.golang.org/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package grpc // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/google.golang.or

import (
"io"
"math"

"gopkg.in/DataDog/dd-trace-go.v1/contrib/google.golang.org/internal/grpcutil"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
Expand All @@ -25,6 +26,22 @@ import (
// cache a constant option: saves one allocation per call
var spanTypeRPC = tracer.SpanType(ext.AppTypeRPC)

func (cfg *config) startSpanOptions(opts ...tracer.StartSpanOption) []tracer.StartSpanOption {
if len(cfg.tags) == 0 && math.IsNaN(cfg.analyticsRate) {
return opts
}

ret := make([]tracer.StartSpanOption, 0, 1+len(cfg.tags)+len(opts))
ret = append(ret, tracer.AnalyticsRate(cfg.analyticsRate))
for _, opt := range opts {
ret = append(ret, opt)
}
for key, tag := range cfg.tags {
ret = append(ret, tracer.Tag(key, tag))
}
return ret
}

func startSpanFromContext(
ctx context.Context, method, operation, service string, opts ...tracer.StartSpanOption,
) (ddtrace.Span, context.Context) {
Expand Down
36 changes: 36 additions & 0 deletions contrib/google.golang.org/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,42 @@ func TestIgnoredMetadata(t *testing.T) {
}
}

func TestCustomTag(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()
for _, c := range []struct {
key string
value interface{}
}{
{key: "foo", value: "bar"},
{key: "val", value: 123},
} {
rig, err := newRig(true, WithCustomTag(c.key, c.value))
if err != nil {
t.Fatalf("error setting up rig: %s", err)
}
ctx := context.Background()
span, ctx := tracer.StartSpanFromContext(ctx, "x", tracer.ServiceName("y"), tracer.ResourceName("z"))
rig.client.Ping(ctx, &FixtureRequest{Name: "pass"})
span.Finish()

spans := mt.FinishedSpans()

var serverSpan mocktracer.Span
for _, s := range spans {
switch s.OperationName() {
case "grpc.server":
serverSpan = s
}
}

assert.NotNil(t, serverSpan)
assert.Equal(t, c.value, serverSpan.Tag(c.key))
rig.Close()
mt.Reset()
}
}

func BenchmarkUnaryServerInterceptor(b *testing.B) {
// need to use the real tracer to get representative measurments
tracer.Start(tracer.WithLogger(log.DiscardLogger{}),
Expand Down
11 changes: 11 additions & 0 deletions contrib/google.golang.org/grpc/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type config struct {
withMetadataTags bool
ignoredMetadata map[string]struct{}
withRequestTags bool
tags map[string]interface{}
}

func (cfg *config) serverServiceName() string {
Expand Down Expand Up @@ -171,3 +172,13 @@ func WithRequestTags() Option {
cfg.withRequestTags = true
}
}

// WithCustomTag will attach the value to the span tagged by the key.
func WithCustomTag(key string, value interface{}) Option {
return func(cfg *config) {
if cfg.tags == nil {
cfg.tags = make(map[string]interface{})
}
cfg.tags[key] = value
}
}
12 changes: 4 additions & 8 deletions contrib/google.golang.org/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
ss.method,
"grpc.message",
ss.cfg.serverServiceName(),
tracer.AnalyticsRate(ss.cfg.analyticsRate),
tracer.Measured(),
ss.cfg.startSpanOptions(tracer.Measured())...,
)
defer func() { finishWithError(span, err, ss.cfg) }()
}
Expand All @@ -60,8 +59,7 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
ss.method,
"grpc.message",
ss.cfg.serverServiceName(),
tracer.AnalyticsRate(ss.cfg.analyticsRate),
tracer.Measured(),
ss.cfg.startSpanOptions(tracer.Measured())...,
)
defer func() { finishWithError(span, err, ss.cfg) }()
}
Expand All @@ -87,8 +85,7 @@ func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
info.FullMethod,
"grpc.server",
cfg.serverServiceName(),
tracer.AnalyticsRate(cfg.analyticsRate),
tracer.Measured(),
cfg.startSpanOptions(tracer.Measured())...,
)
switch {
case info.IsServerStream && info.IsClientStream:
Expand Down Expand Up @@ -132,8 +129,7 @@ func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
info.FullMethod,
"grpc.server",
cfg.serverServiceName(),
tracer.AnalyticsRate(cfg.analyticsRate),
tracer.Measured(),
cfg.startSpanOptions(tracer.Measured())...,
)
span.SetTag(tagMethodKind, methodKindUnary)

Expand Down
Loading

0 comments on commit 7ac0936

Please sign in to comment.