Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

contrib: Adding WithCustomTag to various integrations #1359

Merged
merged 23 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
f01e184
contrib/gorm.io/gorm.v1: adding WithCustomTag
xdu-opendoor May 25, 2022
f8c0559
contrib/confluentinc/confluent-kafka-go/kafka: adding WithCustomTag
xdu-opendoor Jun 16, 2022
97b98bd
contrib/database/sql: adding WithCustomTag
xdu-opendoor Jun 16, 2022
ce2377c
contrib/google.golang.org/grpc: adding WithCustomTag
xdu-opendoor Jun 16, 2022
44f1af4
Merge branch 'main' into v1
duxing Jun 16, 2022
b34c1ef
fixing lint
xdu-opendoor Jun 20, 2022
7791857
Merge branch 'main' into v1
duxing Jun 20, 2022
fa1a32d
adding unit tests
xdu-opendoor Jun 21, 2022
be8d52d
Merge branch 'v1' of github.com:duxing/dd-trace-go into v1
xdu-opendoor Jun 21, 2022
d3e6207
fixing tests for integration environment
xdu-opendoor Jun 21, 2022
52921ec
adding WithChildSpanOnly to gorm.io test
xdu-opendoor Jun 21, 2022
ba1c464
correcting typo
xdu-opendoor Jun 21, 2022
0cfc41f
change test condition
xdu-opendoor Jun 21, 2022
3d28837
Merge branch 'DataDog:main' into v1
duxing Jun 27, 2022
4f31706
Merge branch 'main' into duxing/WithCustomTag
ajgajg1134 Jun 28, 2022
4be53e6
address feedback
xdu-opendoor Jun 29, 2022
7830af8
Merge remote-tracking branch 'duxing/v1' into duxing/WithCustomTag
ajgajg1134 Jun 29, 2022
1e2d3f9
Merge branch 'main' into duxing/WithCustomTag
ajgajg1134 Jun 29, 2022
bbd4673
addressing feedback
xdu-opendoor Jul 5, 2022
a4a48ed
Merge branch 'DataDog:main' into v1
duxing Jul 5, 2022
37fef06
fixing lint
xdu-opendoor Jul 5, 2022
b4a432a
Merge remote-tracking branch 'duxing/v1' into duxing/WithCustomTag
ajgajg1134 Jul 14, 2022
2847535
Merge branch 'main' into duxing/WithCustomTag
ajgajg1134 Aug 12, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ func (c *Consumer) startSpan(msg *kafka.Message) ddtrace.Span {
tracer.Tag("offset", msg.TopicPartition.Offset),
tracer.Measured(),
}
if c.cfg.tagFns != nil {
for key, tagFn := range c.cfg.tagFns {
opts = append(opts, tracer.Tag(key, tagFn(msg)))
}
}
if !math.IsNaN(c.cfg.analyticsRate) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, c.cfg.analyticsRate))
}
Expand Down
46 changes: 46 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,3 +277,49 @@ func TestDeprecatedContext(t *testing.T) {
<-c.Events()

}

func TestCustomTags(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()

c, err := NewConsumer(&kafka.ConfigMap{
"go.events.channel.enable": true, // required for the events channel to be turned on
"group.id": testGroupID,
"socket.timeout.ms": 10,
"session.timeout.ms": 10,
"enable.auto.offset.store": false,
}, WithCustomTag("foo", func(msg *kafka.Message) interface{} {
return "bar"
}), WithCustomTag("key", func(msg *kafka.Message) interface{} {
return msg.Key
}))
assert.NoError(t, err)

err = c.Subscribe(testTopic, nil)
assert.NoError(t, err)

go func() {
c.Consumer.Events() <- &kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &testTopic,
Partition: 1,
Offset: 1,
},
Key: []byte("key1"),
Value: []byte("value1"),
}
}()

<-c.Events()

c.Close()
// wait for the events channel to be closed
<-c.Events()

spans := mt.FinishedSpans()
assert.Len(t, spans, 1)
s := spans[0]

assert.Equal(t, "bar", s.Tag("foo"))
assert.Equal(t, []byte("key1"), s.Tag("key"))
}
14 changes: 14 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ import (

"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"

"github.com/confluentinc/confluent-kafka-go/kafka"
)

type config struct {
ctx context.Context
consumerServiceName string
producerServiceName string
analyticsRate float64
tagFns map[string]func(msg *kafka.Message) interface{}
}

// An Option customizes the config.
Expand Down Expand Up @@ -82,3 +85,14 @@ func WithAnalyticsRate(rate float64) Option {
}
}
}

