Skip to content

Commit

Permalink
Merge 130d14d into f78d5a3
Browse files Browse the repository at this point in the history
  • Loading branch information
lanzafame committed Oct 2, 2018
2 parents f78d5a3 + 130d14d commit 128757e
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .gx/lastpubver
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.0.18: QmXyteEWrYHVJFEA8oX9cSfRp6PJ2kiVsmsFqPMi9ue1Ek
1.0.21: QmTZDiQmRnCqHQKfr7mcHu8w7YBqmmi2dGXfexrtLiYXxh
59 changes: 59 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package rpc

import (
"github.com/gxed/opencensus-go/stats"
"github.com/gxed/opencensus-go/stats/view"
"github.com/gxed/opencensus-go/tag"
)

var (
// taken from ocgrpc (https://github.com/census-instrumentation/opencensus-go/blob/master/plugin/ocgrpc/stats_common.go)
latencyDistribution = view.Distribution(0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000)

bytesDistribution = view.Distribution(0, 24, 32, 64, 128, 256, 512, 1024, 2048, 4096, 16384, 65536, 262144, 1048576)
)

// opencensus keys, equivalent to prometheus tags
var (
// ServiceKey is a metrics tag for which RPC service was hit.
ServiceKey = makeKey("service")
// MethodKey is a metrics tag for which RPC method was hit.
MethodKey = makeKey("method")
)

// opencensus metrics
var (
// RequestCountMetric is the number times a RPC request has been made.
RequestCountMetric = stats.Int64("libp2p_gorpc/request_count", "Number of requests", stats.UnitDimensionless)
// RequestLatencyMetric is how long a RPC request took to complete.
RequestLatencyMetric = stats.Float64("libp2p_gorpc/request_latency", "Latency of RPC request", stats.UnitMilliseconds)
)

// opencensus views, which is just the aggregation of the metrics
var (
RequestCountView = &view.View{
Measure: RequestCountMetric,
TagKeys: []tag.Key{ServiceKey, MethodKey},
Aggregation: view.Sum(),
}

RequestLatencyView = &view.View{
Name: "libp2p_gorpc/request_latency",
Measure: RequestLatencyMetric,
TagKeys: []tag.Key{ServiceKey, MethodKey},
Aggregation: latencyDistribution,
}

DefaultViews = []*view.View{
RequestCountView,
RequestLatencyView,
}
)

func makeKey(name string) tag.Key {
key, err := tag.NewKey(name)
if err != nil {
logger.Fatal(err)
}
return key
}
8 changes: 7 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@
"hash": "QmewJ1Zp9Hwz5HcMd7JYjhLXwvEHTL2UBCCz3oLt1E2N5z",
"name": "go-multicodec",
"version": "0.1.6"
},
{
"author": "lanzafame",
"hash": "QmXVi4YVf4Bn8Smics8FoE3BRAUJCSu7my3PvFx2iCqg6p",
"name": "opencensus-go",
"version": "0.0.4"
}
],
"gxVersion": "0.10.0",
"language": "go",
"license": "MIT/BSD",
"name": "go-libp2p-gorpc",
"releaseCmd": "git commit -a -m \"gx publish $VERSION\"",
"version": "1.0.18"
"version": "1.0.21"
}

42 changes: 35 additions & 7 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"log"
"reflect"
"sync"
"time"
"unicode"
"unicode/utf8"

Expand All @@ -61,6 +62,9 @@ import (
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
protocol "github.com/libp2p/go-libp2p-protocol"

"github.com/gxed/opencensus-go/stats"
"github.com/gxed/opencensus-go/tag"
)

var logger = logging.Logger("p2p-gorpc")
Expand Down Expand Up @@ -162,6 +166,20 @@ func (server *Server) handle(s *streamWrap) error {
return newServerError(err)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ctx, err = tag.New(
ctx,
tag.Upsert(ServiceKey, svcID.Name),
tag.Upsert(MethodKey, svcID.Method),
)
if err != nil {
return err
}

ctxv := reflect.ValueOf(ctx)

// Decode the argument value.
argIsValue := false // if true, need to indirect before calling.
if mtype.ArgType.Kind() == reflect.Ptr {
Expand All @@ -180,11 +198,6 @@ func (server *Server) handle(s *streamWrap) error {

replyv = reflect.New(mtype.ReplyType.Elem())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ctxv := reflect.ValueOf(ctx)

// This is a connection watchdog. We do not
// need to read from this stream anymore.
// However we'd like to know if the other side is closed
Expand All @@ -201,13 +214,16 @@ func (server *Server) handle(s *streamWrap) error {
}()

// Call service and respond
return service.svcCall(s, mtype, svcID, ctxv, argv, replyv)
return service.svcCall(ctx, s, mtype, svcID, ctxv, argv, replyv)
}

// svcCall calls the actual method associated
func (s *service) svcCall(sWrap *streamWrap, mtype *methodType, svcID ServiceID, ctxv, argv, replyv reflect.Value) error {
func (s *service) svcCall(ctx context.Context, sWrap *streamWrap, mtype *methodType, svcID ServiceID, ctxv, argv, replyv reflect.Value) error {
function := mtype.method.Func

startT := time.Now()
stats.Record(ctx, RequestCountMetric.M(1))

// Invoke the method, providing a new value for the reply.
returnValues := function.Call([]reflect.Value{s.rcvr, ctxv, argv, replyv})
// The return value for the method is an error.
Expand All @@ -216,6 +232,8 @@ func (s *service) svcCall(sWrap *streamWrap, mtype *methodType, svcID ServiceID,
if errInter != nil {
errmsg = errInter.(error).Error()
}

stats.Record(ctx, RequestLatencyMetric.M(float64(time.Since(startT))/float64(time.Millisecond))) // TODO(ajl): latency doesn't include response part atm
resp := &Response{svcID, errmsg, nonRPCErr}

return sendResponse(sWrap, resp, replyv.Interface())
Expand Down Expand Up @@ -253,6 +271,11 @@ func (server *Server) Call(call *Call) error {

// Use the context value from the call directly
ctxv := reflect.ValueOf(call.ctx)
ctx, err := tag.New(
call.ctx,
tag.Upsert(ServiceKey, call.SvcID.Name),
tag.Upsert(MethodKey, call.SvcID.Method),
)

// Decode the argument value.
argIsValue := false // if true, need to indirect before calling.
Expand Down Expand Up @@ -288,6 +311,10 @@ func (server *Server) Call(call *Call) error {

// Call service and respond
function := mtype.method.Func

startT := time.Now()
stats.Record(ctx, RequestCountMetric.M(1))

// Invoke the method, providing a new value for the reply.
returnValues := function.Call(
[]reflect.Value{
Expand All @@ -306,6 +333,7 @@ func (server *Server) Call(call *Call) error {
if errInter != nil {
return errInter.(error)
}
stats.Record(ctx, RequestLatencyMetric.M(float64(time.Since(startT))/float64(time.Millisecond))) // TODO(ajl): latency doesn't include response part atm
return nil
}

Expand Down

0 comments on commit 128757e

Please sign in to comment.