Skip to content

Commit 9e7fa05

Browse files
committed
Add more opencensus tracing in query and commit endpoints.
1 parent dada74f commit 9e7fa05

File tree

5 files changed

+56
-63
lines changed

5 files changed

+56
-63
lines changed

edgraph/server.go

Lines changed: 19 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"bytes"
2121
"fmt"
2222
"math"
23-
"math/rand"
2423
"os"
2524
"strings"
2625
"sync"
@@ -326,15 +325,16 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er
326325
return empty, err
327326
}
328327

328+
func annotateStartTs(span *otrace.Span, ts uint64) {
329+
span.Annotate([]otrace.Attribute{otrace.Int64Attribute("startTs", int64(ts))}, "")
330+
}
331+
329332
func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (resp *api.Assigned, err error) {
330333
ctx, span := otrace.StartSpan(ctx, "Server.Mutate")
331334
defer span.End()
332335

333336
resp = &api.Assigned{}
334337
if err := x.HealthCheck(); err != nil {
335-
if tr, ok := trace.FromContext(ctx); ok {
336-
tr.LazyPrintf("Request rejected %v", err)
337-
}
338338
return resp, err
339339
}
340340

@@ -344,18 +344,14 @@ func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (resp *api.Assign
344344
if mu.StartTs == 0 {
345345
mu.StartTs = State.getTimestamp(false)
346346
}
347+
annotateStartTs(span, mu.StartTs)
347348
emptyMutation :=
348349
len(mu.GetSetJson()) == 0 && len(mu.GetDeleteJson()) == 0 &&
349350
len(mu.Set) == 0 && len(mu.Del) == 0 &&
350351
len(mu.SetNquads) == 0 && len(mu.DelNquads) == 0
351352
if emptyMutation {
352353
return resp, fmt.Errorf("empty mutation")
353354
}
354-
if rand.Float64() < worker.Config.Tracing {
355-
var tr trace.Trace
356-
tr, ctx = x.NewTrace("Server.Mutate", ctx)
357-
defer tr.Finish()
358-
}
359355

360356
var l query.Latency
361357
l.Start = time.Now()
@@ -387,6 +383,7 @@ func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (resp *api.Assign
387383
Edges: edges,
388384
StartTs: mu.StartTs,
389385
}
386+
span.Annotate(nil, "Applying mutations")
390387
resp.Context, err = query.ApplyMutations(ctx, m)
391388
if !mu.CommitNow {
392389
if err == y.ErrConflict {
@@ -412,16 +409,11 @@ func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (resp *api.Assign
412409
}
413410
return resp, err
414411
}
415-
tr, ok := trace.FromContext(ctx)
416-
if ok {
417-
tr.LazyPrintf("Prewrites err: %v. Attempting to commit/abort immediately.", err)
418-
}
412+
span.Annotatef(nil, "Prewrites err: %v. Attempting to commit/abort immediately.", err)
419413
ctxn := resp.Context
420414
// zero would assign the CommitTs
421415
cts, err := worker.CommitOverNetwork(ctx, ctxn)
422-
if ok {
423-
tr.LazyPrintf("Status of commit at ts: %d: %v", ctxn.StartTs, err)
424-
}
416+
span.Annotatef(nil, "Status of commit at ts: %d: %v", ctxn.StartTs, err)
425417
if err != nil {
426418
if err == y.ErrAborted {
427419
err = status.Errorf(codes.Aborted, err.Error())
@@ -441,6 +433,9 @@ func (s *Server) Query(ctx context.Context, req *api.Request) (resp *api.Respons
441433
if glog.V(3) {
442434
glog.Infof("Got a query: %+v", req)
443435
}
436+
ctx, span := otrace.StartSpan(ctx, "Server.Query")
437+
defer span.End()
438+
444439
if err := x.HealthCheck(); err != nil {
445440
if tr, ok := trace.FromContext(ctx); ok {
446441
tr.LazyPrintf("Request rejected %v", err)
@@ -455,25 +450,15 @@ func (s *Server) Query(ctx context.Context, req *api.Request) (resp *api.Respons
455450
return resp, ctx.Err()
456451
}
457452

458-
if rand.Float64() < worker.Config.Tracing {
459-
var tr trace.Trace
460-
tr, ctx = x.NewTrace("GrpcQuery", ctx)
461-
defer tr.Finish()
462-
}
463-
464453
resp = new(api.Response)
465454
if len(req.Query) == 0 {
466-
if tr, ok := trace.FromContext(ctx); ok {
467-
tr.LazyPrintf("Empty query")
468-
}
455+
span.Annotate(nil, "Empty query")
469456
return resp, fmt.Errorf("empty query")
470457
}
471458

472459
var l query.Latency
473460
l.Start = time.Now()
474-
if tr, ok := trace.FromContext(ctx); ok {
475-
tr.LazyPrintf("Query request received: %v", req)
476-
}
461+
span.Annotatef(nil, "Query received: %v", req)
477462

478463
parsedReq, err := gql.Parse(gql.Request{
479464
Str: req.Query,
@@ -489,6 +474,7 @@ func (s *Server) Query(ctx context.Context, req *api.Request) (resp *api.Respons
489474
resp.Txn = &api.TxnContext{
490475
StartTs: req.StartTs,
491476
}
477+
annotateStartTs(span, req.StartTs)
492478

493479
var queryRequest = query.QueryRequest{
494480
Latency: &l,
@@ -499,18 +485,12 @@ func (s *Server) Query(ctx context.Context, req *api.Request) (resp *api.Respons
499485
// Core processing happens here.
500486
var er query.ExecuteResult
501487
if er, err = queryRequest.Process(ctx); err != nil {
502-
if tr, ok := trace.FromContext(ctx); ok {
503-
tr.LazyPrintf("Error while processing query: %+v", err)
504-
}
505488
return resp, x.Wrap(err)
506489
}
507490
resp.Schema = er.SchemaNode
508491

509492
json, err := query.ToJson(&l, er.Subgraphs)
510493
if err != nil {
511-
if tr, ok := trace.FromContext(ctx); ok {
512-
tr.LazyPrintf("Error while converting to protocol buffer: %+v", err)
513-
}
514494
return resp, err
515495
}
516496
resp.Json = json
@@ -525,8 +505,10 @@ func (s *Server) Query(ctx context.Context, req *api.Request) (resp *api.Respons
525505
return resp, err
526506
}
527507

528-
func (s *Server) CommitOrAbort(ctx context.Context, tc *api.TxnContext) (*api.TxnContext,
529-
error) {
508+
func (s *Server) CommitOrAbort(ctx context.Context, tc *api.TxnContext) (*api.TxnContext, error) {
509+
ctx, span := otrace.StartSpan(ctx, "Server.CommitOrAbort")
510+
defer span.End()
511+
530512
if err := x.HealthCheck(); err != nil {
531513
if tr, ok := trace.FromContext(ctx); ok {
532514
tr.LazyPrintf("Request rejected %v", err)
@@ -535,10 +517,10 @@ func (s *Server) CommitOrAbort(ctx context.Context, tc *api.TxnContext) (*api.Tx
535517
}
536518

537519
tctx := &api.TxnContext{}
538-
539520
if tc.StartTs == 0 {
540521
return &api.TxnContext{}, fmt.Errorf("StartTs cannot be zero while committing a transaction.")
541522
}
523+
annotateStartTs(span, tc.StartTs)
542524
commitTs, err := worker.CommitOverNetwork(ctx, tc)
543525
if err == y.ErrAborted {
544526
tctx.Aborted = true

query/query.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"time"
2727

2828
"github.com/golang/glog"
29+
otrace "go.opencensus.io/trace"
2930
"golang.org/x/net/trace"
3031
"google.golang.org/grpc/metadata"
3132

@@ -1838,6 +1839,9 @@ func getReversePredicates(ctx context.Context) ([]string, error) {
18381839
// ProcessGraph processes the SubGraph instance accumulating result for the query
18391840
// from different instances. Note: taskQuery is nil for root node.
18401841
func ProcessGraph(ctx context.Context, sg, parent *SubGraph, rch chan error) {
1842+
ctx, span := otrace.StartSpan(ctx, "query.ProcessGraph")
1843+
defer span.End()
1844+
18411845
if sg.Attr == "uid" {
18421846
// We dont need to call ProcessGraph for uid, as we already have uids
18431847
// populated from parent and there is nothing to process but uidMatrix
@@ -2432,6 +2436,9 @@ type QueryRequest struct {
24322436
// Fills Subgraphs and Vars.
24332437
// It optionally also returns a map of the allocated uids in case of an upsert request.
24342438
func (req *QueryRequest) ProcessQuery(ctx context.Context) (err error) {
2439+
ctx, span := otrace.StartSpan(ctx, "query.ProcessQuery")
2440+
defer span.End()
2441+
24352442
// doneVars stores the processed variables.
24362443
req.vars = make(map[string]varValue)
24372444
loopStart := time.Now()
@@ -2441,11 +2448,7 @@ func (req *QueryRequest) ProcessQuery(ctx context.Context) (err error) {
24412448

24422449
if gq == nil || (len(gq.UID) == 0 && gq.Func == nil && len(gq.NeedsVar) == 0 &&
24432450
gq.Alias != "shortest" && !gq.IsEmpty) {
2444-
err := x.Errorf("Invalid query, query pb.id is zero and generator is nil")
2445-
if tr, ok := trace.FromContext(ctx); ok {
2446-
tr.LazyPrintf(err.Error())
2447-
}
2448-
return err
2451+
return x.Errorf("Invalid query, query pb.id is zero and generator is nil")
24492452
}
24502453
sg, err := ToSubGraph(ctx, gq)
24512454
if err != nil {
@@ -2454,9 +2457,7 @@ func (req *QueryRequest) ProcessQuery(ctx context.Context) (err error) {
24542457
sg.recurse(func(sg *SubGraph) {
24552458
sg.ReadTs = req.ReadTs
24562459
})
2457-
if tr, ok := trace.FromContext(ctx); ok {
2458-
tr.LazyPrintf("Query parsed")
2459-
}
2460+
span.Annotate(nil, "Query parsed")
24602461
req.Subgraphs = append(req.Subgraphs, sg)
24612462
}
24622463
req.Latency.Parsing += time.Since(loopStart)

worker/mutation.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func runSchemaMutation(ctx context.Context, update *pb.SchemaUpdate, startTs uin
100100
if err := runSchemaMutationHelper(ctx, update, startTs); err != nil {
101101
// on error, we restore the memory state to be the same as the disk
102102
maxRetries := 10
103-
loadErr := x.RetryUntilSuccess(maxRetries, 10 * time.Millisecond, func() error {
103+
loadErr := x.RetryUntilSuccess(maxRetries, 10*time.Millisecond, func() error {
104104
return schema.Load(update.Predicate)
105105
})
106106

@@ -553,12 +553,21 @@ func MutateOverNetwork(ctx context.Context, m *pb.Mutations) (*api.TxnContext, e
553553

554554
// CommitOverNetwork makes a proxy call to Zero to commit or abort a transaction.
555555
func CommitOverNetwork(ctx context.Context, tc *api.TxnContext) (uint64, error) {
556+
ctx, span := otrace.StartSpan(ctx, "worker.CommitOverNetwork")
557+
defer span.End()
558+
556559
pl := groups().Leader(0)
557560
if pl == nil {
558561
return 0, conn.ErrNoConnection
559562
}
560563
zc := pb.NewZeroClient(pl.Get())
561564
tctx, err := zc.CommitOrAbort(ctx, tc)
565+
566+
var attributes []otrace.Attribute
567+
attributes = append(attributes, otrace.Int64Attribute("commitTs", int64(tctx.CommitTs)))
568+
attributes = append(attributes, otrace.BoolAttribute("committed", tctx.CommitTs > 0))
569+
span.Annotatef(attributes, "Error=%v", err)
570+
562571
if err != nil {
563572
return 0, err
564573
}

worker/schema.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package worker
1818

1919
import (
20+
otrace "go.opencensus.io/trace"
2021
"golang.org/x/net/context"
2122
"golang.org/x/net/trace"
2223

@@ -41,6 +42,9 @@ type resultErr struct {
4142
// predicates is not specified, then all the predicates belonging to the group
4243
// are returned
4344
func getSchema(ctx context.Context, s *pb.SchemaRequest) (*pb.SchemaResult, error) {
45+
ctx, span := otrace.StartSpan(ctx, "worker.getSchema")
46+
defer span.End()
47+
4448
var result pb.SchemaResult
4549
var predicates []string
4650
var fields []string
@@ -162,6 +166,9 @@ func getSchemaOverNetwork(ctx context.Context, gid uint32, s *pb.SchemaRequest,
162166
// GetSchemaOverNetwork checks which group should be serving the schema
163167
// according to fingerprint of the predicate and sends it to that instance.
164168
func GetSchemaOverNetwork(ctx context.Context, schema *pb.SchemaRequest) ([]*api.SchemaNode, error) {
169+
ctx, span := otrace.StartSpan(ctx, "worker.GetSchemaOverNetwork")
170+
defer span.End()
171+
165172
if err := x.HealthCheck(); err != nil {
166173
if tr, ok := trace.FromContext(ctx); ok {
167174
tr.LazyPrintf("Request rejected %v", err)

worker/task.go

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"bytes"
2121
"errors"
2222
"fmt"
23-
"math/rand"
2423
"sort"
2524
"strconv"
2625
"strings"
@@ -39,6 +38,7 @@ import (
3938
"github.com/dgraph-io/dgraph/types/facets"
4039
"github.com/dgraph-io/dgraph/x"
4140
"github.com/golang/glog"
41+
otrace "go.opencensus.io/trace"
4242

4343
cindex "github.com/google/codesearch/index"
4444
cregexp "github.com/google/codesearch/regexp"
@@ -149,22 +149,14 @@ func ProcessTaskOverNetwork(ctx context.Context, q *pb.Query) (*pb.Result, error
149149
}
150150

151151
result, err := processWithBackupRequest(ctx, gid, func(ctx context.Context, c pb.WorkerClient) (interface{}, error) {
152-
if tr, ok := trace.FromContext(ctx); ok {
153-
id := fmt.Sprintf("%d", rand.Int())
154-
tr.LazyPrintf("Sending request to server, id: %s", id)
155-
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("trace", id))
156-
}
157152
return c.ServeTask(ctx, q)
158153
})
159154
if err != nil {
160-
if tr, ok := trace.FromContext(ctx); ok {
161-
tr.LazyPrintf("Error while worker.ServeTask: %v", err)
162-
}
163155
return nil, err
164156
}
165157
reply := result.(*pb.Result)
166-
if tr, ok := trace.FromContext(ctx); ok {
167-
tr.LazyPrintf("Reply from server. length: %v Group: %v Attr: %v", len(reply.UidMatrix), gid, attr)
158+
if span := otrace.FromContext(ctx); span != nil {
159+
span.Annotatef(nil, "Reply from server. length: %v Group: %v Attr: %v", len(reply.UidMatrix), gid, attr)
168160
}
169161
return reply, nil
170162
}
@@ -599,11 +591,13 @@ func handleUidPostings(ctx context.Context, args funcArgs, opts posting.ListOpti
599591

600592
// processTask processes the query, accumulates and returns the result.
601593
func processTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, error) {
594+
span := otrace.FromContext(ctx)
595+
span.Annotate(nil, "Waiting for startTs")
602596
if err := posting.Oracle().WaitForTs(ctx, q.ReadTs); err != nil {
603597
return &emptyResult, err
604598
}
605-
if tr, ok := trace.FromContext(ctx); ok {
606-
tr.LazyPrintf("Done waiting for maxPending to catch up for Attr %q, readTs: %d\n",
599+
if span != nil {
600+
span.Annotatef(nil, "Done waiting for maxPending to catch up for Attr %q, readTs: %d\n",
607601
q.Attr, q.ReadTs)
608602
}
609603

@@ -1318,6 +1312,9 @@ func parseSrcFn(q *pb.Query) (*functionContext, error) {
13181312

13191313
// ServeTask is used to respond to a query.
13201314
func (w *grpcWorker) ServeTask(ctx context.Context, q *pb.Query) (*pb.Result, error) {
1315+
ctx, span := otrace.StartSpan(ctx, "worker.ServeTask")
1316+
defer span.End()
1317+
13211318
if ctx.Err() != nil {
13221319
return &emptyResult, ctx.Err()
13231320
}
@@ -1327,12 +1324,9 @@ func (w *grpcWorker) ServeTask(ctx context.Context, q *pb.Query) (*pb.Result, er
13271324
if q.UidList != nil {
13281325
numUids = len(q.UidList.Uids)
13291326
}
1330-
if tr, ok := trace.FromContext(ctx); ok {
1331-
tr.LazyPrintf("Attribute: %q NumUids: %v groupId: %v ServeTask", q.Attr, numUids, gid)
1332-
}
1327+
span.Annotatef(nil, "Attribute: %q NumUids: %v groupId: %v ServeTask", q.Attr, numUids, gid)
13331328

13341329
if !groups().ServesGroup(gid) {
1335-
// TODO(pawan) - Log this when we have debug logs.
13361330
return nil, fmt.Errorf("Temporary error, attr: %q groupId: %v Request sent to wrong server",
13371331
q.Attr, gid)
13381332
}

0 commit comments

Comments
 (0)