Skip to content

Commit

Permalink
feat: use gRPC metadata to pass additional info between client/server
Browse files Browse the repository at this point in the history
  • Loading branch information
moul committed Jan 4, 2019
1 parent 206163e commit d293e8f
Show file tree
Hide file tree
Showing 12 changed files with 165 additions and 122 deletions.
192 changes: 96 additions & 96 deletions core/api/client/jsonclient/berty.node.service.gen.go

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions core/api/client/jsonclient/berty.p2p.service.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions core/api/client/jsonclient/client.go
Expand Up @@ -5,10 +5,12 @@ import (
"fmt"
"strings"

"google.golang.org/grpc/metadata"

"berty.tech/core/api/client"
)

type unaryCallback func(*client.Client, context.Context, []byte) (interface{}, error)
type unaryCallback func(*client.Client, context.Context, []byte) (interface{}, metadata.MD, metadata.MD, error)

var unaryMap map[string]unaryCallback

Expand All @@ -19,16 +21,17 @@ func registerUnary(name string, endpoint unaryCallback) {
unaryMap[name] = endpoint
}

func CallUnary(ctx context.Context, c *client.Client, endpoint string, jsonInput []byte) (interface{}, error) {
func CallUnary(ctx context.Context, c *client.Client, endpoint string, jsonInput []byte) (interface{}, metadata.MD, metadata.MD, error) {
if jsonInput == nil {
jsonInput = []byte("{}")
}

for name, handler := range unaryMap {
if strings.ToLower(name) == strings.ToLower(endpoint) {
return handler(c, ctx, jsonInput)
}
}
return nil, fmt.Errorf("unknown endpoint: %q", endpoint)
return nil, nil, nil, fmt.Errorf("unknown endpoint: %q", endpoint)
}

func Unaries() []string {
Expand Down
Expand Up @@ -12,6 +12,7 @@ import (
"{{regexReplaceAll "\"([^;]*).*\"" ($dependency.GoPkg|string) "${1}" }}"
{{ end -}}
"berty.tech/core/api/client"
"google.golang.org/grpc"
)

