From f25d46b47317292cdbacd3ade0da1ece4e3ceeb2 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Wed, 1 Apr 2020 17:23:34 +0800 Subject: [PATCH] More central configuration enablement Enable central configuration for: - span_frames_min_duration - stack_trace_limit --- CHANGELOG.asciidoc | 1 + config.go | 22 +++++++ config_test.go | 159 +++++++++++++++++---------------------------- 3 files changed, 83 insertions(+), 99 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index ddad2652f..6b8b14664 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -24,6 +24,7 @@ endif::[] https://github.com/elastic/apm-agent-go/compare/v1.7.2...master[View commits] - Add "recording" config option, to dynamically disable event recording {pull}737[(#737)] +- Enable central configuration of "span_frames_min_duration" and "stack_trace_limit" {pull}742[(#742)] [[release-notes-1.x]] === Go Agent version 1.x diff --git a/config.go b/config.go index 9168312b1..5467b53e6 100644 --- a/config.go +++ b/config.go @@ -357,6 +357,28 @@ func (t *Tracer) updateRemoteConfig(logger WarningLogger, old, attrs map[string] cfg.recording = recording }) } + case envSpanFramesMinDuration: + duration, err := configutil.ParseDuration(v) + if err != nil { + errorf("central config failure: failed to parse %s: %s", k, err) + delete(attrs, k) + continue + } else { + updates = append(updates, func(cfg *instrumentationConfig) { + cfg.spanFramesMinDuration = duration + }) + } + case envStackTraceLimit: + limit, err := strconv.Atoi(v) + if err != nil { + errorf("central config failure: failed to parse %s: %s", k, err) + delete(attrs, k) + continue + } else { + updates = append(updates, func(cfg *instrumentationConfig) { + cfg.stackTraceLimit = limit + }) + } case envTransactionSampleRate: sampler, err := parseSampleRate(k, v) if err != nil { diff --git a/config_test.go b/config_test.go index 621fac09c..910a75671 100644 --- a/config_test.go +++ b/config_test.go @@ -24,10 +24,12 @@ import ( 
"net/http/httptest" "net/url" "os" + "strconv" "strings" "testing" "time" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -35,48 +37,67 @@ import ( "go.elastic.co/apm/apmconfig" "go.elastic.co/apm/apmtest" "go.elastic.co/apm/transport" + "go.elastic.co/apm/transport/transporttest" ) func TestTracerCentralConfigUpdate(t *testing.T) { - run := func(configKey, configValue string, isRemote func(*apm.Tracer) bool) { + run := func(configKey, configValue string, isRemote func(*apmtest.RecordingTracer) bool) { t.Run(configKey, func(t *testing.T) { response, _ := json.Marshal(map[string]string{configKey: configValue}) testTracerCentralConfigUpdate(t, string(response), isRemote) }) } - run("transaction_sample_rate", "0", func(tracer *apm.Tracer) bool { + run("transaction_sample_rate", "0", func(tracer *apmtest.RecordingTracer) bool { return !tracer.StartTransaction("name", "type").Sampled() }) - run("transaction_max_spans", "0", func(tracer *apm.Tracer) bool { + run("transaction_max_spans", "0", func(tracer *apmtest.RecordingTracer) bool { return tracer.StartTransaction("name", "type").StartSpan("name", "type", nil).Dropped() }) - run("capture_body", "all", func(tracer *apm.Tracer) bool { + run("capture_body", "all", func(tracer *apmtest.RecordingTracer) bool { req, _ := http.NewRequest("POST", "/", strings.NewReader("...")) capturer := tracer.CaptureHTTPRequestBody(req) return capturer != nil }) + run("recording", "false", func(tracer *apmtest.RecordingTracer) bool { + return !tracer.Recording() + }) + run("span_frames_min_duration", "1ms", func(tracer *apmtest.RecordingTracer) bool { + tracer.ResetPayloads() + + tx := tracer.StartTransaction("name", "type") + span := tx.StartSpan("name", "type", nil) + span.Duration = 1 * time.Millisecond + span.End() + tx.End() + + tracer.Flush(nil) + payloads := tracer.Payloads() + assert.Len(t, payloads.Spans, 1) + return len(payloads.Spans[0].Stacktrace) > 0 + }) + 
run("stack_trace_limit", "1", func(tracer *apmtest.RecordingTracer) bool { + tracer.ResetPayloads() + tracer.NewError(errors.New("boom")).Send() + tracer.Flush(nil) + payloads := tracer.Payloads() + assert.Len(t, payloads.Errors, 1) + return len(payloads.Errors[0].Exception.Stacktrace) == 1 + }) } -func testTracerCentralConfigUpdate(t *testing.T, serverResponse string, isRemote func(*apm.Tracer) bool) { - // This test server will respond initially with config that - // disables sampling, and subsequently responses will indicate - // lack of agent config, causing the agent to revert to local - // config. - responded := make(chan struct{}) - var responses int +func testTracerCentralConfigUpdate(t *testing.T, serverResponse string, isRemote func(*apmtest.RecordingTracer) bool) { + type response struct { + etag string + body string + } + responses := make(chan response) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { assert.Equal(t, "/config/v1/agents", req.URL.Path) w.Header().Set("Cache-Control", "max-age=1") - if responses == 0 { - w.Header().Set("Etag", `"foo"`) - w.Write([]byte(serverResponse)) - } else { - w.Header().Set("Etag", `"bar"`) - w.Write([]byte(`{}`)) - } - responses++ select { - case responded <- struct{}{}: + case response := <-responses: + w.Header().Set("Etag", strconv.Quote(response.etag)) // `"foo"`) + w.Write([]byte(response.body)) //[]byte(serverResponse)) case <-req.Context().Done(): } })) @@ -87,9 +108,17 @@ func testTracerCentralConfigUpdate(t *testing.T, serverResponse string, isRemote require.NoError(t, err) httpTransport.SetServerURL(serverURL) - tracer, err := apm.NewTracerOptions(apm.TracerOptions{Transport: httpTransport}) + tracer := &apmtest.RecordingTracer{} + var testTransport struct { + apmconfig.Watcher + *transporttest.RecorderTransport + } + testTransport.Watcher = httpTransport + testTransport.RecorderTransport = &tracer.RecorderTransport + + tracer.Tracer, err = 
apm.NewTracerOptions(apm.TracerOptions{Transport: &testTransport}) require.NoError(t, err) - defer tracer.Close() + defer tracer.Tracer.Close() // This test can be run in parallel with others after creating the tracer, // but not before, because we depend on NewTracerOptions picking up default @@ -100,40 +129,26 @@ func testTracerCentralConfigUpdate(t *testing.T, serverResponse string, isRemote assert.False(t, isRemote(tracer)) timeout := time.After(10 * time.Second) - for { - // There's a time window between the server responding - // and the agent updating the config, so we spin until - // it's updated. - remote := isRemote(tracer) - if !remote { - break - } + + // We send each response payload twice, which causes us to block until + // the first one is fully consumed. + for i := 0; i < 2; i++ { select { - case <-time.After(10 * time.Millisecond): + case responses <- response{etag: "foo", body: serverResponse}: case <-timeout: t.Fatal("timed out waiting for config update") } } - // We wait for 2 responses so that we know we've unblocked the - // 2nd response, and that the 2nd response has been fully consumed. 
+ assert.True(t, isRemote(tracer)) + for i := 0; i < 2; i++ { select { - case <-responded: + case responses <- response{etag: "bar", body: "{}"}: case <-timeout: t.Fatal("timed out waiting for config update") } } - for { - remote := isRemote(tracer) - if !remote { - break - } - select { - case <-time.After(10 * time.Millisecond): - case <-timeout: - t.Fatal("timed out waiting for config to revert") - } - } + assert.False(t, isRemote(tracer)) } func TestTracerCentralConfigUpdateDisabled(t *testing.T) { @@ -245,57 +260,3 @@ func TestTracerConfigWatcherPrecedence(t *testing.T) { } } } - -func TestTracerCentralConfigRecording(t *testing.T) { - tracer := apmtest.NewRecordingTracer() - defer tracer.Close() - - changes := make(chan apmconfig.Change) - watcherFunc := apmtest.WatchConfigFunc(func(ctx context.Context, params apmconfig.WatchParams) <-chan apmconfig.Change { - return changes - }) - tracer.SetLogger(apmtest.NewTestLogger(t)) - tracer.SetConfigWatcher(watcherFunc) - tracer.SetMetricsInterval(0) // disable periodic gathering - - checkRecording := func(expected bool) { - defer tracer.ResetPayloads() - tracer.StartTransaction("name", "type").End() - tracer.Flush(nil) - if expected { - require.True(t, tracer.Recording()) - tracer.SendMetrics(nil) - payloads := tracer.Payloads() - require.NotEmpty(t, payloads.Metrics) - require.NotEmpty(t, payloads.Transactions) - } else { - require.False(t, tracer.Recording()) - // testTracerMetricsNotRecording enables periodic - // gathering, checks that no gathering takes place - // (because we're expected not to be recording), - // and then disable periodic gathering again. - testTracerMetricsNotRecording(t, tracer) - payloads := tracer.Payloads() - require.Empty(t, payloads.Transactions) - } - } - updateRemoteConfig := func(attrs map[string]string) { - // We send twice as a means of waiting for the first change to be applied. 
- for i := 0; i < 2; i++ { - changes <- apmconfig.Change{Attrs: attrs} - } - } - - // Initially local config is in effect. - checkRecording(true) - - updateRemoteConfig(map[string]string{"recording": "false"}) - checkRecording(false) - - updateRemoteConfig(map[string]string{"recording": "true"}) - tracer.SetRecording(false) // not effective, remote config in effect - checkRecording(true) - - updateRemoteConfig(map[string]string{}) - checkRecording(false) // local config in effect now -}