Skip to content

Commit

Permalink
module/apmgrpc: fix trace propagation vs. sampling
Browse files Browse the repository at this point in the history
Fix a bug in the client interceptor which would break
trace propagation when facing dropped spans or non-sampled
transactions. In these cases we should propagate the
transaction's trace context, rather than sending nothing
in the outgoing metadata which would restart tracing.
  • Loading branch information
axw committed Jul 30, 2019
1 parent e6a0136 commit faf2be7
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- Optimised HTTP request body capture (#592)
- Fixed transaction encoding to drop tags (and other context) for non-sampled transactions (#593)
- Introduce central config polling (#591)
- Fixed apmgrpc client interceptor, propagating trace context for non-sampled transactions (#602)

## [v1.4.0](https://github.com/elastic/apm-agent-go/releases/tag/v1.4.0)

Expand Down
27 changes: 21 additions & 6 deletions module/apmgrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,25 +48,40 @@ func NewUnaryClientInterceptor(o ...ClientOption) grpc.UnaryClientInterceptor {
opts ...grpc.CallOption,
) error {
span, ctx := startSpan(ctx, method)
defer span.End()
if span != nil {
defer span.End()
}
return invoker(ctx, method, req, resp, cc, opts...)
}
}

func startSpan(ctx context.Context, name string) (*apm.Span, context.Context) {
span, ctx := apm.StartSpan(ctx, name, "external.grpc")
if span.Dropped() {
return span, ctx
tx := apm.TransactionFromContext(ctx)
if tx == nil {
return nil, ctx
}
traceparentValue := apmhttp.FormatTraceparentHeader(span.TraceContext())
traceContext := tx.TraceContext()
if !traceContext.Options.Recorded() {
return nil, outgoingContextWithTraceContext(ctx, traceContext)
}
span := tx.StartSpan(name, "external.grpc", apm.SpanFromContext(ctx))
if !span.Dropped() {
traceContext = span.TraceContext()
ctx = apm.ContextWithSpan(ctx, span)
}
return span, outgoingContextWithTraceContext(ctx, traceContext)
}

func outgoingContextWithTraceContext(ctx context.Context, traceContext apm.TraceContext) context.Context {
traceparentValue := apmhttp.FormatTraceparentHeader(traceContext)
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.Pairs(traceparentHeader, traceparentValue)
} else {
md = md.Copy()
md.Set(traceparentHeader, traceparentValue)
}
return span, metadata.NewOutgoingContext(ctx, md)
return metadata.NewOutgoingContext(ctx, md)
}

type clientOptions struct {
Expand Down
58 changes: 58 additions & 0 deletions module/apmgrpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"golang.org/x/net/context"
pb "google.golang.org/grpc/examples/helloworld/helloworld"

"go.elastic.co/apm"
"go.elastic.co/apm/apmtest"
"go.elastic.co/apm/transport/transporttest"
)
Expand Down Expand Up @@ -70,3 +71,60 @@ func TestClientSpan(t *testing.T) {
assert.Equal(t, clientSpans[0].TraceID, serverTransactions[1].TraceID)
assert.Equal(t, clientSpans[0].ID, serverTransactions[1].ParentID)
}

func TestClientSpanDropped(t *testing.T) {
serverTracer := apmtest.NewRecordingTracer()
defer serverTracer.Close()
s, _, addr := newServer(t, serverTracer.Tracer)
defer s.GracefulStop()

conn, client := newClient(t, addr)
defer conn.Close()

clientTracer := apmtest.NewRecordingTracer()
defer clientTracer.Close()
clientTracer.SetMaxSpans(1)

clientTransaction, clientSpans, _ := clientTracer.WithTransaction(func(ctx context.Context) {
for i := 0; i < 2; i++ {
_, err := client.SayHello(ctx, &pb.HelloRequest{Name: "birita"})
require.NoError(t, err)
}
})
require.Len(t, clientSpans, 1)

serverTracer.Flush(nil)
serverTransactions := serverTracer.Payloads().Transactions
require.Len(t, serverTransactions, 2)
for _, serverTransaction := range serverTransactions {
assert.Equal(t, clientTransaction.TraceID, serverTransaction.TraceID)
}
assert.Equal(t, clientSpans[0].ID, serverTransactions[0].ParentID)
assert.Equal(t, clientTransaction.ID, serverTransactions[1].ParentID)
}

func TestClientTransactionUnsampled(t *testing.T) {
serverTracer := apmtest.NewRecordingTracer()
defer serverTracer.Close()
s, _, addr := newServer(t, serverTracer.Tracer)
defer s.GracefulStop()

conn, client := newClient(t, addr)
defer conn.Close()

clientTracer := apmtest.NewRecordingTracer()
defer clientTracer.Close()
clientTracer.SetSampler(apm.NewRatioSampler(0)) // sample nothing

clientTransaction, clientSpans, _ := clientTracer.WithTransaction(func(ctx context.Context) {
_, err := client.SayHello(ctx, &pb.HelloRequest{Name: "birita"})
require.NoError(t, err)
})
require.Len(t, clientSpans, 0)

serverTracer.Flush(nil)
serverTransactions := serverTracer.Payloads().Transactions
require.Len(t, serverTransactions, 1)
assert.Equal(t, clientTransaction.TraceID, serverTransactions[0].TraceID)
assert.Equal(t, clientTransaction.ID, serverTransactions[0].ParentID)
}

0 comments on commit faf2be7

Please sign in to comment.