Skip to content

Commit

Permalink
Introduce "recording" config
Browse files Browse the repository at this point in the history
We introduce the ELASTIC_APM_RECORDING configuration.
This is a boolean configuration that defaults to true,
controlling whether events are recorded and sent. When
recording is true there should be no change; when false:

 - Transactions will always be recorded as "unsampled",
   and will be silently discarded when ended, without
   affecting tracer statistics
 - Spans will all be dropped by virtue of transactions
   all being unsampled
 - Captured errors will not have details filled in, and
   will be silently discarded when "sent" without affecting
   tracer statistics
 - Breakdown metrics will not be updated
 - Metrics gathering will be disabled

Recording can be updated via central config and by using
the new Tracer.SetRecording method.

We also introduce a new Tracer.Recording method which
reports whether events are being recorded. If the tracer
is inactive (Tracer.Active returns false), then
Tracer.Recording will also return false. This new method
can be used by instrumentation to avoid expensive
instrumentation paths when recording is disabled. We have
updated all provided instrumentation modules to use the
new Tracer.Recording method instead of Tracer.Active.
  • Loading branch information
axw committed Apr 1, 2020
1 parent 30724e0 commit 6e8ca3f
Show file tree
Hide file tree
Showing 22 changed files with 333 additions and 79 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ endif::[]
https://github.com/elastic/apm-agent-go/compare/v1.7.2...master[View commits]
- Add "recording" config option, to dynamically disable event recording {pull}737[(#737)]
[[release-notes-1.x]]
=== Go Agent version 1.x
Expand Down
17 changes: 17 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
envEnvironment = "ELASTIC_APM_ENVIRONMENT"
envSpanFramesMinDuration = "ELASTIC_APM_SPAN_FRAMES_MIN_DURATION"
envActive = "ELASTIC_APM_ACTIVE"
envRecording = "ELASTIC_APM_RECORDING"
envAPIRequestSize = "ELASTIC_APM_API_REQUEST_SIZE"
envAPIRequestTime = "ELASTIC_APM_API_REQUEST_TIME"
envAPIBufferSize = "ELASTIC_APM_API_BUFFER_SIZE"
Expand Down Expand Up @@ -252,6 +253,10 @@ func initialActive() (bool, error) {
return configutil.ParseBoolEnv(envActive, true)
}

func initialRecording() (bool, error) {
return configutil.ParseBoolEnv(envRecording, true)
}

func initialDisabledMetrics() wildcard.Matchers {
return configutil.ParseWildcardPatternsEnv(envDisableMetrics, nil)
}
Expand Down Expand Up @@ -341,6 +346,17 @@ func (t *Tracer) updateRemoteConfig(logger WarningLogger, old, attrs map[string]
cfg.maxSpans = value
})
}
case envRecording:
recording, err := strconv.ParseBool(v)
if err != nil {
errorf("central config failure: failed to parse %s: %s", k, err)
delete(attrs, k)
continue
} else {
updates = append(updates, func(cfg *instrumentationConfig) {
cfg.recording = recording
})
}
case envTransactionSampleRate:
sampler, err := parseSampleRate(k, v)
if err != nil {
Expand Down Expand Up @@ -438,6 +454,7 @@ type instrumentationConfig struct {
// set the initial entry in instrumentationConfig.local, in order to properly reset
// to the local value, even if the default is the zero value.
type instrumentationConfigValues struct {
recording bool
captureBody CaptureBodyMode
captureHeaders bool
maxSpans int
Expand Down
54 changes: 54 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,3 +245,57 @@ func TestTracerConfigWatcherPrecedence(t *testing.T) {
}
}
}

func TestTracerCentralConfigRecording(t *testing.T) {
tracer := apmtest.NewRecordingTracer()
defer tracer.Close()

changes := make(chan apmconfig.Change)
watcherFunc := apmtest.WatchConfigFunc(func(ctx context.Context, params apmconfig.WatchParams) <-chan apmconfig.Change {
return changes
})
tracer.SetLogger(apmtest.NewTestLogger(t))
tracer.SetConfigWatcher(watcherFunc)
tracer.SetMetricsInterval(0) // disable periodic gathering

checkRecording := func(expected bool) {
defer tracer.ResetPayloads()
tracer.StartTransaction("name", "type").End()
tracer.Flush(nil)
if expected {
require.True(t, tracer.Recording())
tracer.SendMetrics(nil)
payloads := tracer.Payloads()
require.NotEmpty(t, payloads.Metrics)
require.NotEmpty(t, payloads.Transactions)
} else {
require.False(t, tracer.Recording())
// testTracerMetricsNotRecording enables periodic
// gathering, checks that no gathering takes place
// (because we're expected not to be recording),
// and then disable periodic gathering again.
testTracerMetricsNotRecording(t, tracer)
payloads := tracer.Payloads()
require.Empty(t, payloads.Transactions)
}
}
updateRemoteConfig := func(attrs map[string]string) {
// We send twice as a means of waiting for the first change to be applied.
for i := 0; i < 2; i++ {
changes <- apmconfig.Change{Attrs: attrs}
}
}

// Initially local config is in effect.
checkRecording(true)

updateRemoteConfig(map[string]string{"recording": "false"})
checkRecording(false)

updateRemoteConfig(map[string]string{"recording": "true"})
tracer.SetRecording(false) // not effective, remote config in effect
checkRecording(true)

updateRemoteConfig(map[string]string{})
checkRecording(false) // local config in effect now
}
14 changes: 14 additions & 0 deletions docs/configuration.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,20 @@ You must use the query bar to filter for a specific environment in versions prio
Enable or disable the agent. If set to false, then the Go agent does not send
any data to the Elastic APM server, and instrumentation overhead is minimized.

[float]
[[config-recording]]
=== `ELASTIC_APM_RECORDING`

[options="header"]
|============
| Environment | Default | Example
| `ELASTIC_APM_RECORDING` | true | `false`
|============

Enable or disable recording of events. If set to false, then the Go agent does
send any events to the Elastic APM server, and instrumentation overhead is
minimized, but the agent will continue to poll the server for configuration changes.

[float]
[[config-global-labels]]
=== `ELASTIC_APM_GLOBAL_LABELS`
Expand Down
1 change: 1 addition & 0 deletions env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ func TestTracerActiveEnv(t *testing.T) {
tracer, transport := transporttest.NewRecorderTracer()
defer tracer.Close()
assert.False(t, tracer.Active())
assert.False(t, tracer.Recording()) // inactive => not recording

tx := tracer.StartTransaction("name", "type")
tx.End()
Expand Down
52 changes: 32 additions & 20 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,12 @@ func (t *Tracer) NewError(err error) *Error {
e := t.newError()
e.cause = err
e.err = err.Error()
rand.Read(e.ID[:]) // ignore error, can't do anything about it
initException(&e.exception, err, e.stackTraceLimit)
if len(e.exception.stacktrace) == 0 {
e.SetStacktrace(2)
if e.recording {
rand.Read(e.ID[:]) // ignore error, can't do anything about it
initException(&e.exception, err, e.stackTraceLimit)
if len(e.exception.stacktrace) == 0 {
e.SetStacktrace(2)
}
}
return e
}
Expand All @@ -108,20 +110,22 @@ func (t *Tracer) NewError(err error) *Error {
// If r.Message is empty, "[EMPTY]" will be used.
func (t *Tracer) NewErrorLog(r ErrorLogRecord) *Error {
e := t.newError()
e.log = ErrorLogRecord{
Message: truncateString(r.Message),
MessageFormat: truncateString(r.MessageFormat),
Level: truncateString(r.Level),
LoggerName: truncateString(r.LoggerName),
}
if e.log.Message == "" {
e.log.Message = "[EMPTY]"
}
e.cause = r.Error
e.err = e.log.Message
rand.Read(e.ID[:]) // ignore error, can't do anything about it
if r.Error != nil {
initException(&e.exception, r.Error, e.stackTraceLimit)
if e.recording {
e.log = ErrorLogRecord{
Message: truncateString(r.Message),
MessageFormat: truncateString(r.MessageFormat),
Level: truncateString(r.Level),
LoggerName: truncateString(r.LoggerName),
}
if e.log.Message == "" {
e.log.Message = "[EMPTY]"
}
rand.Read(e.ID[:]) // ignore error, can't do anything about it
if r.Error != nil {
initException(&e.exception, r.Error, e.stackTraceLimit)
}
}
return e
}
Expand All @@ -137,11 +141,14 @@ func (t *Tracer) newError() *Error {
},
}
}
e.Timestamp = time.Now()

instrumentationConfig := t.instrumentationConfig()
e.Context.captureHeaders = instrumentationConfig.captureHeaders
e.stackTraceLimit = instrumentationConfig.stackTraceLimit
e.recording = instrumentationConfig.recording
if e.recording {
e.Timestamp = time.Now()
e.Context.captureHeaders = instrumentationConfig.captureHeaders
e.stackTraceLimit = instrumentationConfig.stackTraceLimit
}

return &Error{ErrorData: e}
}
Expand All @@ -166,6 +173,7 @@ type Error struct {
// When the error is sent, its ErrorData field will be set to nil.
type ErrorData struct {
tracer *Tracer
recording bool
stackTraceLimit int
exception exceptionData
log ErrorLogRecord
Expand Down Expand Up @@ -306,7 +314,11 @@ func (e *Error) Send() {
if e == nil || e.sent() {
return
}
e.ErrorData.enqueue()
if e.recording {
e.ErrorData.enqueue()
} else {
e.reset()
}
e.ErrorData = nil
}

Expand Down
16 changes: 16 additions & 0 deletions error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,22 @@ func TestErrorNilError(t *testing.T) {
assert.EqualError(t, e, "")
}

func TestErrorNotRecording(t *testing.T) {
tracer := apmtest.NewRecordingTracer()
defer tracer.Close()
tracer.SetRecording(false)

e := tracer.NewError(errors.New("boom"))
require.NotNil(t, e)
require.NotNil(t, e.ErrorData)
e.Send()
require.Nil(t, e.ErrorData)
tracer.Flush(nil)

payloads := tracer.Payloads()
require.Empty(t, payloads.Errors)
}

func TestErrorTransactionSampled(t *testing.T) {
_, _, errors := apmtest.WithTransaction(func(ctx context.Context) {
apm.TransactionFromContext(ctx).Type = "foo"
Expand Down
51 changes: 50 additions & 1 deletion metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/stretchr/testify/require"

"go.elastic.co/apm"
"go.elastic.co/apm/apmtest"
"go.elastic.co/apm/model"
"go.elastic.co/apm/transport/transporttest"
)
Expand Down Expand Up @@ -281,7 +282,7 @@ func TestTracerMetricsBuffered(t *testing.T) {
}
}

func TestTracerMetricsDisable(t *testing.T) {
func TestTracerDisableMetrics(t *testing.T) {
os.Setenv("ELASTIC_APM_DISABLE_METRICS", "golang.heap.*, system.memory.*, system.process.*")
defer os.Unsetenv("ELASTIC_APM_DISABLE_METRICS")

Expand All @@ -302,6 +303,54 @@ func TestTracerMetricsDisable(t *testing.T) {
assert.EqualValues(t, expected, actual)
}

func TestTracerMetricsNotRecording(t *testing.T) {
tracer := apmtest.NewRecordingTracer()
defer tracer.Close()
tracer.SetRecording(false)
testTracerMetricsNotRecording(t, tracer)
}

func testTracerMetricsNotRecording(t *testing.T, tracer *apmtest.RecordingTracer) {
done := make(chan struct{})
defer close(done)

gathered := make(chan struct{})
tracer.RegisterMetricsGatherer(apm.GatherMetricsFunc(
func(ctx context.Context, m *apm.Metrics) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-done:
case gathered <- struct{}{}:
}
return nil
},
))

tracer.SetMetricsInterval(time.Millisecond)
defer tracer.SetMetricsInterval(0) // disable at end

sent := make(chan struct{})
go func() {
defer close(sent)
tracer.SendMetrics(nil) // unblocked by tracer.Close
}()

// Because the tracer is configured to not record,
// the metrics gatherer should never be called.
select {
case <-time.After(100 * time.Millisecond):
case <-sent:
t.Fatal("expected SendMetrics to block")
case <-gathered:
t.Fatal("unexpected metrics gatherer call")
}

tracer.Flush(nil) // empty queue, should not block
payloads := tracer.Payloads()
require.Empty(t, payloads.Metrics)
}

// busyWork does meaningless work for the specified duration,
// so we can observe CPU usage.
func busyWork(d time.Duration) int {
Expand Down
2 changes: 1 addition & 1 deletion module/apmecho/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type middleware struct {

func (m *middleware) handle(c echo.Context) error {
req := c.Request()
if !m.tracer.Active() || m.requestIgnorer(req) {
if !m.tracer.Recording() || m.requestIgnorer(req) {
return m.handler(c)
}
name := req.Method + " " + c.Path()
Expand Down
2 changes: 1 addition & 1 deletion module/apmechov4/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type middleware struct {

func (m *middleware) handle(c echo.Context) error {
req := c.Request()
if !m.tracer.Active() || m.requestIgnorer(req) {
if !m.tracer.Recording() || m.requestIgnorer(req) {
return m.handler(c)
}
name := req.Method + " " + c.Path()
Expand Down
2 changes: 1 addition & 1 deletion module/apmgin/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type routeInfo struct {
}

func (m *middleware) handle(c *gin.Context) {
if !m.tracer.Active() || m.requestIgnorer(c.Request) {
if !m.tracer.Recording() || m.requestIgnorer(c.Request) {
c.Next()
return
}
Expand Down
2 changes: 1 addition & 1 deletion module/apmgrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewUnaryServerInterceptor(o ...ServerOption) grpc.UnaryServerInterceptor {
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
if !opts.tracer.Active() || opts.requestIgnorer(info) {
if !opts.tracer.Recording() || opts.requestIgnorer(info) {
return handler(ctx, req)
}
tx, ctx := startTransaction(ctx, opts.tracer, info.FullMethod)
Expand Down
2 changes: 1 addition & 1 deletion module/apmhttp/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type handler struct {
// ServeHTTP delegates to h.Handler, tracing the transaction with
// h.Tracer, or apm.DefaultTracer if h.Tracer is nil.
func (h *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if !h.tracer.Active() || h.requestIgnorer(req) {
if !h.tracer.Recording() || h.requestIgnorer(req) {
h.handler.ServeHTTP(w, req)
return
}
Expand Down
2 changes: 1 addition & 1 deletion module/apmhttprouter/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
func Wrap(h httprouter.Handle, route string, o ...Option) httprouter.Handle {
opts := gatherOptions(o...)
return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
if !opts.tracer.Active() || opts.requestIgnorer(req) {
if !opts.tracer.Recording() || opts.requestIgnorer(req) {
h(w, req, p)
return
}
Expand Down
2 changes: 1 addition & 1 deletion module/apmlogrus/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (h *Hook) Levels() []logrus.Level {
// Fire reports the log entry as an error to the APM Server.
func (h *Hook) Fire(entry *logrus.Entry) error {
tracer := h.tracer()
if !tracer.Active() {
if !tracer.Recording() {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion module/apmrestful/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type filter struct {
}

func (f *filter) filter(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
if !f.tracer.Active() || f.requestIgnorer(req.Request) {
if !f.tracer.Recording() || f.requestIgnorer(req.Request) {
chain.ProcessFilter(req, resp)
return
}
Expand Down
Loading

0 comments on commit 6e8ca3f

Please sign in to comment.