Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added span links in ddtracer/tracer #2396

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
251 changes: 251 additions & 0 deletions ddtrace/ddtrace.go
Expand Up @@ -15,6 +15,7 @@ package ddtrace // import "gopkg.in/DataDog/dd-trace-go.v1/ddtrace"

import (
"context"
"github.com/tinylib/msgp/msgp"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
Expand Down Expand Up @@ -70,6 +71,9 @@ type Span interface {
// item should propagate to all descendant spans, both in- and cross-process.
SetBaggageItem(key, val string)

// AddLinks sets the given set of links on the span.
AddLinks(links ...SpanLink)

// Finish finishes the current span with the given options. Finish calls should be idempotent.
Finish(opts ...FinishOption)

Expand Down Expand Up @@ -137,6 +141,10 @@ type StartSpanConfig struct {
// new span.
Tags map[string]interface{}

// Tags holds a set of key/value pairs that should be set as metadata on the
// new span.
Links []SpanLink

// SpanID will be the SpanID of the Span, overriding the random number that would
// be generated. If no Parent SpanContext is present, then this will also set the
// TraceID to the same value.
Expand All @@ -156,3 +164,246 @@ type Logger interface {
func UseLogger(l Logger) {
log.UseLogger(l)
}

type SpanLink struct {
TraceID uint64 `msg:"trace_id"`
TraceIDHigh uint64 `msg:"trace_id_high,omitempty"`
SpanID uint64 `msg:"span_id"`

Attributes map[string]interface{} `msg:"attributes,omitempty"`
Tracestate string `msg:"tracestate,omitempty"`
Flags uint32 `msg:"flags,omitempty"`

//TODO (dianashevchenko): do we need to expose dropped attributes count
DroppedAttributes int32 `msg:"dropped_attributes_count,omitempty"`
}

// DecodeMsg implements msgp.Decodable
func (z *SpanLink) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "trace_id":
z.TraceID, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "TraceID")
return
}
case "trace_id_high":
z.TraceIDHigh, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "TraceIDHigh")
return
}
case "span_id":
z.SpanID, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "SpanID")
return
}
case "attributes":
var zb0002 uint32
zb0002, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err, "Attributes")
return
}
if z.Attributes == nil {
z.Attributes = make(map[string]interface{}, zb0002)
} else if len(z.Attributes) > 0 {
for key := range z.Attributes {
delete(z.Attributes, key)
}
}
for zb0002 > 0 {
zb0002--
var za0001 string
var za0002 interface{}
za0001, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Attributes")
return
}
za0002, err = dc.ReadIntf()
if err != nil {
err = msgp.WrapError(err, "Attributes", za0001)
return
}
z.Attributes[za0001] = za0002
}
case "tracestate":
z.Tracestate, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Tracestate")
return
}
case "flags":
z.Flags, err = dc.ReadUint32()
if err != nil {
err = msgp.WrapError(err, "Flags")
return
}
case "dropped_attributes_count":
z.DroppedAttributes, err = dc.ReadInt32()
if err != nil {
err = msgp.WrapError(err, "droppedAttributes")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}

