Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddtrace/tracer: create debug mode for old, open spans #2149

Merged
merged 35 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
65c8223
debug open spans without generics
hannahkm Jul 24, 2023
b150b1d
typo fix to reportOpenSpans
hannahkm Jul 25, 2023
341ee11
custom linked list and tests
hannahkm Jul 28, 2023
43af0c6
fix mistake in tests
hannahkm Jul 28, 2023
c9954f5
faster testing and fixed lint
hannahkm Jul 31, 2023
0990b67
change log level to warn
hannahkm Aug 1, 2023
7312db9
bucketed linked list implementation - WIP
hannahkm Aug 2, 2023
e029583
Merge branch 'main' into hannahkm/debug-open-spans
hannahkm Aug 2, 2023
a71e1b9
fix error while merging
hannahkm Aug 2, 2023
2db154f
improve bucketed linked list
hannahkm Aug 3, 2023
dd9a565
enable printing all open spans
hannahkm Aug 4, 2023
9143c09
Merge branch 'main' into hannahkm/debug-open-spans
hannahkm Aug 4, 2023
d4dc06a
remove extra comments
hannahkm Aug 4, 2023
da608da
fix cilint
hannahkm Aug 4, 2023
cebcaaf
resolve data race in testing
hannahkm Aug 4, 2023
80978da
remove locks from code
hannahkm Aug 4, 2023
870b76b
remove generics and improve readability
hannahkm Aug 7, 2023
296a77a
rename file to match contents
hannahkm Aug 7, 2023
9bea3b0
update docs to represent new functionality
hannahkm Aug 7, 2023
5d723db
remove public print
hannahkm Aug 7, 2023
b38206f
list unit tests and less now() calls
hannahkm Aug 8, 2023
388c9cf
Merge branch 'main' into hannahkm/debug-open-spans
hannahkm Aug 8, 2023
ea79dfb
switch to container/list
hannahkm Aug 11, 2023
46c0424
truncate abandoned span logs
hannahkm Aug 11, 2023
7e2017b
clean up typos
hannahkm Aug 11, 2023
a1eecee
resolve data race while printing spans
hannahkm Aug 11, 2023
5364a94
new string methods
hannahkm Aug 14, 2023
8329bf6
fixed missing duration check
hannahkm Aug 14, 2023
8d7a057
clarify abandoned span logs
hannahkm Aug 15, 2023
a6d10d9
improved readability and comments
hannahkm Aug 16, 2023
9b4c7d6
Merge branch 'main' into hannahkm/debug-open-spans
hannahkm Aug 17, 2023
9e1e417
add tests for completeness
hannahkm Aug 17, 2023
0eff316
add organized struct for abandoned spans
hannahkm Aug 18, 2023
7c0f935
remove commented test
hannahkm Aug 18, 2023
2d7bb8f
Merge branch 'main' into hannahkm/debug-open-spans
ajgajg1134 Aug 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 197 additions & 0 deletions ddtrace/tracer/abandoned_spans.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package tracer

import (
"container/list"
"fmt"
"strings"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
)

// Tunables for the abandoned-span debugger. They are package variables rather
// than constants so that they can be reassigned, as the tests in this package do.
var (
	// tickerInterval is the period of the ticker that drives the periodic
	// report in reportAbandonedSpans.
	tickerInterval = time.Minute

	// logSize is the maximum number of bytes of span descriptions that
	// logAbandonedSpans accumulates before it truncates the message.
	logSize = 9000
)

// isBucketNode reports whether e holds a non-empty bucket, i.e. a *list.List
// containing at least one element, and returns that bucket when it does.
// A nil element, a value of any other type, a nil list and an empty list all
// yield (nil, false).
func isBucketNode(e *list.Element) (*list.List, bool) {
	if e == nil {
		// Front, Back and Next return nil at the ends of a list; treat that
		// as "not a bucket" rather than dereferencing a nil element.
		return nil, false
	}
	ls, ok := e.Value.(*list.List)
	if !ok || ls == nil || ls.Front() == nil {
		return nil, false
	}
	return ls, true
}

// isSpanNode reports whether e holds a non-nil *span and returns that span
// when it does. A nil element, a value of any other type and a nil *span all
// yield (nil, false).
func isSpanNode(e *list.Element) (*span, bool) {
	if e == nil {
		// Front and Back return nil on an empty list; treat that as
		// "not a span" rather than dereferencing a nil element.
		return nil, false
	}
	s, ok := e.Value.(*span)
	if !ok || s == nil {
		return nil, false
	}
	return s, true
}

