diff --git a/interceptor/opencensus/opencensus.go b/interceptor/opencensus/opencensus.go index 14b25e95..3c473dea 100644 --- a/interceptor/opencensus/opencensus.go +++ b/interceptor/opencensus/opencensus.go @@ -53,6 +53,11 @@ func (oci *OCInterceptor) Config(tcs agenttracepb.TraceService_ConfigServer) err return errUnimplemented } +type spansAndNode struct { + spans []*tracepb.Span + node *commonpb.Node +} + // Export is the gRPC method that receives streamed traces from // OpenCensus-traceproto compatible libraries/applications. func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) error { @@ -60,7 +65,7 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err // the service and start accepting exported spans. const maxTraceInitRetries = 15 // Arbitrary value - var node *commonpb.Node + var initiatingNode *commonpb.Node for i := 0; i < maxTraceInitRetries; i++ { recv, err := tes.Recv() if err != nil { @@ -68,25 +73,18 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err } if nd := recv.Node; nd != nil { - node = nd + initiatingNode = nd break } } - if node == nil { - return fmt.Errorf("failed to receive a non-nil Node even after %d retries", maxTraceInitRetries) + if initiatingNode == nil { + return fmt.Errorf("failed to receive a non-nil initiating Node even after %d retries", maxTraceInitRetries) } // Now that we've got the node, we can start to receive streamed up spans. // The bundler will receive batches of spans i.e. []*tracepb.Span - traceBundler := bundler.NewBundler(([]*tracepb.Span)(nil), func(payload interface{}) { - listOfSpanLists := payload.([][]*tracepb.Span) - flattenedListOfSpans := make([]*tracepb.Span, 0, len(listOfSpanLists)) // In the best case, each spanList has 1 span - for _, listOfSpans := range listOfSpanLists { - flattenedListOfSpans = append(flattenedListOfSpans, listOfSpans...) - } - oci.spanSink.ReceiveSpans(node, flattenedListOfSpans...) 
- }) + traceBundler := bundler.NewBundler((*spansAndNode)(nil), oci.batchSpanExporting) spanBufferPeriod := oci.spanBufferPeriod if spanBufferPeriod <= 0 { spanBufferPeriod = 2 * time.Second // Arbitrary value @@ -100,13 +98,32 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err traceBundler.DelayThreshold = spanBufferPeriod traceBundler.BundleCountThreshold = spanBufferCount + var lastNonNilNode *commonpb.Node = initiatingNode + for { recv, err := tes.Recv() if err != nil { return err } - // Otherwise add these spans to the bundler - traceBundler.Add(recv.Spans, len(recv.Spans)) + // If a Node has been sent from downstream, save and use it. + node := recv.Node + if node != nil { + lastNonNilNode = node + } + + // Add the received spans to the bundler, tagged with the last non-nil node. + bundlerPayload := &spansAndNode{node: lastNonNilNode, spans: recv.Spans} + traceBundler.Add(bundlerPayload, len(bundlerPayload.spans)) + } +} + +func (oci *OCInterceptor) batchSpanExporting(payload interface{}) { + spnL := payload.([]*spansAndNode) + // TODO: (@odeke-em) investigate if it is necessary + // to group nodes with their respective spans during + // spansAndNode list unfurling then send spans grouped per node + for _, spn := range spnL { + oci.spanSink.ReceiveSpans(spn.node, spn.spans...) } } diff --git a/interceptor/opencensus/opencensus_test.go b/interceptor/opencensus/opencensus_test.go index ac110866..893cc334 100644 --- a/interceptor/opencensus/opencensus_test.go +++ b/interceptor/opencensus/opencensus_test.go @@ -15,8 +15,11 @@ package ocinterceptor_test import ( + "bytes" + "context" "encoding/json" "errors" + "fmt" "net" "reflect" "strconv" @@ -25,6 +28,7 @@ import ( "testing" "time" + "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes/timestamp" "google.golang.org/grpc" @@ -154,7 +158,141 @@ func TestOCInterceptor_endToEnd(t *testing.T) { } } -// Helper functions from here on down. +// Issue #43. Export should support node multiplexing. 
+// The goal is to ensure that OCInterceptor can always support +// a passthrough mode where it initiates Export normally by firstly +// receiving the initiator node. However it should still be able to +// accept nodes from downstream sources, but if a node isn't specified in +// an exportTrace request, assume it is from the last received and non-nil node. +func TestExportMultiplexing(t *testing.T) { + spanSink := newSpanAppender() + + _, port, doneFn := ocInterceptorOnGRPCServer(t, spanSink, ocinterceptor.WithSpanBufferPeriod(90*time.Millisecond)) + defer doneFn() + + addr := fmt.Sprintf(":%d", port) + cc, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBlock()) + if err != nil { + t.Fatalf("Failed to create the gRPC client connection: %v", err) + } + defer cc.Close() + + svc := agenttracepb.NewTraceServiceClient(cc) + traceClient, err := svc.Export(context.Background()) + if err != nil { + t.Fatalf("Failed to create the traceClient: %v", err) + } + + // Step 1) The initiation + initiatingNode := &commonpb.Node{ + Identifier: &commonpb.ProcessIdentifier{ + Pid: 1, + HostName: "multiplexer", + }, + LibraryInfo: &commonpb.LibraryInfo{Language: commonpb.LibraryInfo_JAVA}, + } + + if err := traceClient.Send(&agenttracepb.ExportTraceServiceRequest{Node: initiatingNode}); err != nil { + t.Fatalf("Failed to send the initiating message: %v", err) + } + + // Step 1a) Send some spans without a node, they should be registered as coming from the initiating node. 
+ sLi := []*tracepb.Span{{TraceId: []byte("1234567890abcde")}} + if err := traceClient.Send(&agenttracepb.ExportTraceServiceRequest{Node: nil, Spans: sLi}); err != nil { + t.Fatalf("Failed to send the proxied message from app1: %v", err) + } + + // Step 2) Send a "proxied" trace message from app1 with "node1" + node1 := &commonpb.Node{ + Identifier: &commonpb.ProcessIdentifier{Pid: 9489, HostName: "nodejs-host"}, + LibraryInfo: &commonpb.LibraryInfo{Language: commonpb.LibraryInfo_NODE_JS}, + } + sL1 := []*tracepb.Span{{TraceId: []byte("abcdefghijklmno")}} + if err := traceClient.Send(&agenttracepb.ExportTraceServiceRequest{Node: node1, Spans: sL1}); err != nil { + t.Fatalf("Failed to send the proxied message from app1: %v", err) + } + + // Step 3) Send a trace message without a node but with spans: this + // should be registered as belonging to the last used node i.e. "node1". + sLn1 := []*tracepb.Span{{TraceId: []byte("ABCDEFGHIJKLMNO")}, {TraceId: []byte("1234567890abcde")}} + if err := traceClient.Send(&agenttracepb.ExportTraceServiceRequest{Node: nil, Spans: sLn1}); err != nil { + t.Fatalf("Failed to send the proxied message without a node: %v", err) + } + + // Step 4) Send a trace message from a differently proxied node "node2" from app2 + node2 := &commonpb.Node{ + Identifier: &commonpb.ProcessIdentifier{Pid: 7752, HostName: "golang-host"}, + LibraryInfo: &commonpb.LibraryInfo{Language: commonpb.LibraryInfo_GO_LANG}, + } + sL2 := []*tracepb.Span{{TraceId: []byte("_B_D_F_H_J_L_N_")}} + if err := traceClient.Send(&agenttracepb.ExportTraceServiceRequest{Node: node2, Spans: sL2}); err != nil { + t.Fatalf("Failed to send the proxied message from app2: %v", err) + } + + // Step 5a) Send a trace message without a node but with spans: this + // should be registered as belonging to the last used node i.e. "node2". 
+ sLn2a := []*tracepb.Span{{TraceId: []byte("_BCDEFGHIJKLMN_")}, {TraceId: []byte("_234567890abcd_")}} + if err := traceClient.Send(&agenttracepb.ExportTraceServiceRequest{Node: nil, Spans: sLn2a}); err != nil { + t.Fatalf("Failed to send the proxied message without a node: %v", err) + } + + // Step 5b) + sLn2b := []*tracepb.Span{{TraceId: []byte("_xxxxxxxxxxxxx_")}, {TraceId: []byte("B234567890abcdA")}} + if err := traceClient.Send(&agenttracepb.ExportTraceServiceRequest{Node: nil, Spans: sLn2b}); err != nil { + t.Fatalf("Failed to send the proxied message without a node: %v", err) + } + // Give the process some time to send data over the wire and perform batching + <-time.After(150 * time.Millisecond) + + // Examination time! + resultsMapping := make(map[string][]*tracepb.Span) + + spanSink.forEachEntry(func(node *commonpb.Node, spans []*tracepb.Span) { + resultsMapping[nodeToKey(node)] = spans + }) + + // First things first, we expect exactly 3 unique keys + // 1. Initiating Node + // 2. Node 1 + // 3. Node 2 + if g, w := len(resultsMapping), 3; g != w { + t.Errorf("Got %d keys in the results map; Wanted exactly %d\n\nResultsMapping: %+v\n", g, w, resultsMapping) + } + + // Want span counts + wantSpanCounts := map[string]int{ + nodeToKey(initiatingNode): 1, + nodeToKey(node1): 3, + nodeToKey(node2): 5, + } + for key, wantSpanCounts := range wantSpanCounts { + gotSpanCounts := len(resultsMapping[key]) + if gotSpanCounts != wantSpanCounts { + t.Errorf("Key=%q gotSpanCounts %d wantSpanCounts %d", key, gotSpanCounts, wantSpanCounts) + } + } + + // Now ensure that the exported spans match up exactly with + // the nodes and the last seen node expectation/behavior. 
+ // (or at least their serialized equivalents match up) + wantContents := map[string][]*tracepb.Span{ + nodeToKey(initiatingNode): sLi, + nodeToKey(node1): append(sL1, sLn1...), + nodeToKey(node2): append(sL2, append(sLn2a, sLn2b...)...), + } + + gotBlob, _ := json.Marshal(resultsMapping) + wantBlob, _ := json.Marshal(wantContents) + if !bytes.Equal(gotBlob, wantBlob) { + t.Errorf("Unequal serialization results\nGot:\n\t%s\nWant:\n\t%s\n", gotBlob, wantBlob) + } +} + +func nodeToKey(n *commonpb.Node) string { + blob, _ := proto.Marshal(n) + return string(blob) +} + type spanAppender struct { sync.RWMutex spansPerNode map[*commonpb.Node][]*tracepb.Span