Skip to content

Commit

Permalink
add metrics for grpc proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
andyroyle committed Nov 22, 2018
1 parent eebb625 commit b653e36
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 45 deletions.
7 changes: 6 additions & 1 deletion docs/content/feature/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@ Fabio reports the following metrics:

Name | Type | Description
--------------------------- | -------- | -------------
`{route}.rx` | timer | Number of bytes received by fabion for TCP target
`{route}.rx` | timer | Number of bytes received by fabio for TCP target
`{route}.tx` | timer | Number of bytes transmitted by fabio for TCP target
`{route}` | timer | Average response time for a route
`http.status.code.{code}` | timer | Average response time for all HTTP(S) requests per status code
`notfound` | counter | Number of failed HTTP route lookups
`requests` | timer | Average response time for all HTTP(S) requests
`grpc.requests` | timer | Average response time for all GRPC(S) requests
`grpc.noroute` | counter | Number of failed GRPC route lookups
`grpc.conn` | counter | Number of established GRPC proxy connections
`grpc.status.{code}` | timer | Average response time for all GRPC(S) requests per status code
`tcp.conn` | counter | Number of established TCP proxy connections
`tcp.connfail` | counter | Number of TCP upstream connection failures
`tcp.noroute` | counter | Number of failed TCP upstream route lookups
Expand All @@ -29,6 +33,7 @@ Name | Type | Description
`tcp_sni.noroute` | counter | Number of failed TCP+SNI upstream route lookups
`ws.conn` | gauge | Number of actively open websocket connections


### Legend

#### timer
Expand Down
23 changes: 21 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,27 @@ func main() {
log.Print("[INFO] Down")
}