// reportAbandonedSpans periodically finds and reports potentially abandoned
// spans that are older than the given interval. These spans are stored in a
// bucketed linked list, sorted by their `Start` time, where the front of the
// list contains the oldest spans, and the end of the list contains the newest spans.
func (t *tracer) reportAbandonedSpans(interval time.Duration) {
tick := time.NewTicker(tickerInterval)
defer tick.Stop()
for {
select {
case <-tick.C:
logAbandonedSpans(t.abandonedSpans, &interval)
case s := <-t.cIn:
bNode := t.abandonedSpans.Front()
if bNode == nil {
b := list.New()
b.PushBack(s)
t.abandonedSpans.PushBack(b)
break
}
// All spans within the same bucket should have a start time that
// is within `interval` nanoseconds of each other.
// This loop should continue until the correct bucket is found. This
// includes empty or nil buckets (which have no spans in them) or
// an existing bucket with spans that have started within `interval`
// nanoseconds before the new span has started.
for bNode != nil {
bucket, bOk := isBucketNode(bNode)
if !bOk {
bNode = bNode.Next()
continue
}
sNode := bucket.Front()
if sNode == nil {
bNode = bNode.Next()
continue
}
sp, sOk := isSpanNode(sNode)
if sOk && s.Start-sp.Start <= interval.Nanoseconds() {
bucket.PushBack(s)
break
}
bNode = bNode.Next()
}
if bNode != nil {
break
}
// If no matching bucket exists, create a new one and append the new
// span to the top of the bucket.
b := list.New()
hannahkm marked this conversation as resolved.
Show resolved Hide resolved
b.PushBack(s)
t.abandonedSpans.PushBack(b)
case s := <-t.cOut:
// This loop should continue until it finds the bucket with spans
// starting within `interval` nanoseconds of the finished span,
// then remove that span from the bucket.
for node := t.abandonedSpans.Front(); node != nil; node = node.Next() {
hannahkm marked this conversation as resolved.
Show resolved Hide resolved
bucket, ok := isBucketNode(node)
if !ok {
continue
}
spNode := bucket.Front()
sp, ok := isSpanNode(spNode)
if !ok {
continue
}
if s.Start-sp.Start <= interval.Nanoseconds() {
bucket.Remove(spNode)
if bucket.Front() == nil {
t.abandonedSpans.Remove(node)
}
break
}
}
case <-t.stop:
logAbandonedSpans(t.abandonedSpans, nil)
return
}
}
}

func abandonedSpanString(s *span, interval *time.Duration) string {

Check failure on line 116 in ddtrace/tracer/abandoned_spans.go

View workflow job for this annotation

GitHub Actions / PR Unit and Integration Tests / lint

unused-parameter: parameter 'interval' seems to be unused, consider removing or renaming it as _ (revive)

Check failure on line 116 in ddtrace/tracer/abandoned_spans.go

View workflow job for this annotation

GitHub Actions / PR Unit and Integration Tests / lint

unused-parameter: parameter 'interval' seems to be unused, consider removing or renaming it as _ (revive)
hannahkm marked this conversation as resolved.
Show resolved Hide resolved
s.Lock()
defer s.Unlock()
return fmt.Sprintf("[name: %s, span_id: %d, trace_id: %d, age: %d],", s.Name, s.SpanID, s.TraceID, s.Duration)
}

// abandonedBucketString concatenates the descriptions of every span held in
// bucket, front to back, and returns the number of spans described together
// with the resulting string. Elements that do not hold a span are skipped and
// not counted. A nil or empty bucket yields (0, "").
//
// interval is forwarded unchanged to abandonedSpanString for each span.
func abandonedBucketString(bucket *list.List, interval *time.Duration) (int, string) {
	if bucket == nil {
		return 0, ""
	}
	var sb strings.Builder
	spanCount := 0
	for node := bucket.Front(); node != nil; node = node.Next() {
		sp, ok := isSpanNode(node)
		if !ok {
			continue
		}
		sb.WriteString(abandonedSpanString(sp, interval))
		spanCount++
	}
	return spanCount, sb.String()
}

