Skip to content

Commit

Permalink
Merge pull request #602 from axw/apmgrpc-dropped-span-tracecontext
Browse files Browse the repository at this point in the history
 module/apmgrpc: fix trace propagation vs. sampling
  • Loading branch information
axw committed Jul 31, 2019
2 parents f5dca77 + faf2be7 commit 7796d5a
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- Optimised HTTP request body capture (#592)
- Fixed transaction encoding to drop tags (and other context) for non-sampled transactions (#593)
- Introduce central config polling (#591)
- Fixed apmgrpc client interceptor, propagating trace context for non-sampled transactions (#602)

## [v1.4.0](https://github.com/elastic/apm-agent-go/releases/tag/v1.4.0)

Expand Down
26 changes: 26 additions & 0 deletions apmtest/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
package apmtest

import (
"context"
"fmt"

"go.elastic.co/apm"
"go.elastic.co/apm/model"
"go.elastic.co/apm/transport/transporttest"
)

Expand All @@ -41,3 +45,25 @@ type RecordingTracer struct {
*apm.Tracer
transporttest.RecorderTransport
}

// WithTransaction calls rt.WithTransactionOptions with a zero apm.TransactionOptions.
func (rt *RecordingTracer) WithTransaction(f func(ctx context.Context)) (model.Transaction, []model.Span, []model.Error) {
return rt.WithTransactionOptions(apm.TransactionOptions{}, f)
}

// WithTransactionOptions starts a transaction with the given options,
// calls f with the transaction in the provided context, ends the transaction
// and flushes the tracer, and then returns the resulting events.
func (rt *RecordingTracer) WithTransactionOptions(opts apm.TransactionOptions, f func(ctx context.Context)) (model.Transaction, []model.Span, []model.Error) {
tx := rt.StartTransactionOptions("name", "type", opts)
ctx := apm.ContextWithTransaction(context.Background(), tx)
f(ctx)

tx.End()
rt.Flush(nil)
payloads := rt.Payloads()
if n := len(payloads.Transactions); n != 1 {
panic(fmt.Errorf("expected 1 transaction, got %d", n))
}
return payloads.Transactions[0], payloads.Spans, payloads.Errors
}
17 changes: 2 additions & 15 deletions apmtest/withtransaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ package apmtest

import (
"context"
"fmt"

"go.elastic.co/apm"
"go.elastic.co/apm/model"
"go.elastic.co/apm/transport/transporttest"
)

// WithTransaction is equivalent to calling WithTransactionOptions with a zero TransactionOptions.
Expand All @@ -35,18 +33,7 @@ func WithTransaction(f func(ctx context.Context)) (model.Transaction, []model.Sp
// and transaction options, flushes the transaction to a test server, and returns
// the decoded transaction and any associated spans and errors.
func WithTransactionOptions(opts apm.TransactionOptions, f func(ctx context.Context)) (model.Transaction, []model.Span, []model.Error) {
tracer, transport := transporttest.NewRecorderTracer()
tracer := NewRecordingTracer()
defer tracer.Close()

tx := tracer.StartTransactionOptions("name", "type", opts)
ctx := apm.ContextWithTransaction(context.Background(), tx)
f(ctx)

tx.End()
tracer.Flush(nil)
payloads := transport.Payloads()
if n := len(payloads.Transactions); n != 1 {
panic(fmt.Errorf("expected 1 transaction, got %d", n))
}
return payloads.Transactions[0], payloads.Spans, payloads.Errors
return tracer.WithTransactionOptions(opts, f)
}
27 changes: 21 additions & 6 deletions module/apmgrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,25 +48,40 @@ func NewUnaryClientInterceptor(o ...ClientOption) grpc.UnaryClientInterceptor {
opts ...grpc.CallOption,
) error {
span, ctx := startSpan(ctx, method)
defer span.End()
if span != nil {
defer span.End()
}
return invoker(ctx, method, req, resp, cc, opts...)
}
}

func startSpan(ctx context.Context, name string) (*apm.Span, context.Context) {
span, ctx := apm.StartSpan(ctx, name, "external.grpc")
if span.Dropped() {
return span, ctx
tx := apm.TransactionFromContext(ctx)
if tx == nil {
return nil, ctx
}
traceparentValue := apmhttp.FormatTraceparentHeader(span.TraceContext())
traceContext := tx.TraceContext()
if !traceContext.Options.Recorded() {
return nil, outgoingContextWithTraceContext(ctx, traceContext)
}
span := tx.StartSpan(name, "external.grpc", apm.SpanFromContext(ctx))
if !span.Dropped() {
traceContext = span.TraceContext()
ctx = apm.ContextWithSpan(ctx, span)
}
return span, outgoingContextWithTraceContext(ctx, traceContext)
}

func outgoingContextWithTraceContext(ctx context.Context, traceContext apm.TraceContext) context.Context {
traceparentValue := apmhttp.FormatTraceparentHeader(traceContext)
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.Pairs(traceparentHeader, traceparentValue)
} else {
md = md.Copy()
md.Set(traceparentHeader, traceparentValue)
}
return span, metadata.NewOutgoingContext(ctx, md)
return metadata.NewOutgoingContext(ctx, md)
}

type clientOptions struct {
Expand Down
58 changes: 58 additions & 0 deletions module/apmgrpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"golang.org/x/net/context"
pb "google.golang.org/grpc/examples/helloworld/helloworld"

"go.elastic.co/apm"
"go.elastic.co/apm/apmtest"
"go.elastic.co/apm/transport/transporttest"
)
Expand Down Expand Up @@ -70,3 +71,60 @@ func TestClientSpan(t *testing.T) {
assert.Equal(t, clientSpans[0].TraceID, serverTransactions[1].TraceID)
assert.Equal(t, clientSpans[0].ID, serverTransactions[1].ParentID)
}

func TestClientSpanDropped(t *testing.T) {
serverTracer := apmtest.NewRecordingTracer()
defer serverTracer.Close()
s, _, addr := newServer(t, serverTracer.Tracer)
defer s.GracefulStop()

conn, client := newClient(t, addr)
defer conn.Close()

clientTracer := apmtest.NewRecordingTracer()
defer clientTracer.Close()
clientTracer.SetMaxSpans(1)

clientTransaction, clientSpans, _ := clientTracer.WithTransaction(func(ctx context.Context) {
for i := 0; i < 2; i++ {
_, err := client.SayHello(ctx, &pb.HelloRequest{Name: "birita"})
require.NoError(t, err)
}
})
require.Len(t, clientSpans, 1)

serverTracer.Flush(nil)
serverTransactions := serverTracer.Payloads().Transactions
require.Len(t, serverTransactions, 2)
for _, serverTransaction := range serverTransactions {
assert.Equal(t, clientTransaction.TraceID, serverTransaction.TraceID)
}
assert.Equal(t, clientSpans[0].ID, serverTransactions[0].ParentID)
assert.Equal(t, clientTransaction.ID, serverTransactions[1].ParentID)
}

func TestClientTransactionUnsampled(t *testing.T) {
serverTracer := apmtest.NewRecordingTracer()
defer serverTracer.Close()
s, _, addr := newServer(t, serverTracer.Tracer)
defer s.GracefulStop()

conn, client := newClient(t, addr)
defer conn.Close()

clientTracer := apmtest.NewRecordingTracer()
defer clientTracer.Close()
clientTracer.SetSampler(apm.NewRatioSampler(0)) // sample nothing

clientTransaction, clientSpans, _ := clientTracer.WithTransaction(func(ctx context.Context) {
_, err := client.SayHello(ctx, &pb.HelloRequest{Name: "birita"})
require.NoError(t, err)
})
require.Len(t, clientSpans, 0)

serverTracer.Flush(nil)
serverTransactions := serverTracer.Payloads().Transactions
require.Len(t, serverTransactions, 1)
assert.Equal(t, clientTransaction.TraceID, serverTransactions[0].TraceID)
assert.Equal(t, clientTransaction.ID, serverTransactions[0].ParentID)
}

0 comments on commit 7796d5a

Please sign in to comment.