func newGrpcProxy(cfg *config.Config, tlscfg *tls.Config) grpc.StreamHandler {
return grpc_proxy.TransparentHandler(proxy.GetGRPCDirector(cfg, tlscfg))
func newGrpcProxy(cfg *config.Config, tlscfg *tls.Config) []grpc.ServerOption {
statsHandler := &proxy.GrpcStatsHandler{
Connect: metrics.DefaultRegistry.GetCounter("grpc.conn"),
Request: metrics.DefaultRegistry.GetTimer("grpc.requests"),
NoRoute: metrics.DefaultRegistry.GetCounter("grpc.noroute"),
}

proxyInterceptor := proxy.GrpcProxyInterceptor{
Config: cfg,
StatsHandler: statsHandler,
}

handler := grpc_proxy.TransparentHandler(proxy.GetGRPCDirector(tlscfg))

return []grpc.ServerOption{
grpc.CustomCodec(grpc_proxy.Codec()),
grpc.UnknownServiceHandler(handler),
grpc.UnaryInterceptor(proxyInterceptor.Unary),
grpc.StreamInterceptor(proxyInterceptor.Stream),
grpc.StatsHandler(statsHandler),
}
}

func newHTTPProxy(cfg *config.Config) http.Handler {
Expand Down
198 changes: 163 additions & 35 deletions proxy/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,72 +4,54 @@ import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"net/url"

"github.com/fabiolb/fabio/config"
"github.com/fabiolb/fabio/metrics"
"github.com/fabiolb/fabio/route"
grpc_proxy "github.com/mwitkow/grpc-proxy/proxy"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
"log"
"net"
"net/http"
"net/url"
"strings"
"time"
)

type GRPCServer struct {
type gRPCServer struct {
server *grpc.Server
}

func (s *GRPCServer) Close() error {
func (s *gRPCServer) Close() error {
s.server.Stop()
return nil
}

func (s *GRPCServer) Shutdown(ctx context.Context) error {
func (s *gRPCServer) Shutdown(ctx context.Context) error {
s.server.GracefulStop()
return nil
}

func (s *GRPCServer) Serve(lis net.Listener) error {
func (s *gRPCServer) Serve(lis net.Listener) error {
return s.server.Serve(lis)
}

func GetGRPCDirector(cfg *config.Config, tlscfg *tls.Config) func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {

pick := route.Picker[cfg.Proxy.Strategy]
match := route.Matcher[cfg.Proxy.Matcher]

func GetGRPCDirector(tlscfg *tls.Config) func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
return func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
md, ok := metadata.FromIncomingContext(ctx)

if !ok {
return ctx, nil, fmt.Errorf("error extracting metadata from request")
}

reqUrl, err := url.ParseRequestURI(fullMethodName)

if err != nil {
return ctx, nil, fmt.Errorf("error parsing request url")
}

headers := http.Header{}

for k, v := range md {
for _, h := range v {
headers.Add(k, h)
}
}

req := &http.Request{
Host: "",
URL: reqUrl,
Header: headers,
}

target := route.GetTable().Lookup(req, req.Header.Get("trace"), pick, match, cfg.GlobMatchingDisabled)
target, _ := ctx.Value(targetKey{}).(*route.Target)

if target == nil {
return nil, nil, fmt.Errorf("no route found")
log.Println("[WARN] grpc: no route for ", fullMethodName)
return ctx, nil, fmt.Errorf("no route found")
}

opts := []grpc.DialOption{
Expand All @@ -90,8 +72,154 @@ func GetGRPCDirector(cfg *config.Config, tlscfg *tls.Config) func(ctx context.Co

newCtx := context.Background()
newCtx = metadata.NewOutgoingContext(newCtx, md)

conn, err := grpc.DialContext(newCtx, target.URL.Host, opts...)

return newCtx, conn, err
}

}

type GrpcProxyInterceptor struct {
Config *config.Config
StatsHandler *GrpcStatsHandler
}

type targetKey struct{}

type proxyStream struct {
grpc.ServerStream
ctx context.Context
}

func (p proxyStream) Context() context.Context {
return p.ctx
}

func (g GrpcProxyInterceptor) Unary(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
target, err := g.lookup(ctx, info.FullMethod)

if err != nil {
return nil, err
}

ctx = context.WithValue(ctx, targetKey{}, target)

start := time.Now()

res, err := handler(ctx, req)

end := time.Now()
dur := end.Sub(start)

target.Timer.Update(dur)

return res, err
}

func (g GrpcProxyInterceptor) Stream(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
ctx := stream.Context()

target, err := g.lookup(ctx, info.FullMethod)

if err != nil {
return err
}

ctx = context.WithValue(ctx, targetKey{}, target)

proxyStream := proxyStream{
ServerStream: stream,
ctx: ctx,
}

start := time.Now()

err = handler(srv, proxyStream)

end := time.Now()
dur := end.Sub(start)

if target != nil {
target.Timer.Update(dur)
} else {
g.StatsHandler.NoRoute.Inc(1)
}

return err
}

func (g GrpcProxyInterceptor) lookup(ctx context.Context, fullMethodName string) (*route.Target, error) {
pick := route.Picker[g.Config.Proxy.Strategy]
match := route.Matcher[g.Config.Proxy.Matcher]

md, ok := metadata.FromIncomingContext(ctx)

if !ok {
return nil, fmt.Errorf("error extracting metadata from request")
}

reqUrl, err := url.ParseRequestURI(fullMethodName)

if err != nil {
log.Print("[WARN] Error parsing grpc request url ", fullMethodName)
return nil, fmt.Errorf("error parsing request url")
}

headers := http.Header{}

for k, v := range md {
for _, h := range v {
headers.Add(k, h)
}
}

req := &http.Request{
Host: "",
URL: reqUrl,
Header: headers,
}

return route.GetTable().Lookup(req, req.Header.Get("trace"), pick, match, g.Config.GlobMatchingDisabled), nil
}

type GrpcStatsHandler struct {
Connect metrics.Counter
Request metrics.Timer
NoRoute metrics.Counter
}

type connCtxKey struct{}
type rpcCtxKey struct{}

func (h *GrpcStatsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
return context.WithValue(ctx, connCtxKey{}, info)
}

func (h *GrpcStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
return context.WithValue(ctx, rpcCtxKey{}, info)
}

func (h *GrpcStatsHandler) HandleRPC(ctx context.Context, rpc stats.RPCStats) {
rpcStats, _ := rpc.(*stats.End)

if rpcStats == nil {
return
}

dur := rpcStats.EndTime.Sub(rpcStats.BeginTime)

h.Request.Update(dur)

s, _ := status.FromError(rpcStats.Error)
metrics.DefaultRegistry.GetTimer(fmt.Sprintf("grpc.status.%s", strings.ToLower(s.Code().String())))
}

// HandleConn processes the Conn stats.
func (h *GrpcStatsHandler) HandleConn(ctx context.Context, conn stats.ConnStats) {
connBegin, _ := conn.(*stats.ConnBegin)

if connBegin != nil {
h.Connect.Inc(1)
}
}
10 changes: 3 additions & 7 deletions proxy/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/fabiolb/fabio/config"
"github.com/fabiolb/fabio/proxy/tcp"
grpc_proxy "github.com/mwitkow/grpc-proxy/proxy"
)

type Server interface {
Expand Down Expand Up @@ -71,17 +70,14 @@ func ListenAndServeHTTP(l config.Listen, h http.Handler, cfg *tls.Config) error
return serve(ln, srv)
}

func ListenAndServeGRPC(l config.Listen, h grpc.StreamHandler, cfg *tls.Config) error {
func ListenAndServeGRPC(l config.Listen, opts []grpc.ServerOption, cfg *tls.Config) error {
ln, err := ListenTCP(l.Addr, cfg)
if err != nil {
return err
}

srv := &GRPCServer{
server: grpc.NewServer(
grpc.CustomCodec(grpc_proxy.Codec()),
grpc.UnknownServiceHandler(h),
),
srv := &gRPCServer{
server: grpc.NewServer(opts...),
}

return serve(ln, srv)
Expand Down

0 comments on commit b653e36

Please sign in to comment.