// logAbandonedSpans returns a string containing potentially abandoned spans. If `filter` is true,
// it will only return spans that are older than the provided time `interval`. If false,
// it will return all unfinished spans.
func logAbandonedSpans(l *list.List, interval *time.Duration) {
darccio marked this conversation as resolved.
Show resolved Hide resolved
var sb strings.Builder
nowTime := now()
spanCount := 0
truncated := false

for bucketNode := l.Front(); bucketNode != nil; bucketNode = bucketNode.Next() {
bucket, ok := isBucketNode(bucketNode)
if !ok {
continue
}

// since spans are bucketed by time, finding a bucket that is newer
// than the allowed time interval means that all spans in this bucket
// and future buckets will be younger than `interval`, and thus aren't
// worth checking.
if interval != nil {
spanNode := bucket.Front()
sp, ok := isSpanNode(spanNode)
if !ok {
continue
}
if nowTime-sp.Start < interval.Nanoseconds() {
continue
}
hannahkm marked this conversation as resolved.
Show resolved Hide resolved
}
if truncated {
continue
}
nSpans, msg := abandonedBucketString(bucket, interval)
spanCount += nSpans
space := logSize - len(sb.String())
if len(msg) > space {
msg = msg[0:space]
truncated = true
}
sb.WriteString(msg)
}

if spanCount == 0 {
return
}

log.Warn("%d abandoned spans:", spanCount)
if truncated {
log.Warn("Too many abandoned spans. Truncating message.")
sb.WriteString("...")
}
log.Warn(sb.String())
}
173 changes: 173 additions & 0 deletions ddtrace/tracer/abandoned_spans_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package tracer

import (
"fmt"
"strings"
"testing"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry"
"gopkg.in/DataDog/dd-trace-go.v1/internal/version"

"github.com/stretchr/testify/assert"
)

// warnPrefix is the text the tests below expect in front of every
// warning-level message recorded by the logger; it embeds the tracer version.
var warnPrefix = "Datadog Tracer " + fmt.Sprint(version.Tag) + " WARN: "