// EncodeMsg implements msgp.Encodable
func (z *SpanLink) EncodeMsg(en *msgp.Writer) (err error) {
// omitempty: check for empty values
zb0001Len := uint32(7)
var zb0001Mask uint8 /* 7 bits */
if z.Attributes == nil {
zb0001Len--
zb0001Mask |= 0x8
}
if z.Tracestate == "" {
zb0001Len--
zb0001Mask |= 0x10
}
if z.Flags == 0 {
zb0001Len--
zb0001Mask |= 0x20
}
if z.DroppedAttributes == 0 {
zb0001Len--
zb0001Mask |= 0x40
}
// variable map header, size zb0001Len
err = en.Append(0x80 | uint8(zb0001Len))
if err != nil {
return
}
if zb0001Len == 0 {
return
}
// write "trace_id"
err = en.Append(0xa8, 0x74, 0x72, 0x61, 0x63, 0x65, 0x5f, 0x69, 0x64)
if err != nil {
return
}
err = en.WriteUint64(z.TraceID)
if err != nil {
err = msgp.WrapError(err, "TraceID")
return
}
// write "trace_id_high"
err = en.Append(0xad, 0x74, 0x72, 0x61, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x5f, 0x68, 0x69, 0x67, 0x68)
if err != nil {
return
}
err = en.WriteUint64(z.TraceIDHigh)
if err != nil {
err = msgp.WrapError(err, "TraceIDHigh")
return
}
// write "span_id"
err = en.Append(0xa7, 0x73, 0x70, 0x61, 0x6e, 0x5f, 0x69, 0x64)
if err != nil {
return
}
err = en.WriteUint64(z.SpanID)
if err != nil {
err = msgp.WrapError(err, "SpanID")
return
}
if (zb0001Mask & 0x8) == 0 { // if not empty
// write "attributes"
err = en.Append(0xaa, 0x61, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73)
if err != nil {
return
}
err = en.WriteMapHeader(uint32(len(z.Attributes)))
if err != nil {
err = msgp.WrapError(err, "Attributes")
return
}
for za0001, za0002 := range z.Attributes {
err = en.WriteString(za0001)
if err != nil {
err = msgp.WrapError(err, "Attributes")
return
}
err = en.WriteIntf(za0002)
if err != nil {
err = msgp.WrapError(err, "Attributes", za0001)
return
}
}
}
if (zb0001Mask & 0x10) == 0 { // if not empty
// write "tracestate"
err = en.Append(0xaa, 0x74, 0x72, 0x61, 0x63, 0x65, 0x73, 0x74, 0x61, 0x74, 0x65)
if err != nil {
return
}
err = en.WriteString(z.Tracestate)
if err != nil {
err = msgp.WrapError(err, "Tracestate")
return
}
}
if (zb0001Mask & 0x20) == 0 { // if not empty
// write "flags"
err = en.Append(0xa5, 0x66, 0x6c, 0x61, 0x67, 0x73)
if err != nil {
return
}
err = en.WriteUint32(z.Flags)
if err != nil {
err = msgp.WrapError(err, "Flags")
return
}
}
if (zb0001Mask & 0x40) == 0 { // if not empty
// write "dropped_attributes_count"
err = en.Append(0xb8, 0x64, 0x72, 0x6f, 0x70, 0x70, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74)
if err != nil {
return
}
err = en.WriteInt32(z.DroppedAttributes)
if err != nil {
err = msgp.WrapError(err, "droppedAttributes")
return
}
}
return
}

// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *SpanLink) Msgsize() (s int) {
s = 1 + 9 + msgp.Uint64Size + 14 + msgp.Uint64Size + 8 + msgp.Uint64Size + 11 + msgp.MapHeaderSize
if z.Attributes != nil {
for za0001, za0002 := range z.Attributes {
_ = za0002
s += msgp.StringPrefixSize + len(za0001) + msgp.GuessSize(za0002)
}
}
s += 11 + msgp.StringPrefixSize + len(z.Tracestate) + 6 + msgp.Uint32Size + 25 + msgp.Int32Size
return
}
2 changes: 2 additions & 0 deletions ddtrace/internal/globaltracer.go
Expand Up @@ -68,6 +68,8 @@ var _ ddtrace.Span = (*NoopSpan)(nil)
// NoopSpan is an implementation of ddtrace.Span that is a no-op.
type NoopSpan struct{}

func (s NoopSpan) AddLinks(_ ...ddtrace.SpanLink) {}

// SetTag implements ddtrace.Span.
func (NoopSpan) SetTag(_ string, _ interface{}) {}

