Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

all: propagate context through Transformables #3515

Merged
merged 2 commits into from
Mar 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion beater/jaeger/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ type testcase struct {
func (tc *testcase) setup(t *testing.T) {
reporter := func(ctx context.Context, req publish.PendingReq) error {
for _, transformable := range req.Transformables {
tc.events = append(tc.events, transformable.Transform(req.Tcontext)...)
tc.events = append(tc.events, transformable.Transform(ctx, req.Tcontext)...)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion beater/onboarding.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type onboardingDoc struct {
listenAddr string
}

func (o onboardingDoc) Transform(tctx *transform.Context) []beat.Event {
func (o onboardingDoc) Transform(ctx context.Context, tctx *transform.Context) []beat.Event {
return []beat.Event{{
Timestamp: tctx.RequestTime,
Fields: common.MapStr{
Expand Down
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ https://github.com/elastic/apm-server/compare/7.6\...master[View commits]
* Add pipeline for removing metadata fields from spans {pull}3408[3408]
* Add gRPC sampling endpoint for Jaeger {pull}3490[3490]
* Change default value for apm-server.ssl.client_authentication from optional to none {pull}3500[3500]
* Enabled instrumentation of sourcemaps {pull}3515[3515]
19 changes: 10 additions & 9 deletions model/error/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package error

import (
"context"
"crypto/md5"
"encoding/hex"
"encoding/json"
Expand Down Expand Up @@ -187,7 +188,7 @@ func DecodeEvent(input interface{}, cfg m.Config, err error) (transform.Transfor
return &e, nil
}

func (e *Event) Transform(tctx *transform.Context) []beat.Event {
func (e *Event) Transform(ctx context.Context, tctx *transform.Context) []beat.Event {
transformations.Inc()

if e.Exception != nil {
Expand All @@ -198,7 +199,7 @@ func (e *Event) Transform(tctx *transform.Context) []beat.Event {
}

fields := common.MapStr{
"error": e.fields(tctx),
"error": e.fields(ctx, tctx),
"processor": processorEntry,
}

Expand Down Expand Up @@ -244,14 +245,14 @@ func (e *Event) Transform(tctx *transform.Context) []beat.Event {
}
}

func (e *Event) fields(tctx *transform.Context) common.MapStr {
func (e *Event) fields(ctx context.Context, tctx *transform.Context) common.MapStr {
e.data = common.MapStr{}
e.add("id", e.Id)
e.add("page", e.Page.Fields())

exceptionChain := flattenExceptionTree(e.Exception)
e.addException(tctx, exceptionChain)
e.addLog(tctx)
e.addException(ctx, tctx, exceptionChain)
e.addLog(ctx, tctx)

e.updateCulprit(tctx)
e.add("culprit", e.Culprit)
Expand Down Expand Up @@ -297,7 +298,7 @@ func findSmappedNonLibraryFrame(frames []*m.StacktraceFrame) *m.StacktraceFrame
return nil
}

func (e *Event) addException(tctx *transform.Context, chain []Exception) {
func (e *Event) addException(ctx context.Context, tctx *transform.Context, chain []Exception) {
var result []common.MapStr
for _, exception := range chain {
ex := common.MapStr{}
Expand All @@ -319,7 +320,7 @@ func (e *Event) addException(tctx *transform.Context, chain []Exception) {
utility.Set(ex, "code", code.String())
}

st := exception.Stacktrace.Transform(tctx)
st := exception.Stacktrace.Transform(ctx, tctx)
utility.Set(ex, "stacktrace", st)

result = append(result, ex)
Expand All @@ -328,7 +329,7 @@ func (e *Event) addException(tctx *transform.Context, chain []Exception) {
e.add("exception", result)
}

func (e *Event) addLog(tctx *transform.Context) {
func (e *Event) addLog(ctx context.Context, tctx *transform.Context) {
if e.Log == nil {
return
}
Expand All @@ -337,7 +338,7 @@ func (e *Event) addLog(tctx *transform.Context) {
utility.Set(log, "param_message", e.Log.ParamMessage)
utility.Set(log, "logger_name", e.Log.LoggerName)
utility.Set(log, "level", e.Log.Level)
st := e.Log.Stacktrace.Transform(tctx)
st := e.Log.Stacktrace.Transform(ctx, tctx)
utility.Set(log, "stacktrace", st)

e.add("log", log)
Expand Down
9 changes: 5 additions & 4 deletions model/error/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package error

import (
"context"
"crypto/md5"
"encoding/hex"
"encoding/json"
Expand Down Expand Up @@ -542,7 +543,7 @@ func TestEventFields(t *testing.T) {
},
}

output := tc.Event.Transform(tctx)
output := tc.Event.Transform(context.Background(), tctx)
require.Len(t, output, 1)
fields := output[0].Fields["error"]
assert.Equal(t, tc.Output, fields)
Expand Down Expand Up @@ -689,7 +690,7 @@ func TestEvents(t *testing.T) {
RequestTime: timestamp,
}

outputEvents := tc.Transformable.Transform(tctx)
outputEvents := tc.Transformable.Transform(context.Background(), tctx)
require.Len(t, outputEvents, 1)
outputEvent := outputEvents[0]
assert.Equal(t, tc.Output, outputEvent.Fields)
Expand Down Expand Up @@ -1037,13 +1038,13 @@ func TestSourcemapping(t *testing.T) {
Config: transform.Config{SourcemapStore: nil},
Metadata: metadata.Metadata{Service: &metadata.Service{Name: &str, Version: &str}},
}
transformedNoSourcemap := event1.fields(tctx)
transformedNoSourcemap := event1.fields(context.Background(), tctx)

// transform with sourcemap store
store, err := sourcemap.NewStore(test.ESClientWithValidSourcemap(t), "apm-*sourcemap*", time.Minute)
require.NoError(t, err)
tctx.Config = transform.Config{SourcemapStore: store}
transformedWithSourcemap := event2.fields(tctx)
transformedWithSourcemap := event2.fields(context.Background(), tctx)

// ensure events have different line number and grouping keys
assert.Equal(t, 1, *event1.Exception.Stacktrace[0].Lineno)
Expand Down
3 changes: 2 additions & 1 deletion model/metricset/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package metricset

import (
"context"
"errors"
"fmt"
"time"
Expand Down Expand Up @@ -202,7 +203,7 @@ func (t *Transaction) fields() common.MapStr {
return fields
}

func (me *Metricset) Transform(tctx *transform.Context) []beat.Event {
func (me *Metricset) Transform(ctx context.Context, tctx *transform.Context) []beat.Event {
transformations.Inc()
if me == nil {
return nil
Expand Down
5 changes: 3 additions & 2 deletions model/metricset/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package metricset

import (
"context"
"encoding/json"
"fmt"
"testing"
Expand Down Expand Up @@ -245,7 +246,7 @@ func TestTransform(t *testing.T) {

tctx := &transform.Context{Config: transform.Config{}, Metadata: *md}
for idx, test := range tests {
outputEvents := test.Metricset.Transform(tctx)
outputEvents := test.Metricset.Transform(context.Background(), tctx)

for j, outputEvent := range outputEvents {
assert.Equal(t, test.Output[j], outputEvent.Fields, fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg))
Expand All @@ -257,7 +258,7 @@ func TestTransform(t *testing.T) {
func TestEventTransformUseReqTime(t *testing.T) {
reqTimestampParsed := time.Date(2017, 5, 30, 18, 53, 27, 154*1e6, time.UTC)
e := Metricset{}
beatEvent := e.Transform(&transform.Context{RequestTime: reqTimestampParsed})
beatEvent := e.Transform(context.Background(), &transform.Context{RequestTime: reqTimestampParsed})
require.Len(t, beatEvent, 1)
assert.Equal(t, reqTimestampParsed, beatEvent[0].Timestamp)
}
3 changes: 2 additions & 1 deletion model/profile/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package profile

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -46,7 +47,7 @@ type PprofProfile struct {
}

// Transform transforms a Profile into a sequence of beat.Events: one per profile sample.
func (pp PprofProfile) Transform(tctx *transform.Context) []beat.Event {
func (pp PprofProfile) Transform(ctx context.Context, tctx *transform.Context) []beat.Event {
// Precompute value field names for use in each event.
// TODO(axw) limit to well-known value names?
profileTimestamp := time.Unix(0, pp.Profile.TimeNanos)
Expand Down
6 changes: 3 additions & 3 deletions model/profile/profile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package profile_test

import (
"context"
"testing"
"time"

Expand All @@ -27,7 +28,6 @@ import (

"github.com/elastic/apm-server/model/metadata"
"github.com/elastic/apm-server/model/profile"
"github.com/elastic/apm-server/sourcemap"
"github.com/elastic/apm-server/transform"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
Expand Down Expand Up @@ -87,11 +87,11 @@ func TestPprofProfileTransform(t *testing.T) {
metadata := metadata.Metadata{Service: &service}

tctx := &transform.Context{
Config: transform.Config{SourcemapStore: &sourcemap.Store{}},
Config: transform.Config{}, // not used
Metadata: metadata,
RequestTime: time.Time{}, // not used
}
output := pp.Transform(tctx)
output := pp.Transform(context.Background(), tctx)
require.Len(t, output, 2)
assert.Equal(t, output[0], output[1])

Expand Down
5 changes: 3 additions & 2 deletions model/sourcemap/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package sourcemap

import (
"context"
"time"

"github.com/santhosh-tekuri/jsonschema"
Expand Down Expand Up @@ -59,7 +60,7 @@ type Sourcemap struct {
BundleFilepath string
}

func (pa *Sourcemap) Transform(tctx *transform.Context) []beat.Event {
func (pa *Sourcemap) Transform(ctx context.Context, tctx *transform.Context) []beat.Event {
sourcemapCounter.Inc()
if pa == nil {
return nil
Expand All @@ -68,7 +69,7 @@ func (pa *Sourcemap) Transform(tctx *transform.Context) []beat.Event {
if tctx.Config.SourcemapStore == nil {
logp.NewLogger(logs.Sourcemap).Error("Sourcemap Accessor is nil, cache cannot be invalidated.")
} else {
tctx.Config.SourcemapStore.Added(pa.ServiceName, pa.ServiceVersion, pa.BundleFilepath)
tctx.Config.SourcemapStore.Added(ctx, pa.ServiceName, pa.ServiceVersion, pa.BundleFilepath)
}

ev := beat.Event{
Expand Down
9 changes: 5 additions & 4 deletions model/sourcemap/payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package sourcemap

import (
"context"
"net/http"
"testing"
"time"
Expand Down Expand Up @@ -52,7 +53,7 @@ func TestDecode(t *testing.T) {
sourcemap, err := DecodeSourcemap(data)
assert.NoError(t, err)

rs := sourcemap.Transform(&transform.Context{})
rs := sourcemap.Transform(context.Background(), &transform.Context{})
assert.Len(t, rs, 1)
event := rs[0]
assert.WithinDuration(t, time.Now(), event.Timestamp, time.Second)
Expand All @@ -77,7 +78,7 @@ func TestTransform(t *testing.T) {
}

tctx := &transform.Context{}
events := p.Transform(tctx)
events := p.Transform(context.Background(), tctx)
assert.Len(t, events, 1)
event := events[0]

Expand Down Expand Up @@ -120,7 +121,7 @@ func TestInvalidateCache(t *testing.T) {
require.NoError(t, err)

// transform with sourcemap store
event.Transform(&transform.Context{Config: transform.Config{SourcemapStore: store}})
event.Transform(context.Background(), &transform.Context{Config: transform.Config{SourcemapStore: store}})

logCollection := logp.ObserverLogs().TakeAll()
assert.Equal(t, 2, len(logCollection))
Expand All @@ -143,7 +144,7 @@ func TestInvalidateCache(t *testing.T) {
require.NoError(t, logp.DevelopmentSetup(logp.ToObserverOutput()))

// transform with sourcemap store
event.Transform(&transform.Context{Config: transform.Config{SourcemapStore: nil}})
event.Transform(context.Background(), &transform.Context{Config: transform.Config{SourcemapStore: nil}})

logCollection := logp.ObserverLogs().TakeAll()
assert.Equal(t, 1, len(logCollection))
Expand Down
9 changes: 5 additions & 4 deletions model/span/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package span

import (
"context"
"net"
"strings"
"time"
Expand Down Expand Up @@ -374,7 +375,7 @@ func DecodeEvent(input interface{}, cfg m.Config, err error) (transform.Transfor
return &event, nil
}

func (e *Event) Transform(tctx *transform.Context) []beat.Event {
func (e *Event) Transform(ctx context.Context, tctx *transform.Context) []beat.Event {
transformations.Inc()
if frames := len(e.Stacktrace); frames > 0 {
stacktraceCounter.Inc()
Expand All @@ -383,7 +384,7 @@ func (e *Event) Transform(tctx *transform.Context) []beat.Event {

fields := common.MapStr{
"processor": processorEntry,
spanDocType: e.fields(tctx),
spanDocType: e.fields(ctx, tctx),
}

// first set the generic metadata
Expand Down Expand Up @@ -420,7 +421,7 @@ func (e *Event) Transform(tctx *transform.Context) []beat.Event {
}
}

func (e *Event) fields(tctx *transform.Context) common.MapStr {
func (e *Event) fields(ctx context.Context, tctx *transform.Context) common.MapStr {
if e == nil {
return nil
}
Expand Down Expand Up @@ -448,7 +449,7 @@ func (e *Event) fields(tctx *transform.Context) common.MapStr {

utility.Set(fields, "message", e.Message.Fields())

st := e.Stacktrace.Transform(tctx)
st := e.Stacktrace.Transform(ctx, tctx)
utility.Set(fields, "stacktrace", st)
return fields
}
5 changes: 3 additions & 2 deletions model/span/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package span

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -378,7 +379,7 @@ func TestSpanTransform(t *testing.T) {
RequestTime: timestamp,
}
for _, test := range tests {
output := test.Event.Transform(tctx)
output := test.Event.Transform(context.Background(), tctx)
fields := output[0].Fields
assert.Equal(t, test.Output, fields)
}
Expand All @@ -388,7 +389,7 @@ func TestEventTransformUseReqTimePlusStart(t *testing.T) {
reqTimestampParsed := time.Date(2017, 5, 30, 18, 53, 27, 154*1e6, time.UTC)
start := 1234.8
e := Event{Start: &start}
beatEvent := e.Transform(&transform.Context{RequestTime: reqTimestampParsed})
beatEvent := e.Transform(context.Background(), &transform.Context{RequestTime: reqTimestampParsed})
require.Len(t, beatEvent, 1)

adjustedParsed := time.Date(2017, 5, 30, 18, 53, 28, 388.8*1e6, time.UTC)
Expand Down
4 changes: 2 additions & 2 deletions model/stacktrace.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func DecodeStacktrace(input interface{}, hasShortFieldNames bool, err error) (*S
return &st, err
}

func (st *Stacktrace) Transform(tctx *transform.Context) []common.MapStr {
func (st *Stacktrace) Transform(ctx context.Context, tctx *transform.Context) []common.MapStr {
if st == nil {
return nil
}
Expand Down Expand Up @@ -85,7 +85,7 @@ func (st *Stacktrace) Transform(tctx *transform.Context) []common.MapStr {
logger := logp.NewLogger(logs.Stacktrace)
fct := "<anonymous>"
return st.transform(tctx, func(frame *StacktraceFrame) {
fct, errMsg = frame.applySourcemap(context.TODO(), tctx.Config.SourcemapStore, tctx.Metadata.Service, fct)
fct, errMsg = frame.applySourcemap(ctx, tctx.Config.SourcemapStore, tctx.Metadata.Service, fct)
if errMsg != "" {
if _, ok := sourcemapErrorSet[errMsg]; !ok {
logger.Warn(errMsg)
Expand Down
Loading