// TestReportAbandonedSpans exercises the abandoned-span debug mode end to end:
// it starts a tracer with WithDebugSpansMode, opens and finishes spans, and
// inspects the warnings captured by a RecordLogger. The subtests share one
// logger and run sequentially; they rely on real time passing, so the report
// ticker is shortened for the duration of the test.
func TestReportAbandonedSpans(t *testing.T) {
	assert := assert.New(t)
	tp := new(log.RecordLogger)

	// Shorten the reporting period, and put the package-level value back when
	// the test ends so that other tests are not affected.
	defer func(old time.Duration) { tickerInterval = old }(tickerInterval)
	tickerInterval = 100 * time.Millisecond

	tp.Ignore("appsec: ", telemetry.LogPrefix)

	// spanEntry renders a span the way the abandoned-span log does. Duration
	// is read at call time, exactly where each subtest builds its expectation.
	spanEntry := func(s *span) string {
		return fmt.Sprintf("[name: %s, span_id: %d, trace_id: %d, age: %d],", s.Name, s.SpanID, s.TraceID, s.Duration)
	}

	t.Run("on", func(t *testing.T) {
		tracer, _, _, stop := startTestTracer(t, WithLogger(tp), WithDebugSpansMode(500*time.Millisecond))
		defer stop()
		assert.True(tracer.config.debugAbandonedSpans)
		assert.Equal(500*time.Millisecond, tracer.config.spanTimeout)
	})

	t.Run("finished", func(t *testing.T) {
		tracer, _, _, stop := startTestTracer(t, WithLogger(tp), WithDebugSpansMode(500*time.Millisecond))
		defer stop()
		s := tracer.StartSpan("operation").(*span)
		s.Finish()
		expected := warnPrefix + spanEntry(s)
		assert.NotContains(tp.Logs(), expected)
	})

	t.Run("open", func(t *testing.T) {
		tracer, _, _, stop := startTestTracer(t, WithLogger(tp), WithDebugSpansMode(500*time.Millisecond))
		defer stop()
		s := tracer.StartSpan("operation").(*span)
		time.Sleep(time.Second)
		expected := warnPrefix + spanEntry(s)
		assert.Contains(tp.Logs(), fmt.Sprintf("%s%d abandoned spans:", warnPrefix, 1))
		assert.Contains(tp.Logs(), expected)
		s.Finish()
	})

	t.Run("both", func(t *testing.T) {
		tracer, _, _, stop := startTestTracer(t, WithLogger(tp), WithDebugSpansMode(500*time.Millisecond))
		defer stop()
		sf := tracer.StartSpan("op").(*span)
		sf.Finish()
		s := tracer.StartSpan("op2").(*span)
		time.Sleep(time.Second)
		// The finished span must not be reported next to the open one.
		notExpected := warnPrefix + spanEntry(sf) + spanEntry(s)
		expected := warnPrefix + spanEntry(s)
		assert.Contains(tp.Logs(), fmt.Sprintf("%s%d abandoned spans:", warnPrefix, 1))
		assert.NotContains(tp.Logs(), notExpected)
		assert.Contains(tp.Logs(), expected)
		s.Finish()
	})

	t.Run("many", func(t *testing.T) {
		tracer, _, _, stop := startTestTracer(t, WithLogger(tp), WithDebugSpansMode(500*time.Millisecond))
		defer stop()
		var sb strings.Builder
		sb.WriteString(warnPrefix)
		for i := 0; i < 10; i++ {
			s := tracer.StartSpan(fmt.Sprintf("operation%d", i)).(*span)
			if i%2 == 0 {
				s.Finish()
			} else {
				sb.WriteString(spanEntry(s))
				time.Sleep(500 * time.Millisecond)
			}
		}
		time.Sleep(time.Second)
		assert.Contains(tp.Logs(), sb.String())
	})

	t.Run("many buckets", func(t *testing.T) {
		tracer, _, _, stop := startTestTracer(t, WithLogger(tp), WithDebugSpansMode(500*time.Millisecond))
		defer stop()
		var sb strings.Builder
		sb.WriteString(warnPrefix)

		for i := 0; i < 5; i++ {
			s := tracer.StartSpan(fmt.Sprintf("operation%d", i))
			s.Finish()
			time.Sleep(150 * time.Millisecond)
		}
		for i := 0; i < 5; i++ {
			s := tracer.StartSpan(fmt.Sprintf("operation2-%d", i)).(*span)
			sb.WriteString(spanEntry(s))
			time.Sleep(150 * time.Millisecond)
		}
		time.Sleep(time.Second)

		assert.Contains(tp.Logs(), fmt.Sprintf("%s%d abandoned spans:", warnPrefix, 5))
		assert.Contains(tp.Logs(), sb.String())
	})

	t.Run("stop", func(t *testing.T) {
		tracer, _, _, stop := startTestTracer(t, WithLogger(tp), WithDebugSpansMode(100*time.Millisecond))
		var sb strings.Builder
		sb.WriteString(warnPrefix)

		for i := 0; i < 5; i++ {
			s := tracer.StartSpan(fmt.Sprintf("operation%d", i)).(*span)
			sb.WriteString(spanEntry(s))
		}
		// Stopping the tracer must flush every span that is still open.
		stop()
		time.Sleep(time.Second)
		assert.Contains(tp.Logs(), fmt.Sprintf("%s%d abandoned spans:", warnPrefix, 5))
		assert.Contains(tp.Logs(), sb.String())
	})

	t.Run("wait", func(t *testing.T) {
		tracer, _, _, stop := startTestTracer(t, WithLogger(tp), WithDebugSpansMode(500*time.Millisecond))
		defer stop()

		s := tracer.StartSpan("operation").(*span)
		expected := warnPrefix + spanEntry(s)

		// Not reported before the timeout has elapsed, reported afterwards.
		assert.NotContains(tp.Logs(), expected)
		time.Sleep(time.Second)
		assert.Contains(tp.Logs(), expected)
		s.Finish()
	})

	t.Run("truncate", func(t *testing.T) {
		// Shrink the log budget before the tracer starts, and restore it when
		// the subtest ends so that the package-level value does not leak.
		defer func(old int) { logSize = old }(logSize)
		logSize = 10

		tracer, _, _, stop := startTestTracer(t, WithLogger(tp), WithDebugSpansMode(500*time.Millisecond))

		s := tracer.StartSpan("operation").(*span)
		msg := warnPrefix + spanEntry(s)
		stop()
		time.Sleep(500 * time.Millisecond)
		assert.NotContains(tp.Logs(), msg)
		assert.Contains(tp.Logs(), fmt.Sprintf("%sToo many abandoned spans. Truncating message.", warnPrefix))
	})
}

// TestDebugAbandonedSpansOff checks the default configuration: without
// WithDebugSpansMode the debug flag is off, the span timeout is zero, and a
// span left open for a second does not produce the "waiting on span" warning.
func TestDebugAbandonedSpansOff(t *testing.T) {
	check := assert.New(t)
	rec := new(log.RecordLogger)
	tr, _, _, stop := startTestTracer(t, WithLogger(rec))
	defer stop()

	// Discard anything logged during start-up, then filter unrelated output.
	rec.Reset()
	rec.Ignore("appsec: ", telemetry.LogPrefix)

	t.Run("default", func(t *testing.T) {
		check.False(tr.config.debugAbandonedSpans)
		check.Equal(time.Duration(0), tr.config.spanTimeout)

		open := tr.StartSpan("operation")
		time.Sleep(time.Second)

		traceID := open.Context().TraceID()
		spanID := open.Context().SpanID()
		unwanted := fmt.Sprintf("%s Trace %v waiting on span %v", warnPrefix, traceID, spanID)
		check.NotContains(rec.Logs(), unwanted)

		open.Finish()
	})
}
Loading
Loading