// WithCustomTag will cause the given tagFn to be evaluated after executing
// a query and attach the result to the span tagged by the key.
func WithCustomTag(tag string, tagFn func(msg *kafka.Message) interface{}) Option {
gbbr marked this conversation as resolved.
Show resolved Hide resolved
return func(cfg *config) {
if cfg.tagFns == nil {
cfg.tagFns = make(map[string]func(msg *kafka.Message) interface{})
}
cfg.tagFns[tag] = tagFn
}
}
5 changes: 5 additions & 0 deletions contrib/database/sql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,11 @@ func (tp *traceParams) tryTrace(ctx context.Context, qtype queryType, query stri
tracer.SpanType(ext.SpanTypeSQL),
tracer.StartTime(startTime),
)
if tp.cfg.tags != nil {
for key, tag := range tp.cfg.tags {
opts = append(opts, tracer.Tag(key, tag))
}
}
if !math.IsNaN(tp.cfg.analyticsRate) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, tp.cfg.analyticsRate))
}
Expand Down
91 changes: 91 additions & 0 deletions contrib/database/sql/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,94 @@ func TestWithChildSpansOnly(t *testing.T) {
})
}
}

func TestWithCustomTag(t *testing.T) {
type sqlRegister struct {
name string
dsn string
driver driver.Driver
}
type want struct {
opName string
customTags map[string]interface{}
}
testcases := []struct {
name string
sqlRegister sqlRegister
want want
options []Option
}{
{
name: "mysql",
sqlRegister: sqlRegister{
name: "mysql",
dsn: "test:test@tcp(127.0.0.1:3306)/test",
driver: &mysql.MySQLDriver{},
},
want: want{
opName: "mysql.query",
customTags: map[string]interface{}{
"foo": "bar",
"baz": 123,
},
},
options: []Option{
WithCustomTag("foo", "bar"),
WithCustomTag("baz", 123),
},
},
{
name: "postgres",
sqlRegister: sqlRegister{
name: "postgres",
dsn: "postgres://postgres:postgres@127.0.0.1:5432/postgres?sslmode=disable",
driver: &pq.Driver{},
},
want: want{
opName: "postgres.query",
customTags: map[string]interface{}{
"foo": "bar",
"baz": 123,
},
},
options: []Option{
WithCustomTag("foo", "bar"),
WithCustomTag("baz", 123),
},
},
}
mt := mocktracer.Start()
defer mt.Stop()
for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
Register(tt.sqlRegister.name, tt.sqlRegister.driver, tt.options...)
defer unregister(tt.sqlRegister.name)
db, err := Open(tt.sqlRegister.name, tt.sqlRegister.dsn, tt.options...)
if err != nil {
log.Fatal(err)
}
defer db.Close()
mt.Reset()

rows, err := db.QueryContext(context.Background(), "SELECT 1")
assert.NoError(t, err)
rows.Close()

spans := mt.FinishedSpans()
assert.Len(t, spans, 2)

connectSpan := spans[0]
assert.Equal(t, tt.want.opName, connectSpan.OperationName())
assert.Equal(t, "Connect", connectSpan.Tag("sql.query_type"))
for k, v := range tt.want.customTags {
assert.Equal(t, v, connectSpan.Tag(k), "Value mismatch on tag %s", k)
}

span := spans[1]
assert.Equal(t, tt.want.opName, span.OperationName())
for k, v := range tt.want.customTags {
assert.Equal(t, v, span.Tag(k), "Value mismatch on tag %s", k)
}
})
}
}
11 changes: 11 additions & 0 deletions contrib/database/sql/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type config struct {
analyticsRate float64
dsn string
childSpansOnly bool
tags map[string]interface{}
commentInjectionMode tracer.SQLCommentInjectionMode
}

Expand Down Expand Up @@ -88,6 +89,16 @@ func WithChildSpansOnly() Option {
}
}

// WithCustomTag will attach the value to the span tagged by the key
func WithCustomTag(key string, value interface{}) Option {
gbbr marked this conversation as resolved.
Show resolved Hide resolved
return func(cfg *config) {
if cfg.tags == nil {
cfg.tags = make(map[string]interface{})
}
cfg.tags[key] = value
}
}

