Skip to content

Commit

Permalink
all: propagate context through Transformables (#3515)
Browse files Browse the repository at this point in the history
* all: propagate context through Transformables

We add a context.Context parameter to the Transformable
interface, and update all implementations and callers.
This enables us to connect the sourcemap calls in error
and span events to be traced.

* Update changelog
  • Loading branch information
axw committed Mar 22, 2020
1 parent 3307560 commit ecab86b
Show file tree
Hide file tree
Showing 26 changed files with 83 additions and 62 deletions.
2 changes: 1 addition & 1 deletion beater/jaeger/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ type testcase struct {
func (tc *testcase) setup(t *testing.T) {
reporter := func(ctx context.Context, req publish.PendingReq) error {
for _, transformable := range req.Transformables {
tc.events = append(tc.events, transformable.Transform(req.Tcontext)...)
tc.events = append(tc.events, transformable.Transform(ctx, req.Tcontext)...)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion beater/onboarding.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type onboardingDoc struct {
listenAddr string
}

func (o onboardingDoc) Transform(tctx *transform.Context) []beat.Event {
func (o onboardingDoc) Transform(ctx context.Context, tctx *transform.Context) []beat.Event {
return []beat.Event{{
Timestamp: tctx.RequestTime,
Fields: common.MapStr{
Expand Down
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ https://github.com/elastic/apm-server/compare/7.6\...master[View commits]
* Add pipeline for removing metadata fields from spans {pull}3408[3408]
* Add gRPC sampling endpoint for Jaeger {pull}3490[3490]
* Change default value for apm-server.ssl.client_authentication from optional to none {pull}3500[3500]
* Enabled instrumentation of sourcemaps {pull}3515[3515]
19 changes: 10 additions & 9 deletions model/error/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package error

import (
"context"
"crypto/md5"
"encoding/hex"
"encoding/json"
Expand Down Expand Up @@ -187,7 +188,7 @@ func DecodeEvent(input interface{}, cfg m.Config, err error) (transform.Transfor
return &e, nil
}

func (e *Event) Transform(tctx *transform.Context) []beat.Event {
func (e *Event) Transform(ctx context.Context, tctx *transform.Context) []beat.Event {
transformations.Inc()

if e.Exception != nil {
Expand All @@ -198,7 +199,7 @@ func (e *Event) Transform(tctx *transform.Context) []beat.Event {
}

fields := common.MapStr{
"error": e.fields(tctx),
"error": e.fields(ctx, tctx),
"processor": processorEntry,
}

Expand Down Expand Up @@ -244,14 +245,14 @@ func (e *Event) Transform(tctx *transform.Context) []beat.Event {
}
}

func (e *Event) fields(tctx *transform.Context) common.MapStr {
func (e *Event) fields(ctx context.Context, tctx *transform.Context) common.MapStr {
e.data = common.MapStr{}
e.add("id", e.Id)
e.add("page", e.Page.Fields())

exceptionChain := flattenExceptionTree(e.Exception)
e.addException(tctx, exceptionChain)
e.addLog(tctx)
e.addException(ctx, tctx, exceptionChain)
e.addLog(ctx, tctx)

e.updateCulprit(tctx)
e.add("culprit", e.Culprit)
Expand Down Expand Up @@ -297,7 +298,7 @@ func findSmappedNonLibraryFrame(frames []*m.StacktraceFrame) *m.StacktraceFrame
return nil
}

func (e *Event) addException(tctx *transform.Context, chain []Exception) {
func (e *Event) addException(ctx context.Context, tctx *transform.Context, chain []Exception) {
var result []common.MapStr
for _, exception := range chain {
ex := common.MapStr{}
Expand All @@ -319,7 +320,7 @@ func (e *Event) addException(tctx *transform.Context, chain []Exception) {
utility.Set(ex, "code", code.String())
}

st := exception.Stacktrace.Transform(tctx)
st := exception.Stacktrace.Transform(ctx, tctx)
utility.Set(ex, "stacktrace", st)

result = append(result, ex)
Expand All @@ -328,7 +329,7 @@ func (e *Event) addException(tctx *transform.Context, chain []Exception) {
e.add("exception", result)
}

func (e *Event) addLog(tctx *transform.Context) {
func (e *Event) addLog(ctx context.Context, tctx *transform.Context) {
if e.Log == nil {
return
}
Expand All @@ -337,7 +338,7 @@ func (e *Event) addLog(tctx *transform.Context) {
utility.Set(log, "param_message", e.Log.ParamMessage)
utility.Set(log, "logger_name", e.Log.LoggerName)
utility.Set(log, "level", e.Log.Level)
st := e.Log.Stacktrace.Transform(tctx)
st := e.Log.Stacktrace.Transform(ctx, tctx)
utility.Set(log, "stacktrace", st)

e.add("log", log)
Expand Down
9 changes: 5 additions & 4 deletions model/error/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package error

import (
"context"
"crypto/md5"
"encoding/hex"
"encoding/json"
Expand Down Expand Up @@ -542,7 +543,7 @@ func TestEventFields(t *testing.T) {
},
}

output := tc.Event.Transform(tctx)
output := tc.Event.Transform(context.Background(), tctx)
require.Len(t, output, 1)
fields := output[0].Fields["error"]
assert.Equal(t, tc.Output, fields)
Expand Down Expand Up @@ -689,7 +690,7 @@ func TestEvents(t *testing.T) {
RequestTime: timestamp,
}

outputEvents := tc.Transformable.Transform(tctx)
outputEvents := tc.Transformable.Transform(context.Background(), tctx)
require.Len(t, outputEvents, 1)
outputEvent := outputEvents[0]
assert.Equal(t, tc.Output, outputEvent.Fields)
Expand Down Expand Up @@ -1037,13 +1038,13 @@ func TestSourcemapping(t *testing.T) {
Config: transform.Config{SourcemapStore: nil},
Metadata: metadata.Metadata{Service: &metadata.Service{Name: &str, Version: &str}},
}
transformedNoSourcemap := event1.fields(tctx)
transformedNoSourcemap := event1.fields(context.Background(), tctx)

// transform with sourcemap store
store, err := sourcemap.NewStore(test.ESClientWithValidSourcemap(t), "apm-*sourcemap*", time.Minute)
require.NoError(t, err)
tctx.Config = transform.Config{SourcemapStore: store}
transformedWithSourcemap := event2.fields(tctx)
transformedWithSourcemap := event2.fields(context.Background(), tctx)

// ensure events have different line number and grouping keys
assert.Equal(t, 1, *event1.Exception.Stacktrace[0].Lineno)
Expand Down
3 changes: 2 additions & 1 deletion model/metricset/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package metricset

import (
"context"
"errors"
"fmt"
"time"
Expand Down Expand Up @@ -202,7 +203,7 @@ func (t *Transaction) fields() common.MapStr {
return fields
}

func (me *Metricset) Transform(tctx *transform.Context) []beat.Event {
func (me *Metricset) Transform(ctx context.Context, tctx *transform.Context) []beat.Event {
transformations.Inc()
if me == nil {
return nil
Expand Down
5 changes: 3 additions & 2 deletions model/metricset/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package metricset

import (
"context"
"encoding/json"
"fmt"
"testing"
Expand Down Expand Up @@ -245,7 +246,7 @@ func TestTransform(t *testing.T) {

tctx := &transform.Context{Config: transform.Config{}, Metadata: *md}
for idx, test := range tests {
outputEvents := test.Metricset.Transform(tctx)
outputEvents := test.Metricset.Transform(context.Background(), tctx)

for j, outputEvent := range outputEvents {
assert.Equal(t, test.Output[j], outputEvent.Fields, fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg))
Expand All @@ -257,7 +258,7 @@ func TestTransform(t *testing.T) {
func TestEventTransformUseReqTime(t *testing.T) {
reqTimestampParsed := time.Date(2017, 5, 30, 18, 53, 27, 154*1e6, time.UTC)
e := Metricset{}
beatEvent := e.Transform(&transform.Context{RequestTime: reqTimestampParsed})
beatEvent := e.Transform(context.Background(), &transform.Context{RequestTime: reqTimestampParsed})
require.Len(t, beatEvent, 1)
assert.Equal(t, reqTimestampParsed, beatEvent[0].Timestamp)
}
3 changes: 2 additions & 1 deletion model/profile/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package profile

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -46,7 +47,7 @@ type PprofProfile struct {
}

// Transform transforms a Profile into a sequence of beat.Events: one per profile sample.
func (pp PprofProfile) Transform(tctx *transform.Context) []beat.Event {
func (pp PprofProfile) Transform(ctx context.Context, tctx *transform.Context) []beat.Event {
// Precompute value field names for use in each event.
// TODO(axw) limit to well-known value names?
profileTimestamp := time.Unix(0, pp.Profile.TimeNanos)
Expand Down
6 changes: 3 additions & 3 deletions model/profile/profile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package profile_test

import (
"context"
"testing"
"time"

Expand All @@ -27,7 +28,6 @@ import (

"github.com/elastic/apm-server/model/metadata"
"github.com/elastic/apm-server/model/profile"
"github.com/elastic/apm-server/sourcemap"
"github.com/elastic/apm-server/transform"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
Expand Down Expand Up @@ -87,11 +87,11 @@ func TestPprofProfileTransform(t *testing.T) {
metadata := metadata.Metadata{Service: &service}

tctx := &transform.Context{
Config: transform.Config{SourcemapStore: &sourcemap.Store{}},
Config: transform.Config{}, // not used
Metadata: metadata,
RequestTime: time.Time{}, // not used
}
output := pp.Transform(tctx)
output := pp.Transform(context.Background(), tctx)
require.Len(t, output, 2)
assert.Equal(t, output[0], output[1])

Expand Down
5 changes: 3 additions & 2 deletions model/sourcemap/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package sourcemap

import (
"context"
"time"

"github.com/santhosh-tekuri/jsonschema"
Expand Down Expand Up @@ -59,7 +60,7 @@ type Sourcemap struct {
BundleFilepath string
}

func (pa *Sourcemap) Transform(tctx *transform.Context) []beat.Event {
func (pa *Sourcemap) Transform(ctx context.Context, tctx *transform.Context) []beat.Event {
sourcemapCounter.Inc()
if pa == nil {
return nil
Expand All @@ -68,7 +69,7 @@ func (pa *Sourcemap) Transform(tctx *transform.Context) []beat.Event {
if tctx.Config.SourcemapStore == nil {
logp.NewLogger(logs.Sourcemap).Error("Sourcemap Accessor is nil, cache cannot be invalidated.")
} else {
tctx.Config.SourcemapStore.Added(pa.ServiceName, pa.ServiceVersion, pa.BundleFilepath)
tctx.Config.SourcemapStore.Added(ctx, pa.ServiceName, pa.ServiceVersion, pa.BundleFilepath)
}

ev := beat.Event{
Expand Down
9 changes: 5 additions & 4 deletions model/sourcemap/payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package sourcemap

import (
"context"
"net/http"
"testing"
"time"
Expand Down Expand Up @@ -52,7 +53,7 @@ func TestDecode(t *testing.T) {
sourcemap, err := DecodeSourcemap(data)
assert.NoError(t, err)

rs := sourcemap.Transform(&transform.Context{})
rs := sourcemap.Transform(context.Background(), &transform.Context{})
assert.Len(t, rs, 1)
event := rs[0]
assert.WithinDuration(t, time.Now(), event.Timestamp, time.Second)
Expand All @@ -77,7 +78,7 @@ func TestTransform(t *testing.T) {
}

tctx := &transform.Context{}
events := p.Transform(tctx)
events := p.Transform(context.Background(), tctx)
assert.Len(t, events, 1)
event := events[0]

Expand Down Expand Up @@ -120,7 +121,7 @@ func TestInvalidateCache(t *testing.T) {
require.NoError(t, err)

// transform with sourcemap store
event.Transform(&transform.Context{Config: transform.Config{SourcemapStore: store}})
event.Transform(context.Background(), &transform.Context{Config: transform.Config{SourcemapStore: store}})

logCollection := logp.ObserverLogs().TakeAll()
assert.Equal(t, 2, len(logCollection))
Expand All @@ -143,7 +144,7 @@ func TestInvalidateCache(t *testing.T) {
require.NoError(t, logp.DevelopmentSetup(logp.ToObserverOutput()))

// transform with sourcemap store
event.Transform(&transform.Context{Config: transform.Config{SourcemapStore: nil}})
event.Transform(context.Background(), &transform.Context{Config: transform.Config{SourcemapStore: nil}})

logCollection := logp.ObserverLogs().TakeAll()
assert.Equal(t, 1, len(logCollection))
Expand Down
9 changes: 5 additions & 4 deletions model/span/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package span

import (
"context"
"net"
"strings"
"time"
Expand Down Expand Up @@ -374,7 +375,7 @@ func DecodeEvent(input interface{}, cfg m.Config, err error) (transform.Transfor
return &event, nil
}

func (e *Event) Transform(tctx *transform.Context) []beat.Event {
func (e *Event) Transform(ctx context.Context, tctx *transform.Context) []beat.Event {
transformations.Inc()
if frames := len(e.Stacktrace); frames > 0 {
stacktraceCounter.Inc()
Expand All @@ -383,7 +384,7 @@ func (e *Event) Transform(tctx *transform.Context) []beat.Event {

fields := common.MapStr{
"processor": processorEntry,
spanDocType: e.fields(tctx),
spanDocType: e.fields(ctx, tctx),
}

// first set the generic metadata
Expand Down Expand Up @@ -420,7 +421,7 @@ func (e *Event) Transform(tctx *transform.Context) []beat.Event {
}
}

func (e *Event) fields(tctx *transform.Context) common.MapStr {
func (e *Event) fields(ctx context.Context, tctx *transform.Context) common.MapStr {
if e == nil {
return nil
}
Expand Down Expand Up @@ -448,7 +449,7 @@ func (e *Event) fields(tctx *transform.Context) common.MapStr {

utility.Set(fields, "message", e.Message.Fields())

st := e.Stacktrace.Transform(tctx)
st := e.Stacktrace.Transform(ctx, tctx)
utility.Set(fields, "stacktrace", st)
return fields
}
5 changes: 3 additions & 2 deletions model/span/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package span

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -378,7 +379,7 @@ func TestSpanTransform(t *testing.T) {
RequestTime: timestamp,
}
for _, test := range tests {
output := test.Event.Transform(tctx)
output := test.Event.Transform(context.Background(), tctx)
fields := output[0].Fields
assert.Equal(t, test.Output, fields)
}
Expand All @@ -388,7 +389,7 @@ func TestEventTransformUseReqTimePlusStart(t *testing.T) {
reqTimestampParsed := time.Date(2017, 5, 30, 18, 53, 27, 154*1e6, time.UTC)
start := 1234.8
e := Event{Start: &start}
beatEvent := e.Transform(&transform.Context{RequestTime: reqTimestampParsed})
beatEvent := e.Transform(context.Background(), &transform.Context{RequestTime: reqTimestampParsed})
require.Len(t, beatEvent, 1)

adjustedParsed := time.Date(2017, 5, 30, 18, 53, 28, 388.8*1e6, time.UTC)
Expand Down
4 changes: 2 additions & 2 deletions model/stacktrace.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func DecodeStacktrace(input interface{}, hasShortFieldNames bool, err error) (*S
return &st, err
}

func (st *Stacktrace) Transform(tctx *transform.Context) []common.MapStr {
func (st *Stacktrace) Transform(ctx context.Context, tctx *transform.Context) []common.MapStr {
if st == nil {
return nil
}
Expand Down Expand Up @@ -85,7 +85,7 @@ func (st *Stacktrace) Transform(tctx *transform.Context) []common.MapStr {
logger := logp.NewLogger(logs.Stacktrace)
fct := "<anonymous>"
return st.transform(tctx, func(frame *StacktraceFrame) {
fct, errMsg = frame.applySourcemap(context.TODO(), tctx.Config.SourcemapStore, tctx.Metadata.Service, fct)
fct, errMsg = frame.applySourcemap(ctx, tctx.Config.SourcemapStore, tctx.Metadata.Service, fct)
if errMsg != "" {
if _, ok := sourcemapErrorSet[errMsg]; !ok {
logger.Warn(errMsg)
Expand Down
Loading

0 comments on commit ecab86b

Please sign in to comment.