{{- range $svc := .File.Service}}
Expand All @@ -30,7 +31,7 @@ import (

{{ range $method := .Method}}
{{if and (not .ClientStreaming) (not .ServerStreaming)}}
func {{$file.Package|splitList "."|last|title}}{{$method.Name}}(client *client.Client, ctx context.Context, jsonInput []byte) (interface{}, error) {
func {{$file.Package|splitList "."|last|title}}{{$method.Name}}(client *client.Client, ctx context.Context, jsonInput []byte) (interface{}, metadata.MD, metadata.MD, error) {
tracer := tracing.EnterFunc(ctx, string(jsonInput))
defer tracer.Finish()
ctx = tracer.Context()
Expand All @@ -40,7 +41,7 @@ import (
{{ $in := $method.InputType | getMessageType $file }}
var typedInput {{$in.GoType "."}}
if err := json.Unmarshal(jsonInput, &typedInput); err != nil {
return nil, err
return nil, nil, nil, err
}

var header, trailer metadata.MD
Expand All @@ -54,7 +55,7 @@ import (
tracer.SetAnyField("header", header)
tracer.SetAnyField("trailer", trailer)

return ret, err
return ret, header, trailer, err
}
{{- end}}
{{if and (not .ClientStreaming) (.ServerStreaming)}}
Expand Down
2 changes: 1 addition & 1 deletion core/api/p2p/event.go
Expand Up @@ -70,7 +70,7 @@ func (e Event) ToJSON() string {

func (e Event) CreateSpan(ctx context.Context) (opentracing.Span, context.Context) {
tracer := opentracing.GlobalTracer()
caller := tracing.GetCallerName("call", 1)
caller := tracing.GetCallerName(1)
topic := fmt.Sprintf("event::/%s/%s", e.Direction.String(), e.Kind.String())

// Retrieve span context inside the event if needed
Expand Down
11 changes: 10 additions & 1 deletion core/cmd/berty/client.go
Expand Up @@ -241,10 +241,19 @@ func clientUnary(opts *clientOptions) error {
input = []byte(opts.args[0])
}

ret, err := jsonclient.CallUnary(ctx, client, opts.endpoint, input)
ret, _, trailer, err := jsonclient.CallUnary(ctx, client, opts.endpoint, input)
if err != nil {
return errors.Wrap(err, "client error")
}
if len(trailer) > 0 {
fmt.Fprint(os.Stderr, "gRPC trailer:\n")
for key, values := range trailer {
fmt.Fprintf(os.Stderr, " %s: (%d values)\n", key, len(values))
for _, value := range values {
fmt.Fprintf(os.Stderr, " - %s\n", value)
}
}
}

jsonPrint(ret, opts)
return nil
Expand Down
2 changes: 2 additions & 0 deletions core/go.mod
Expand Up @@ -29,6 +29,7 @@ require (
github.com/gopherjs/gopherjs v0.0.0-20181004151105-1babbf986f6f // indirect
github.com/gopherjs/gopherwasm v1.0.1 // indirect
github.com/gorilla/websocket v1.4.0
github.com/gosimple/slug v1.4.2
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20181112102510-3304cc886352
github.com/gxed/GoEndian v0.0.0-20160916112711-0f5c6873267e // indirect
github.com/gxed/eventfd v0.0.0-20160916113412-80a92cca79a8 // indirect
Expand Down Expand Up @@ -111,6 +112,7 @@ require (
github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d // indirect
github.com/opentracing/opentracing-go v1.0.2
github.com/pkg/errors v0.8.0
github.com/rainycape/unidecode v0.0.0-20150907023854-cb7f23ec59be // indirect
github.com/rs/cors v1.6.0
github.com/sergi/go-diff v1.0.0 // indirect
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d // indirect
Expand Down
4 changes: 4 additions & 0 deletions core/go.sum
Expand Up @@ -237,6 +237,8 @@ github.com/gorilla/sessions v1.1.2/go.mod h1:8KCfur6+4Mqcc6S0FEfKuN15Vl5MgXW92AE
github.com/gorilla/sessions v1.1.3/go.mod h1:8KCfur6+4Mqcc6S0FEfKuN15Vl5MgXW92AE8ovaJD0w=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gosimple/slug v1.4.2 h1:jDmprx3q/9Lfk4FkGZtvzDQ9Cj9eAmsjzeQGp24PeiQ=
github.com/gosimple/slug v1.4.2/go.mod h1:ER78kgg1Mv0NQGlXiDe57DpCyfbNywXXZ9mIorhxAf0=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20181112102510-3304cc886352 h1:1GXyC+LmruB9uk340NwSS6UPwWNXM3xe6pbGpsXqAKc=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20181112102510-3304cc886352/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/gxed/GoEndian v0.0.0-20160916112711-0f5c6873267e h1:eIhARPSF2zPr1hKxiL81XWQ392f5stEEcs38UzZVSWo=
Expand Down Expand Up @@ -473,6 +475,8 @@ github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rainycape/unidecode v0.0.0-20150907023854-cb7f23ec59be h1:ta7tUOvsPHVHGom5hKW5VXNc2xZIkfCKP8iaqOyYtUQ=
github.com/rainycape/unidecode v0.0.0-20150907023854-cb7f23ec59be/go.mod h1:MIDFMn7db1kT65GmV94GzpX9Qdi7N/pQlwb+AN8wh+Q=
github.com/rogpeppe/go-internal v1.0.0 h1:o4VLZ5jqHE+HahLT6drNtSGTrrUA3wPBmtpgqtdbClo=
github.com/rogpeppe/go-internal v1.0.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rs/cors v1.6.0 h1:G9tHG9lebljV9mfp9SNPDL36nCDxmo3zTlAf1YgvzmI=
Expand Down
8 changes: 4 additions & 4 deletions core/node/nodeapi_devtools.go
Expand Up @@ -141,14 +141,14 @@ func (n *Node) GenerateFakeData(ctx context.Context, input *node.Void) (*node.Vo

/*
// enqueue fake incoming event
in := n.NewContactEvent(&entity.Contact{ID: "abcde"}, p2p.Kind_DevtoolsMapset)
if err := n.EnqueueClientEvent(in); err != nil {
in := n.NewContactEvent(ctx, &entity.Contact{ID: "abcde"}, p2p.Kind_DevtoolsMapset)
if err := n.EnqueueClientEvent(ctx, in); err != nil {
return nil, err
}
// enqueue fake outgoing event
out := n.NewContactEvent(&entity.Contact{ID: "abcde"}, p2p.Kind_DevtoolsMapset)
if err := n.EnqueueOutgoingEvent(out); err != nil {
out := n.NewContactEvent(ctx, &entity.Contact{ID: "abcde"}, p2p.Kind_DevtoolsMapset)
if err := n.EnqueueOutgoingEvent(ctx, out); err != nil {
return nil, err
}
*/
Expand Down
2 changes: 2 additions & 0 deletions core/node/p2pclient.go
Expand Up @@ -45,6 +45,8 @@ func (n *Node) EnqueueOutgoingEvent(ctx context.Context, event *p2p.Event) error
}
n.outgoingEvents <- event

tracer.SetMetadata("new-outgoing-event", event.ID)

return nil
}

Expand Down
31 changes: 27 additions & 4 deletions core/pkg/tracing/functracer.go
Expand Up @@ -6,9 +6,12 @@ import (
"fmt"
"time"

"github.com/gosimple/slug"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

// fixme: make debug configurable
Expand All @@ -18,11 +21,30 @@ type FuncTracer struct {
span opentracing.Span
logger *zap.Logger
startTime time.Time
caller string
}

func (t *FuncTracer) Finish() {
t.span.Finish()
t.logger.Debug("func leave", zap.Duration("duration", time.Since(t.startTime)))
t.SetMetadata("duration", fmt.Sprintf("%s", time.Since(t.startTime)))
}

func (t *FuncTracer) SetMetadata(key string, value string) {
// check if is gRPC context
if grpc.ServerTransportStreamFromContext(t.ctx) == nil {
return
}

if err := grpc.SetTrailer(
t.ctx,
metadata.Pairs(
slug.Make(fmt.Sprintf("%s %s", t.caller, key)),
value,
),
); err != nil {
logger().Warn("failed to set grpc trailer on context", zap.Error(err))
}
}

func (t *FuncTracer) SetTag(key, value string) {
Expand Down Expand Up @@ -54,17 +76,18 @@ func (t *FuncTracer) Context() context.Context {
func EnterFunc(ctx context.Context, args ...interface{}) *FuncTracer {
t := &FuncTracer{
startTime: time.Now(),
caller: GetCallerName(1),
}

topic := GetCallerName("call", 1)
t.logger = zap.L().Named("vendor.tracing").With(zap.String("topic", topic))
callTopic := fmt.Sprintf("call::%s", t.caller)
t.logger = zap.L().Named("vendor.tracing").With(zap.String("topic", callTopic))

if ctx == nil {
ctx = context.Background()
t.logger.Warn("context is not set")
}

t.span, t.ctx = opentracing.StartSpanFromContext(ctx, topic)
t.span, t.ctx = opentracing.StartSpanFromContext(ctx, callTopic)
// t.SetMetadata("spanid", t.span...)

if len(args) > 0 {
for idx, arg := range args {
Expand Down
7 changes: 3 additions & 4 deletions core/pkg/tracing/tracing.go
Expand Up @@ -2,7 +2,6 @@ package tracing

import (
"context"
"fmt"
"runtime"
"strings"

Expand All @@ -16,11 +15,11 @@ type Tracer interface {
SetAnyField(key string, value interface{})
Span() opentracing.Span
Context() context.Context
SetMetadata(key, value string)
}

func GetCallerName(prefix string, skip int) string {
func GetCallerName(skip int) string {
function, _, _, _ := runtime.Caller(skip + 1)
funcName := runtime.FuncForPC(function).Name()
caller := strings.Replace(funcName, "berty.tech/core/", "./", 1)
return fmt.Sprintf("%s::%s()", prefix, caller)
return strings.Replace(funcName, "berty.tech/core/", "./", 1)
}

0 comments on commit d293e8f

Please sign in to comment.