Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

trace: add buffer limit #824

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions trace/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,11 @@ type SpanData struct {
Status
Links []Link
HasRemoteParent bool

// The count of all span.bufferLimit overflows.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

golint your code.

// DroppedAnnotations represents all the annotations dropped
// due to the annotations limit.
DroppedAnnotations int

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I'm a Xoogler that misses the Critique golint badges.

// See trace.WithBufferLimit for more info.
// TODO: This is not currently exported anywhere (e.g. tracez page).
DroppedAnnotations int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to record these using stats.Record instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added something to specs to this effect: https://github.com/census-instrumentation/opencensus-specs/blob/master/trace/TraceConfig.md

(Totally optional IMO, we can do it in a follow up if you like)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I'll do this in a follow-up right after this (updated the TODO).

DroppedMessageEvents int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add godoc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

DroppedLinks int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add godoc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
47 changes: 44 additions & 3 deletions trace/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Span struct {
endOnce sync.Once

executionTracerTaskEnd func() // ends the execution tracer span
bufferLimit int // limits all variable span data (slices)
}

// IsRecordingEvents returns true if events are being recorded for this span.
Expand Down Expand Up @@ -110,6 +111,9 @@ const (
SpanKindClient
)

// DefaultBufferLimit is the default value for trace.StartOptions.BufferLimit.
const DefaultBufferLimit = 1000
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if limits for annotations, message events and links should have the same upper limit. I expect links to contain much less items then for instance annotations so probably should have a lower one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I suppose I was just trying to put a cap on the current long-lived stream memory leak, but we should think it through a bit more.

Your suggestion is pretty easy to implement, but I'll just leave it as is until others chime in (or have a different suggestion altogether).

I think as long as we have sensible defaults for these 3 options (so the client doesn't need to always specify 3 options), then I wouldn't mind having all 3.

I'd also be in favor of whatever the other language implementations are doing (or the spec) to keep the behavior consistent, but I haven't looked that up yet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to be consistent and allow user to override this default via trace.ApplyConfig. It is not represented in the specs and I filed census-instrumentation/opencensus-specs#147 to discuss.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not sure if user should change the defaults right now. We shouldn't export this constant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we've decided to allow trace.ApplyConfig updates and per-span StartOption updates now.

To your other point: I usually export default consts so the client can use them as defaults in their runtime flags if they want (if you feel strongly about this, I could unexport them).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The spec have different limits for each entities. Attributes: 32, Message events: 128, ...

https://github.com/census-instrumentation/opencensus-specs/blob/master/trace/TraceConfig.md

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


// StartOptions contains options concerning how a span is started.
type StartOptions struct {
// Sampler to consult for this Span. If provided, it is always consulted.
Expand All @@ -125,6 +129,11 @@ type StartOptions struct {
// SpanKind represents the kind of a span. If none is set,
// SpanKindUnspecified is used.
SpanKind int

// BufferLimit makes new spans with a custom buffer limit.
// This limits variable span data: Message Events, Links, and Annotations.
// The default is 1000 (trace.DefaultBufferLimit) and the minimum is 1.
BufferLimit int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As with the above mentioned upper limits I also think we should be careful with setting the bottom limit. Where it could be sensible for links to have a lower limit of 1 (or even 0) it probably is a bad idea to have one as low as this for annotations or message events.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My expectation was: only the most recent message events would be recorded / exported, even if it was just 1. But it sounds like I might be mistaken here (I'm not very familiar with the internals).

If we end up doing different floors, then we may need to validate the options (instead of just silently fixing it within the option). I was debating that already with the shared buffer limit, but it's a breaking change and makes it less convenient to use. Something like this:

ocsh, err := ocgrpc.NewServerHandler(...trace.StartOptions)

(and a drawback: the ServerHandler.IsPublicEndPoint option isn't a trace.StartOption)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the spec we should have a different limit for each: https://github.com/census-instrumentation/opencensus-specs/blob/master/trace/TraceConfig.md

There is an issue to add to StartOptions #670

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think we should enforce a lower limit at all. Anything 0 or less should mean "drop immediately".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, I have an updated CL ready. I'd like to clarify one thing before sending it out.

The spec refers to a global trace config (allowing global overrides). It looks like this is what Java/C++ does today as well.

Are the per-span overrides in addition to the global default overrides or do they replace the global default overrides altogether?

I assumed it was in addition and ended up adding a GlobalOption variadic option type for trace.ApplyConfig. I don't mind removing this if you don't want global overrides. The signature looks like: type GlobalOption func(*Config).

Here's how it works:

  1. global var config is set with defaults per the spec
  2. trace.ApplyConfig rolls variadic GlobalOptions on top of that (if any)
  3. trace.StartSpan rolls variadic StartOptions on top of that (if any)
  4. unit test added to cover each scenario

This doesn't break trace.ApplyConfig's API, but I couldn't use the required config struct arg (since it's ambiguous if the integers were set or not (due to the zero value for that type). If we move forward with this change, we may want to deprecate the required config struct arg at a later time (I know it's used in a lot of places though).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need global overrides. Having three levels of overrides sounds like going too far. I think its enough to have global defaults that can be changed by trace.ApplyConfig and per-Span values.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can always add the "panic level" later if needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, I've been referring to a "global override" as something that overrides/changes global defaults with trace.ApplyConfig. Naming things is hard :)

So, we're in agreement then and I've posted my updated commit.

However, trace.ApplyConfig's existing implementation (coupled with limit values have meaning when they're <= 0) still makes this trickier than it seems due to Go's handling of zero values. I'll start a comment thread over there.

}

// StartOption apply changes to StartOptions.
Expand All @@ -145,13 +154,25 @@ func WithSampler(sampler Sampler) StartOption {
}
}

// WithBufferLimit makes new spans with a custom buffer limit.
// This limits variable trace span data: Message Events, Links, and Annotations.
// The default is 1000 (trace.DefaultBufferLimit) and the minimum is 1.
func WithBufferLimit(limit int) StartOption {
return func(o *StartOptions) {
o.BufferLimit = limit
if o.BufferLimit <= 0 {
o.BufferLimit = 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should be the default not 1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've decided on a different strategy (per @Ramonza). The client can pass anything in. If it's <= 0, then that trace type is disabled. I think it simplifies things.

}
}
}

// StartSpan starts a new child span of the current span in the context. If
// there is no span in the context, creates a new trace and span.
//
// Returned context contains the newly created span. You can use it to
// propagate the returned span in process.
func StartSpan(ctx context.Context, name string, o ...StartOption) (context.Context, *Span) {
var opts StartOptions
opts := StartOptions{BufferLimit: DefaultBufferLimit}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't set the default here.

In startSpanInternal, set the default if opts.BufferLimit is zero.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was re-worked a bit from the other changes.

Defaults can now be changed via trace.ApplyConfig and can also be <= 0. If it's <= 0, it means disabled, so we can't reset to the default if it's currently 0.

This involves layering / rolling the opts. I decided to to this all in startSpanInternal to simplify it -- the exported functions weren't actually doing anything with the opts other than building them and passing them to startSpanInternal.

var parent SpanContext
if p := FromContext(ctx); p != nil {
parent = p.spanContext
Expand All @@ -174,7 +195,7 @@ func StartSpan(ctx context.Context, name string, o ...StartOption) (context.Cont
// Returned context contains the newly created span. You can use it to
// propagate the returned span in process.
func StartSpanWithRemoteParent(ctx context.Context, name string, parent SpanContext, o ...StartOption) (context.Context, *Span) {
var opts StartOptions
opts := StartOptions{BufferLimit: DefaultBufferLimit}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See previous comment on line 175.

for _, op := range o {
op(&opts)
}
Expand All @@ -185,7 +206,7 @@ func StartSpanWithRemoteParent(ctx context.Context, name string, parent SpanCont
}

func startSpanInternal(name string, hasParent bool, parent SpanContext, remoteParent bool, o StartOptions) *Span {
span := &Span{}
span := &Span{bufferLimit: o.BufferLimit}
span.spanContext = parent

cfg := config.Load().(*Config)
Expand Down Expand Up @@ -340,6 +361,10 @@ func (s *Span) lazyPrintfInternal(attributes []Attribute, format string, a ...in
m = make(map[string]interface{})
copyAttributes(m, attributes)
}
if l := len(s.data.Annotations); l > 0 && l >= s.bufferLimit {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not all annotations are equal. especially once we start to have common annotations which have global meaning or even have annotations which have consumer platform meaning we need to be able to whitelist these so they don't get lost by tags of lesser importance.

I think we need some sort of "must keep" list that is configurable and probably have a list of system annotations that should never be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, I didn't know this was in the works.

Sounds like the "must keep" annotations should be stored in a separate slice, but that would require other changes. SpanData could have an Annotations() method that merges the separate annotation slices together (and maybe a Kind enum field -- system/user/bulk should be in each one). I suppose the individual slices could still be exported alongside the merge helper method, though.

Regardless, maybe we should consider doing two commits for this (global / simple limits first, classified ones later)? The message events leak in particular is fairly serious for anyone that has a long lived stream.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would leave this to a future PR once we have a concept of annotation priority.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Annotation limit removed for now (TODO put in place).

s.data.Annotations = s.data.Annotations[1:]
s.data.DroppedAnnotations++
}
s.data.Annotations = append(s.data.Annotations, Annotation{
Time: now,
Message: msg,
Expand All @@ -356,6 +381,10 @@ func (s *Span) printStringInternal(attributes []Attribute, str string) {
a = make(map[string]interface{})
copyAttributes(a, attributes)
}
if l := len(s.data.Annotations); l > 0 && l >= s.bufferLimit {
s.data.Annotations = s.data.Annotations[1:]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think doing it like this will cause extra allocation every time you exceed the limit because you are always adding to the end of the slice but removing from the beginning. While this is an improvement on the current situation (memory leak) and I am happy to have it like this for now, something to consider would be to replace this with a circular buffer of fixed size once you reach the maximum.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good catch. The underlying array in the slice is still subject to re-allocation with this. Ring buffer sounds like a good idea. I'm not sure if we should have an interface{} based ring or separate ones for each type. If Go had generics, this would be easier to do.

Another trick we could do: when we hit max capacity, we keep overwriting the last event (we'd also get the first and last events this way). I think this may confuse people though.

I'd say we leave it as is and open up a performance issue to implement a ring buffer.

s.data.DroppedAnnotations++
}
s.data.Annotations = append(s.data.Annotations, Annotation{
Time: now,
Message: str,
Expand Down Expand Up @@ -393,6 +422,10 @@ func (s *Span) AddMessageSendEvent(messageID, uncompressedByteSize, compressedBy
}
now := time.Now()
s.mu.Lock()
if l := len(s.data.MessageEvents); l > 0 && l >= s.bufferLimit {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as my comment on annotations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if you mean a separate limit for message events (now implemented) or separate limits within message events like annotation priorities (is that also happening?)

s.data.MessageEvents = s.data.MessageEvents[1:]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally once we hit the max number of attributes, we would not allocate any more when you add attributes. I believe as written, we will allocate even when we have reached the maximum.

How about copying the message events to the beginning of the slice:

copy(s.data.MessageEvents[0:l-1], s.data.MessageEvents[1:])
s.data.MessageEvents = s.data.MessageEvents[0:l-1]

Then the subsequent append should never allocate once we reach the max. Would be really awesome to run some benchmarks before and after this change. Not sure any of the existing ones would work since they don't add lots of attributes/events.

s.data.DroppedMessageEvents++
}
s.data.MessageEvents = append(s.data.MessageEvents, MessageEvent{
Time: now,
EventType: MessageEventTypeSent,
Expand All @@ -415,6 +448,10 @@ func (s *Span) AddMessageReceiveEvent(messageID, uncompressedByteSize, compresse
}
now := time.Now()
s.mu.Lock()
if l := len(s.data.MessageEvents); l > 0 && l >= s.bufferLimit {
s.data.MessageEvents = s.data.MessageEvents[1:]
s.data.DroppedMessageEvents++
}
s.data.MessageEvents = append(s.data.MessageEvents, MessageEvent{
Time: now,
EventType: MessageEventTypeRecv,
Expand All @@ -431,6 +468,10 @@ func (s *Span) AddLink(l Link) {
return
}
s.mu.Lock()
if l := len(s.data.Links); l > 0 && l >= s.bufferLimit {
s.data.Links = s.data.Links[1:]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

s.data.DroppedLinks++
}
s.data.Links = append(s.data.Links, l)
s.mu.Unlock()
}
Expand Down
61 changes: 50 additions & 11 deletions trace/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ func TestStartSpanWithRemoteParent(t *testing.T) {

// startSpan returns a context with a new Span that is recording events and will be exported.
func startSpan(o StartOptions) *Span {
bufferLimit := DefaultBufferLimit
if o.BufferLimit != 0 {
bufferLimit = o.BufferLimit
}
_, span := StartSpanWithRemoteParent(context.Background(), "span0",
SpanContext{
TraceID: tid,
Expand All @@ -232,6 +236,7 @@ func startSpan(o StartOptions) *Span {
},
WithSampler(o.Sampler),
WithSpanKind(o.SpanKind),
WithBufferLimit(bufferLimit),
)
return span
}
Expand Down Expand Up @@ -379,7 +384,9 @@ func TestSetSpanAttributes(t *testing.T) {
}

func TestAnnotations(t *testing.T) {
span := startSpan(StartOptions{})
span := startSpan(StartOptions{BufferLimit: 2})
span.Annotatef([]Attribute{StringAttribute("dropped1", "dropped1")}, "%f", 123.123)
span.Annotate([]Attribute{StringAttribute("dropped2", "dropped2")}, "dropped2")
span.Annotatef([]Attribute{StringAttribute("key1", "value1")}, "%f", 1.5)
span.Annotate([]Attribute{StringAttribute("key2", "value2")}, "Annotate")
got, err := endSpan(span)
Expand All @@ -405,15 +412,19 @@ func TestAnnotations(t *testing.T) {
{Message: "1.500000", Attributes: map[string]interface{}{"key1": "value1"}},
{Message: "Annotate", Attributes: map[string]interface{}{"key2": "value2"}},
},
HasRemoteParent: true,
HasRemoteParent: true,
DroppedAnnotations: 2,
}
if !reflect.DeepEqual(got, want) {
t.Errorf("exporting span: got %#v want %#v", got, want)
}
}

func TestMessageEvents(t *testing.T) {
span := startSpan(StartOptions{})
span := startSpan(StartOptions{BufferLimit: 2})
const dropped1, dropped2 = 123, 456
span.AddMessageReceiveEvent(dropped1, dropped1, dropped1)
span.AddMessageSendEvent(dropped2, dropped2, dropped2)
span.AddMessageReceiveEvent(3, 400, 300)
span.AddMessageSendEvent(1, 200, 100)
got, err := endSpan(span)
Expand All @@ -439,7 +450,8 @@ func TestMessageEvents(t *testing.T) {
{EventType: 2, MessageID: 0x3, UncompressedByteSize: 0x190, CompressedByteSize: 0x12c},
{EventType: 1, MessageID: 0x1, UncompressedByteSize: 0xc8, CompressedByteSize: 0x64},
},
HasRemoteParent: true,
HasRemoteParent: true,
DroppedMessageEvents: 2,
}
if !reflect.DeepEqual(got, want) {
t.Errorf("exporting span: got %#v want %#v", got, want)
Expand Down Expand Up @@ -522,13 +534,31 @@ func TestSetSpanStatus(t *testing.T) {
}

func TestAddLink(t *testing.T) {
span := startSpan(StartOptions{})
span := startSpan(StartOptions{BufferLimit: 2})
span.AddLink(Link{
TraceID: tid,
SpanID: sid,
Type: LinkTypeParent,
Attributes: map[string]interface{}{"dropped1": "dropped1"},
})
span.AddLink(Link{
TraceID: tid,
SpanID: sid,
Type: LinkTypeParent,
Attributes: map[string]interface{}{"dropped2": "dropped2"},
})
span.AddLink(Link{
TraceID: tid,
SpanID: sid,
Type: LinkTypeParent,
Attributes: map[string]interface{}{"key5": "value5"},
})
span.AddLink(Link{
TraceID: tid,
SpanID: sid,
Type: LinkTypeParent,
Attributes: map[string]interface{}{"key6": "value6"},
})
got, err := endSpan(span)
if err != nil {
t.Fatal(err)
Expand All @@ -542,13 +572,22 @@ func TestAddLink(t *testing.T) {
},
ParentSpanID: sid,
Name: "span0",
Links: []Link{{
TraceID: tid,
SpanID: sid,
Type: 2,
Attributes: map[string]interface{}{"key5": "value5"},
}},
Links: []Link{
{
TraceID: tid,
SpanID: sid,
Type: 2,
Attributes: map[string]interface{}{"key5": "value5"},
},
{
TraceID: tid,
SpanID: sid,
Type: 2,
Attributes: map[string]interface{}{"key6": "value6"},
},
},
HasRemoteParent: true,
DroppedLinks: 2,
}
if !reflect.DeepEqual(got, want) {
t.Errorf("exporting span: got %#v want %#v", got, want)
Expand Down