// WithSQLCommentInjection enables injection of tags as sql comments on traced queries.
// This includes dynamic values like span id, trace id and sampling priority which can make queries
// unique for some cache implementations. Use WithStaticTagsCommentInjection if this is a concern.
Expand Down
6 changes: 3 additions & 3 deletions contrib/google.golang.org/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
cs.method,
"grpc.message",
cs.cfg.clientServiceName(),
tracer.AnalyticsRate(cs.cfg.analyticsRate),
cs.cfg.startSpanOptions()...,
)
if p, ok := peer.FromContext(cs.Context()); ok {
setSpanTargetFromPeer(span, *p)
Expand All @@ -57,7 +57,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
cs.method,
"grpc.message",
cs.cfg.clientServiceName(),
tracer.AnalyticsRate(cs.cfg.analyticsRate),
cs.cfg.startSpanOptions()...,
)
if p, ok := peer.FromContext(cs.Context()); ok {
setSpanTargetFromPeer(span, *p)
Expand Down Expand Up @@ -170,7 +170,7 @@ func doClientRequest(
method,
"grpc.client",
cfg.clientServiceName(),
tracer.AnalyticsRate(cfg.analyticsRate),
cfg.startSpanOptions()...,
)
if methodKind != "" {
span.SetTag(tagMethodKind, methodKind)
Expand Down
17 changes: 17 additions & 0 deletions contrib/google.golang.org/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package grpc // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/google.golang.or

import (
"io"
"math"

"gopkg.in/DataDog/dd-trace-go.v1/contrib/google.golang.org/internal/grpcutil"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
Expand All @@ -22,6 +23,22 @@ import (
"google.golang.org/grpc/status"
)

func (cfg *config) startSpanOptions(opts ...tracer.StartSpanOption) []tracer.StartSpanOption {
if len(cfg.tags) == 0 && math.IsNaN(cfg.analyticsRate) {
return opts
}

ret := make([]tracer.StartSpanOption, 0, 1+len(cfg.tags)+len(opts))
ret = append(ret, tracer.AnalyticsRate(cfg.analyticsRate))
for _, opt := range opts {
ret = append(ret, opt)
}
for key, tag := range cfg.tags {
ret = append(ret, tracer.Tag(key, tag))
}
return ret
}

func startSpanFromContext(
ctx context.Context, method, operation, service string, opts ...tracer.StartSpanOption,
) (ddtrace.Span, context.Context) {
Expand Down
36 changes: 36 additions & 0 deletions contrib/google.golang.org/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,42 @@ func TestIgnoredMetadata(t *testing.T) {
}
}

func TestCustomTag(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()
for _, c := range []struct {
key string
value interface{}
}{
{key: "foo", value: "bar"},
{key: "val", value: 123},
} {
rig, err := newRig(true, WithCustomTag(c.key, c.value))
if err != nil {
t.Fatalf("error setting up rig: %s", err)
}
ctx := context.Background()
span, ctx := tracer.StartSpanFromContext(ctx, "x", tracer.ServiceName("y"), tracer.ResourceName("z"))
rig.client.Ping(ctx, &FixtureRequest{Name: "pass"})
span.Finish()

spans := mt.FinishedSpans()

var serverSpan mocktracer.Span
for _, s := range spans {
switch s.OperationName() {
case "grpc.server":
serverSpan = s
}
}

assert.NotNil(t, serverSpan)
assert.Equal(t, c.value, serverSpan.Tag(c.key))
rig.Close()
mt.Reset()
}
}

func BenchmarkUnaryServerInterceptor(b *testing.B) {
// need to use the real tracer to get representative measurments
tracer.Start(tracer.WithLogger(log.DiscardLogger{}),
Expand Down
11 changes: 11 additions & 0 deletions contrib/google.golang.org/grpc/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type config struct {
withMetadataTags bool
ignoredMetadata map[string]struct{}
withRequestTags bool
tags map[string]interface{}
}

func (cfg *config) serverServiceName() string {
Expand Down Expand Up @@ -171,3 +172,13 @@ func WithRequestTags() Option {
cfg.withRequestTags = true
}
}

// WithCustomTag will attach the value to the span tagged by the key.
func WithCustomTag(key string, value interface{}) Option {
return func(cfg *config) {
if cfg.tags == nil {
cfg.tags = make(map[string]interface{})
}
cfg.tags[key] = value
}
}
12 changes: 4 additions & 8 deletions contrib/google.golang.org/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
ss.method,
"grpc.message",
ss.cfg.serverServiceName(),
tracer.AnalyticsRate(ss.cfg.analyticsRate),
tracer.Measured(),
ss.cfg.startSpanOptions(tracer.Measured())...,
)
defer func() { finishWithError(span, err, ss.cfg) }()
}
Expand All @@ -60,8 +59,7 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
ss.method,
"grpc.message",
ss.cfg.serverServiceName(),
tracer.AnalyticsRate(ss.cfg.analyticsRate),
tracer.Measured(),
ss.cfg.startSpanOptions(tracer.Measured())...,
)
defer func() { finishWithError(span, err, ss.cfg) }()
}
Expand All @@ -87,8 +85,7 @@ func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
info.FullMethod,
"grpc.server",
cfg.serverServiceName(),
tracer.AnalyticsRate(cfg.analyticsRate),
tracer.Measured(),
cfg.startSpanOptions(tracer.Measured())...,
)
switch {
case info.IsServerStream && info.IsClientStream:
Expand Down Expand Up @@ -132,8 +129,7 @@ func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
info.FullMethod,
"grpc.server",
cfg.serverServiceName(),
tracer.AnalyticsRate(cfg.analyticsRate),
tracer.Measured(),
cfg.startSpanOptions(tracer.Measured())...,
)
span.SetTag(tagMethodKind, methodKindUnary)

Expand Down
Loading