Expand Down
8 changes: 8 additions & 0 deletions ddtrace/mocktracer/mockspan.go
Expand Up @@ -100,6 +100,7 @@ type mockspan struct {
sync.RWMutex // guards below fields
name string
tags map[string]interface{}
links []ddtrace.SpanLink
finishTime time.Time
finished bool

Expand Down Expand Up @@ -175,6 +176,13 @@ func (s *mockspan) SetOperationName(operationName string) {
return
}

func (s *mockspan) AddLinks(links ...ddtrace.SpanLink) {
for i := range links {
// todo (dianashevchenko): add link validation
s.links = append(s.links, links[i])
}
}

// BaggageItem returns the baggage item with the given key.
func (s *mockspan) BaggageItem(key string) string {
return s.context.baggageItem(key)
Expand Down
27 changes: 27 additions & 0 deletions ddtrace/opentelemetry/span_test.go
Expand Up @@ -651,3 +651,30 @@ func TestRemapWithMultipleSetAttributes(t *testing.T) {
assert.Contains(t, p, `"type":"new.span.type"`)
assert.Contains(t, p, `"_dd1.sr.eausr":1`)
}

func TestSpanLinks(t *testing.T) {
assert := assert.New(t)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

_, payloads, cleanup := mockTracerProvider(t)
tr := otel.Tracer("")
defer cleanup()

_, sp := tr.Start(context.Background(), "link")
sp.End()

_, decoratedSpan := tr.Start(context.Background(), "test",
oteltrace.WithLinks(oteltrace.Link{
SpanContext: sp.SpanContext(),
Attributes: []attribute.KeyValue{attribute.String("yes", "no")},
}))

decoratedSpan.End()
tracer.Flush()
payload, err := waitForPayload(ctx, payloads)
if err != nil {
t.Fatalf(err.Error())
}
assert.Contains(payload, "\"span_links\"")
}
27 changes: 26 additions & 1 deletion ddtrace/opentelemetry/tracer.go
Expand Up @@ -50,7 +50,6 @@ func (t *oteltracer) Start(ctx context.Context, spanName string, opts ...oteltra
if k := ssConfig.SpanKind(); k != 0 {
ddopts = append(ddopts, tracer.Tag(ext.SpanKind, k.String()))
}
telemetry.GlobalClient.Count(telemetry.NamespaceTracers, "spans_created", 1.0, telemetryTags, true)
var cfg ddtrace.StartSpanConfig
cfg.Tags = make(map[string]interface{})
for _, attr := range ssConfig.Attributes() {
Expand All @@ -62,6 +61,27 @@ func (t *oteltracer) Start(ctx context.Context, spanName string, opts ...oteltra
o(&cfg)
}
}
if links := ssConfig.Links(); links != nil {
ddLinks := []ddtrace.SpanLink{}
for _, link := range links {
ctx := otelCtxToDDCtx{link.SpanContext}
attrs := map[string]interface{}{}
for _, attribute := range link.Attributes {
attrs[string(attribute.Key)] = attribute.Value.AsInterface()
}
ddlink := ddtrace.SpanLink{
TraceID: ctx.TraceID(),
TraceIDHigh: ctx.TraceIDUpper(),
SpanID: ctx.SpanID(),
Attributes: attrs,
Tracestate: link.SpanContext.TraceState().String(),
Flags: uint32(link.SpanContext.TraceFlags()),
}
ddLinks = append(ddLinks, ddlink)
}
ddopts = append(ddopts, tracer.WithSpanLinks(ddLinks))
}
telemetry.GlobalClient.Count(telemetry.NamespaceTracers, "spans_created", 1.0, telemetryTags, true)
// Since there is no way to see if and how the span operation name was set,
// we have to record the attributes locally.
// The span operation name will be calculated when it's ended.
Expand All @@ -88,6 +108,11 @@ func (c *otelCtxToDDCtx) TraceID() uint64 {
return binary.BigEndian.Uint64(id[8:])
}

func (c *otelCtxToDDCtx) TraceIDUpper() uint64 {
id := c.oc.TraceID()
return binary.BigEndian.Uint64(id[:8])
}

func (c *otelCtxToDDCtx) SpanID() uint64 {
id := c.oc.SpanID()
return binary.BigEndian.Uint64(id[:])
Expand Down
6 changes: 6 additions & 0 deletions ddtrace/tracer/option.go
Expand Up @@ -1137,6 +1137,12 @@ func SpanType(name string) StartSpanOption {
return Tag(ext.SpanType, name)
}

func WithSpanLinks(links []ddtrace.SpanLink) StartSpanOption {
return func(cfg *ddtrace.StartSpanConfig) {
cfg.Links = append(cfg.Links, links...)
}
}

var measuredTag = Tag(keyMeasured, 1)

// Measured marks this span to be measured for metrics and stats calculations.
Expand Down