From c9603965619c3b6bf36ef0fdb95e3e0ddee4cf23 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Mon, 1 Oct 2018 21:37:52 -0700 Subject: [PATCH] interceptor/opencensus: support multiplexing in Export Add multiplexing/"pass through" mode where spans from sources other than the initiating node can be sent and they should be properly proxied to their final destination. If the currently received node is nil, use the previously received and non-nil node, and after processing spans, memoize the last received node. Updated the tests to lock-in this behavior. This change also adds in tests to ensure this behavior and to avoid regressions in the future. Fixes #43 --- interceptor/opencensus/opencensus.go | 45 ++++--- interceptor/opencensus/opencensus_test.go | 140 +++++++++++++++++++++- 2 files changed, 170 insertions(+), 15 deletions(-) diff --git a/interceptor/opencensus/opencensus.go b/interceptor/opencensus/opencensus.go index 14b25e95..3c473dea 100644 --- a/interceptor/opencensus/opencensus.go +++ b/interceptor/opencensus/opencensus.go @@ -53,6 +53,11 @@ func (oci *OCInterceptor) Config(tcs agenttracepb.TraceService_ConfigServer) err return errUnimplemented } +type spansAndNode struct { + spans []*tracepb.Span + node *commonpb.Node +} + // Export is the gRPC method that receives streamed traces from // OpenCensus-traceproto compatible libraries/applications. func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) error { @@ -60,7 +65,7 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err // the service and start accepting exported spans. 
const maxTraceInitRetries = 15 // Arbitrary value - var node *commonpb.Node + var initiatingNode *commonpb.Node for i := 0; i < maxTraceInitRetries; i++ { recv, err := tes.Recv() if err != nil { @@ -68,25 +73,18 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err } if nd := recv.Node; nd != nil { - node = nd + initiatingNode = nd break } } - if node == nil { - return fmt.Errorf("failed to receive a non-nil Node even after %d retries", maxTraceInitRetries) + if initiatingNode == nil { + return fmt.Errorf("failed to receive a non-nil initiating Node even after %d retries", maxTraceInitRetries) } // Now that we've got the node, we can start to receive streamed up spans. // The bundler will receive batches of spans i.e. []*tracepb.Span - traceBundler := bundler.NewBundler(([]*tracepb.Span)(nil), func(payload interface{}) { - listOfSpanLists := payload.([][]*tracepb.Span) - flattenedListOfSpans := make([]*tracepb.Span, 0, len(listOfSpanLists)) // In the best case, each spanList has 1 span - for _, listOfSpans := range listOfSpanLists { - flattenedListOfSpans = append(flattenedListOfSpans, listOfSpans...) - } - oci.spanSink.ReceiveSpans(node, flattenedListOfSpans...) - }) + traceBundler := bundler.NewBundler((*spansAndNode)(nil), oci.batchSpanExporting) spanBufferPeriod := oci.spanBufferPeriod if spanBufferPeriod <= 0 { spanBufferPeriod = 2 * time.Second // Arbitrary value @@ -100,13 +98,32 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err traceBundler.DelayThreshold = spanBufferPeriod traceBundler.BundleCountThreshold = spanBufferCount + var lastNonNilNode *commonpb.Node = initiatingNode + for { recv, err := tes.Recv() if err != nil { return err } - // Otherwise add these spans to the bundler - traceBundler.Add(recv.Spans, len(recv.Spans)) + // If a Node has been sent from downstream, save and use it. 
+		node := recv.Node
+		if node != nil {
+			lastNonNilNode = node
+		}
+
+		// Otherwise add them to the bundler.
+		bundlerPayload := &spansAndNode{node: lastNonNilNode, spans: recv.Spans}
+		traceBundler.Add(bundlerPayload, len(bundlerPayload.spans))
+	}
+}
+
+func (oci *OCInterceptor) batchSpanExporting(payload interface{}) {
+	spnL := payload.([]*spansAndNode)
+	// TODO: (@odeke-em) investigate if it is necessary
+	// to group nodes with their respective spans during
+	// spansAndNode list unfurling then send spans grouped per node
+	for _, spn := range spnL {
+		oci.spanSink.ReceiveSpans(spn.node, spn.spans...)
 	}
 }
