Skip to content

Commit

Permalink
interceptor/opencensus: support multiplexing in Export
Browse files Browse the repository at this point in the history
Add multiplexing/"pass through" mode where spans from
sources other than the initiating node can be sent
and they should be properly proxied to their final
destination. If the currently received node is nil,
use the previously received and non-nil node, and after
processing spans, memoize the last received node.
Updated the tests to lock-in this behavior.

This change also adds in tests to ensure this behavior
and to avoid regressions in the future.

Fixes census-instrumentation#43
  • Loading branch information
odeke-em committed Oct 2, 2018
1 parent 8897b9f commit c960396
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 15 deletions.
45 changes: 31 additions & 14 deletions interceptor/opencensus/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,40 +53,38 @@ func (oci *OCInterceptor) Config(tcs agenttracepb.TraceService_ConfigServer) err
return errUnimplemented
}

type spansAndNode struct {
spans []*tracepb.Span
node *commonpb.Node
}

// Export is the gRPC method that receives streamed traces from
// OpenCensus-traceproto compatible libraries/applications.
func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) error {
// Firstly we MUST receive the node identifier to initiate
// the service and start accepting exported spans.
const maxTraceInitRetries = 15 // Arbitrary value

var node *commonpb.Node
var initiatingNode *commonpb.Node
for i := 0; i < maxTraceInitRetries; i++ {
recv, err := tes.Recv()
if err != nil {
return err
}

if nd := recv.Node; nd != nil {
node = nd
initiatingNode = nd
break
}
}

if node == nil {
return fmt.Errorf("failed to receive a non-nil Node even after %d retries", maxTraceInitRetries)
if initiatingNode == nil {
return fmt.Errorf("failed to receive a non-nil initiating Node even after %d retries", maxTraceInitRetries)
}

// Now that we've got the node, we can start to receive streamed up spans.
// The bundler will receive batches of spans i.e. []*tracepb.Span
traceBundler := bundler.NewBundler(([]*tracepb.Span)(nil), func(payload interface{}) {
listOfSpanLists := payload.([][]*tracepb.Span)
flattenedListOfSpans := make([]*tracepb.Span, 0, len(listOfSpanLists)) // In the best case, each spanList has 1 span
for _, listOfSpans := range listOfSpanLists {
flattenedListOfSpans = append(flattenedListOfSpans, listOfSpans...)
}
oci.spanSink.ReceiveSpans(node, flattenedListOfSpans...)
})
traceBundler := bundler.NewBundler((*spansAndNode)(nil), oci.batchSpanExporting)
spanBufferPeriod := oci.spanBufferPeriod
if spanBufferPeriod <= 0 {
spanBufferPeriod = 2 * time.Second // Arbitrary value
Expand All @@ -100,13 +98,32 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err
traceBundler.DelayThreshold = spanBufferPeriod
traceBundler.BundleCountThreshold = spanBufferCount

var lastNonNilNode *commonpb.Node = initiatingNode

for {
recv, err := tes.Recv()
if err != nil {
return err
}

// Otherwise add these spans to the bundler
traceBundler.Add(recv.Spans, len(recv.Spans))
// If a Node has been sent from downstream, save and use it.
node := recv.Node
if node != nil {
lastNonNilNode = node
}

// Otherwise add them to the bundler.
bundlerPayload := &spansAndNode{node: lastNonNilNode, spans: recv.Spans}
traceBundler.Add(bundlerPayload, len(bundlerPayload.spans))
}
}

func (oci *OCInterceptor) batchSpanExporting(payload interface{}) {
spnL := payload.([]*spansAndNode)
// TODO: (@odeke-em) investigate if it is necessary
// to group nodes with their respective spans during
// spansAndNode list unfurling then send spans grouped per node
for _, spn := range spnL {
oci.spanSink.ReceiveSpans(spn.node, spn.spans...)
}
}
140 changes: 139 additions & 1 deletion interceptor/opencensus/opencensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
package ocinterceptor_test

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net"
"reflect"
"strconv"
Expand All @@ -25,6 +28,7 @@ import (
"testing"
"time"

"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/timestamp"
"google.golang.org/grpc"

Expand Down Expand Up @@ -154,7 +158,141 @@ func TestOCInterceptor_endToEnd(t *testing.T) {
}
}

// Helper functions from here on down.
// Issue #43. Export should support node multiplexing.
// The goal is to ensure that OCInterceptor can always support
// a passthrough mode where it initiates Export normally by firstly
// receiving the initiator node. However ti should still be able to
// accept nodes from downstream sources, but if a node isn't specified in
// an exportTrace request, assume it is from the last received and non-nil node.
func TestExportMultiplexing(t *testing.T) {
spanSink := newSpanAppender()

_, port, doneFn := ocInterceptorOnGRPCServer(t, spanSink, ocinterceptor.WithSpanBufferPeriod(90*time.Millisecond))
defer doneFn()

addr := fmt.Sprintf(":%d", port)
cc, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
t.Fatalf("Failed to create the gRPC client connection: %v", err)
}
defer cc.Close()

svc := agenttracepb.NewTraceServiceClient(cc)
traceClient, err := svc.Export(context.Background())
if err != nil {
t.Fatalf("Failed to create the traceClient: %v", err)
}

// Step 1) The intiation
initiatingNode := &commonpb.Node{
Identifier: &commonpb.ProcessIdentifier{
Pid: 1,
HostName: "multiplexer",
},
LibraryInfo: &commonpb.LibraryInfo{Language: commonpb.LibraryInfo_JAVA},
}

if err := traceClient.Send(&agenttracepb.ExportTraceServiceRequest{Node: initiatingNode}); err != nil {
t.Fatalf("Failed to send the initiating message: %v", err)
}

// Step 1a) Send some spans without a node, they should be registered as coming from the initiating node.
sLi := []*tracepb.Span{{TraceId: []byte("1234567890abcde")}}
if err := traceClient.Send(&agenttracepb.ExportTraceServiceRequest{Node: nil, Spans: sLi}); err != nil {
t.Fatalf("Failed to send the proxied message from app1: %v", err)
}

// Step 2) Send a "proxied" trace message from app1 with "node1"
node1 := &commonpb.Node{
Identifier: &commonpb.ProcessIdentifier{Pid: 9489, HostName: "nodejs-host"},
LibraryInfo: &commonpb.LibraryInfo{Language: commonpb.LibraryInfo_NODE_JS},
}
sL1 := []*tracepb.Span{{TraceId: []byte("abcdefghijklmno")}}
if err := traceClient.Send(&agenttracepb.ExportTraceServiceRequest{Node: node1, Spans: sL1}); err != nil {
t.Fatalf("Failed to send the proxied message from app1: %v", err)
}

// Step 3) Send a trace message without a node but with spans: this
// should be registered as belonging to the last used node i.e. "node1".
sLn1 := []*tracepb.Span{{TraceId: []byte("ABCDEFGHIJKLMNO")}, {TraceId: []byte("1234567890abcde")}}
if err := traceClient.Send(&agenttracepb.ExportTraceServiceRequest{Node: nil, Spans: sLn1}); err != nil {
t.Fatalf("Failed to send the proxied message without a node: %v", err)
}

// Step 4) Send a trace message from a differently proxied node "node2" from app2
node2 := &commonpb.Node{
Identifier: &commonpb.ProcessIdentifier{Pid: 7752, HostName: "golang-host"},
LibraryInfo: &commonpb.LibraryInfo{Language: commonpb.LibraryInfo_GO_LANG},
}
sL2 := []*tracepb.Span{{TraceId: []byte("_B_D_F_H_J_L_N_")}}
if err := traceClient.Send(&agenttracepb.ExportTraceServiceRequest{Node: node2, Spans: sL2}); err != nil {
t.Fatalf("Failed to send the proxied message from app2: %v", err)
}

// Step 5a) Send a trace message without a node but with spans: this
// should be registered as belonging to the last used node i.e. "node2".
sLn2a := []*tracepb.Span{{TraceId: []byte("_BCDEFGHIJKLMN_")}, {TraceId: []byte("_234567890abcd_")}}
if err := traceClient.Send(&agenttracepb.ExportTraceServiceRequest{Node: nil, Spans: sLn2a}); err != nil {
t.Fatalf("Failed to send the proxied message without a node: %v", err)
}

// Step 5b)
sLn2b := []*tracepb.Span{{TraceId: []byte("_xxxxxxxxxxxxx_")}, {TraceId: []byte("B234567890abcdA")}}
if err := traceClient.Send(&agenttracepb.ExportTraceServiceRequest{Node: nil, Spans: sLn2b}); err != nil {
t.Fatalf("Failed to send the proxied message without a node: %v", err)
}
// Give the process sometime to send data over the wire and perform batching
<-time.After(150 * time.Millisecond)

// Examination time!
resultsMapping := make(map[string][]*tracepb.Span)

spanSink.forEachEntry(func(node *commonpb.Node, spans []*tracepb.Span) {
resultsMapping[nodeToKey(node)] = spans
})

// First things first, we expect exactly 3 unique keys
// 1. Initiating Node
// 2. Node 1
// 3. Node 2
if g, w := len(resultsMapping), 3; g != w {
t.Errorf("Got %d keys in the results map; Wanted exactly %d\n\nResultsMapping: %+v\n", g, w, resultsMapping)
}

// Want span counts
wantSpanCounts := map[string]int{
nodeToKey(initiatingNode): 1,
nodeToKey(node1): 3,
nodeToKey(node2): 5,
}
for key, wantSpanCounts := range wantSpanCounts {
gotSpanCounts := len(resultsMapping[key])
if gotSpanCounts != wantSpanCounts {
t.Errorf("Key=%q gotSpanCounts %d wantSpanCounts %d", key, gotSpanCounts, wantSpanCounts)
}
}

// Now ensure that the exported spans match up exactly with
// the nodes and the last seen node expectation/behavior.
// (or at least their serialized equivalents match up)
wantContents := map[string][]*tracepb.Span{
nodeToKey(initiatingNode): sLi,
nodeToKey(node1): append(sL1, sLn1...),
nodeToKey(node2): append(sL2, append(sLn2a, sLn2b...)...),
}

gotBlob, _ := json.Marshal(resultsMapping)
wantBlob, _ := json.Marshal(wantContents)
if !bytes.Equal(gotBlob, wantBlob) {
t.Errorf("Unequal serialization results\nGot:\n\t%s\nWant:\n\t%s\n", gotBlob, wantBlob)
}
}

func nodeToKey(n *commonpb.Node) string {
blob, _ := proto.Marshal(n)
return string(blob)
}

type spanAppender struct {
sync.RWMutex
spansPerNode map[*commonpb.Node][]*tracepb.Span
Expand Down

0 comments on commit c960396

Please sign in to comment.