diff --git a/interceptor/opencensus/opencensus_test.go b/interceptor/opencensus/opencensus_test.go
index ac110866..893cc334 100644
--- a/interceptor/opencensus/opencensus_test.go
+++ b/interceptor/opencensus/opencensus_test.go
@@ -15,8 +15,11 @@
 package ocinterceptor_test
 
 import (
+	"bytes"
+	"context"
 	"encoding/json"
 	"errors"
+	"fmt"
 	"net"
 	"reflect"
 	"strconv"
@@ -25,6 +28,7 @@
 	"testing"
 	"time"
 
+	"github.com/golang/protobuf/proto"
 	"github.com/golang/protobuf/ptypes/timestamp"
 
 	"google.golang.org/grpc"
@@ -154,7 +158,141 @@ func TestOCInterceptor_endToEnd(t *testing.T) {
 	}
 }
 
-// Helper functions from here on down.
+// Issue #43. Export should support node multiplexing.
+// The goal is to ensure that OCInterceptor can always support
+// a passthrough mode where it initiates Export normally by first
+// receiving the initiator node. However it should still be able to
+// accept nodes from downstream sources, but if a node isn't specified in
+// an exportTrace request, assume it is from the last received and non-nil node. 
+func TestExportMultiplexing(t *testing.T) {
+	spanSink := newSpanAppender()
+
+	_, port, doneFn := ocInterceptorOnGRPCServer(t, spanSink, ocinterceptor.WithSpanBufferPeriod(90*time.Millisecond))
+	defer doneFn()
+
+	addr := fmt.Sprintf(":%d", port)
+	cc, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBlock())
+	if err != nil {
+		t.Fatalf("Failed to create the gRPC client connection: %v", err)
+	}
+	defer cc.Close()
+
+	svc := agenttracepb.NewTraceServiceClient(cc)
+	traceClient, err := svc.Export(context.Background())
+	if err != nil {
+		t.Fatalf("Failed to create the traceClient: %v", err)
+	}
+
+	// Step 1) The initiation
+	initiatingNode := &commonpb.Node{
+		Identifier: &commonpb.ProcessIdentifier{
+			Pid:      1,
+			HostName: "multiplexer",
+		},
+		LibraryInfo: &commonpb.LibraryInfo{Language: commonpb.LibraryInfo_JAVA},
+	}
+
+	if err := traceClient.Send(&agenttracepb.ExportTraceServiceRequest{Node: initiatingNode}); err != nil {
+		t.Fatalf("Failed to send the initiating message: %v", err)
+	}
+
+	// Step 1a) Send some spans without a node, they should be registered as coming from the initiating node. 
+ sLi := []*tracepb.Span{{TraceId: []byte("1234567890abcde")}} + if err := traceClient.Send(&agenttracepb.ExportTraceServiceRequest{Node: nil, Spans: sLi}); err != nil { + t.Fatalf("Failed to send the proxied message from app1: %v", err) + } + + // Step 2) Send a "proxied" trace message from app1 with "node1" + node1 := &commonpb.Node{ + Identifier: &commonpb.ProcessIdentifier{Pid: 9489, HostName: "nodejs-host"}, + LibraryInfo: &commonpb.LibraryInfo{Language: commonpb.LibraryInfo_NODE_JS}, + } + sL1 := []*tracepb.Span{{TraceId: []byte("abcdefghijklmno")}} + if err := traceClient.Send(&agenttracepb.ExportTraceServiceRequest{Node: node1, Spans: sL1}); err != nil { + t.Fatalf("Failed to send the proxied message from app1: %v", err) + } + + // Step 3) Send a trace message without a node but with spans: this + // should be registered as belonging to the last used node i.e. "node1". + sLn1 := []*tracepb.Span{{TraceId: []byte("ABCDEFGHIJKLMNO")}, {TraceId: []byte("1234567890abcde")}} + if err := traceClient.Send(&agenttracepb.ExportTraceServiceRequest{Node: nil, Spans: sLn1}); err != nil { + t.Fatalf("Failed to send the proxied message without a node: %v", err) + } + + // Step 4) Send a trace message from a differently proxied node "node2" from app2 + node2 := &commonpb.Node{ + Identifier: &commonpb.ProcessIdentifier{Pid: 7752, HostName: "golang-host"}, + LibraryInfo: &commonpb.LibraryInfo{Language: commonpb.LibraryInfo_GO_LANG}, + } + sL2 := []*tracepb.Span{{TraceId: []byte("_B_D_F_H_J_L_N_")}} + if err := traceClient.Send(&agenttracepb.ExportTraceServiceRequest{Node: node2, Spans: sL2}); err != nil { + t.Fatalf("Failed to send the proxied message from app2: %v", err) + } + + // Step 5a) Send a trace message without a node but with spans: this + // should be registered as belonging to the last used node i.e. "node2". 
+	sLn2a := []*tracepb.Span{{TraceId: []byte("_BCDEFGHIJKLMN_")}, {TraceId: []byte("_234567890abcd_")}}
+	if err := traceClient.Send(&agenttracepb.ExportTraceServiceRequest{Node: nil, Spans: sLn2a}); err != nil {
+		t.Fatalf("Failed to send the proxied message without a node: %v", err)
+	}
+
+	// Step 5b)
+	sLn2b := []*tracepb.Span{{TraceId: []byte("_xxxxxxxxxxxxx_")}, {TraceId: []byte("B234567890abcdA")}}
+	if err := traceClient.Send(&agenttracepb.ExportTraceServiceRequest{Node: nil, Spans: sLn2b}); err != nil {
+		t.Fatalf("Failed to send the proxied message without a node: %v", err)
+	}
+	// Give the process some time to send data over the wire and perform batching
+	<-time.After(150 * time.Millisecond)
+
+	// Examination time!
+	resultsMapping := make(map[string][]*tracepb.Span)
+
+	spanSink.forEachEntry(func(node *commonpb.Node, spans []*tracepb.Span) {
+		resultsMapping[nodeToKey(node)] = spans
+	})
+
+	// First things first, we expect exactly 3 unique keys
+	// 1. Initiating Node
+	// 2. Node 1
+	// 3. Node 2
+	if g, w := len(resultsMapping), 3; g != w {
+		t.Errorf("Got %d keys in the results map; Wanted exactly %d\n\nResultsMapping: %+v\n", g, w, resultsMapping)
+	}
+
+	// Want span counts
+	wantSpanCounts := map[string]int{
+		nodeToKey(initiatingNode): 1,
+		nodeToKey(node1):          3,
+		nodeToKey(node2):          5,
+	}
+	for key, wantSpanCounts := range wantSpanCounts {
+		gotSpanCounts := len(resultsMapping[key])
+		if gotSpanCounts != wantSpanCounts {
+			t.Errorf("Key=%q gotSpanCounts %d wantSpanCounts %d", key, gotSpanCounts, wantSpanCounts)
+		}
+	}
+
+	// Now ensure that the exported spans match up exactly with
+	// the nodes and the last seen node expectation/behavior. 
+ // (or at least their serialized equivalents match up) + wantContents := map[string][]*tracepb.Span{ + nodeToKey(initiatingNode): sLi, + nodeToKey(node1): append(sL1, sLn1...), + nodeToKey(node2): append(sL2, append(sLn2a, sLn2b...)...), + } + + gotBlob, _ := json.Marshal(resultsMapping) + wantBlob, _ := json.Marshal(wantContents) + if !bytes.Equal(gotBlob, wantBlob) { + t.Errorf("Unequal serialization results\nGot:\n\t%s\nWant:\n\t%s\n", gotBlob, wantBlob) + } +} + +func nodeToKey(n *commonpb.Node) string { + blob, _ := proto.Marshal(n) + return string(blob) +} + type spanAppender struct { sync.RWMutex spansPerNode map[*commonpb.Node][]*tracepb.Span