From b653e36d6e00f650f642ae0c288af9ad83cc468e Mon Sep 17 00:00:00 2001
From: Andy Royle
Date: Wed, 21 Nov 2018 15:57:31 +0000
Subject: [PATCH] add metrics for grpc proxy

---
Review notes: Unary now guards against a nil route target (as Stream already
did) and HandleRPC actually updates the grpc.status.{code} timer. Doc comments
were added to the new identifiers. Hunk counts and the diffstat were adjusted;
the blob ids on the proxy/grpc_handler.go index line still refer to the
original submission.

 docs/content/feature/metrics.md |   7 +-
 main.go                         |  23 +++-
 proxy/grpc_handler.go           | 224 +++++++++++++++++++++++++++++-----
 proxy/serve.go                  |  10 +-
 4 files changed, 219 insertions(+), 45 deletions(-)

diff --git a/docs/content/feature/metrics.md b/docs/content/feature/metrics.md
index dd45e4bf9..8c87b34c0 100644
--- a/docs/content/feature/metrics.md
+++ b/docs/content/feature/metrics.md
@@ -15,12 +15,16 @@ Fabio reports the following metrics:
 
 Name                        | Type     | Description
 --------------------------- | -------- | -------------
-`{route}.rx`                | timer    | Number of bytes received by fabion for TCP target
+`{route}.rx`                | timer    | Number of bytes received by fabio for TCP target
 `{route}.tx`                | timer    | Number of bytes transmitted by fabio for TCP target
 `{route}`                   | timer    | Average response time for a route
 `http.status.code.{code}`   | timer    | Average response time for all HTTP(S) requests per status code
 `notfound`                  | counter  | Number of failed HTTP route lookups
 `requests`                  | timer    | Average response time for all HTTP(S) requests
+`grpc.requests`             | timer    | Average response time for all GRPC(S) requests
+`grpc.noroute`              | counter  | Number of failed GRPC route lookups
+`grpc.conn`                 | counter  | Number of established GRPC proxy connections
+`grpc.status.{code}`        | timer    | Average response time for all GRPC(S) requests per status code
 `tcp.conn`                  | counter  | Number of established TCP proxy connections
 `tcp.connfail`              | counter  | Number of TCP upstream connection failures
 `tcp.noroute`               | counter  | Number of failed TCP upstream route lookups
@@ -29,6 +33,7 @@ Name                        | Type     | Description
 `tcp_sni.noroute`           | counter  | Number of failed TCP+SNI upstream route lookups
 `ws.conn`                   | gauge    | Number of actively open websocket connections
 
+
 ### Legend
 
 #### timer
diff --git a/main.go b/main.go
index dda475d88..c0fc117d3 100644
--- a/main.go
+++ b/main.go
@@ -141,8 +141,27 @@ func main() {
 	log.Print("[INFO] Down")
 }
 
-func newGrpcProxy(cfg *config.Config, tlscfg *tls.Config) grpc.StreamHandler {
-	return grpc_proxy.TransparentHandler(proxy.GetGRPCDirector(cfg, tlscfg))
+func newGrpcProxy(cfg *config.Config, tlscfg *tls.Config) []grpc.ServerOption {
+	statsHandler := &proxy.GrpcStatsHandler{
+		Connect: metrics.DefaultRegistry.GetCounter("grpc.conn"),
+		Request: metrics.DefaultRegistry.GetTimer("grpc.requests"),
+		NoRoute: metrics.DefaultRegistry.GetCounter("grpc.noroute"),
+	}
+
+	proxyInterceptor := proxy.GrpcProxyInterceptor{
+		Config:       cfg,
+		StatsHandler: statsHandler,
+	}
+
+	handler := grpc_proxy.TransparentHandler(proxy.GetGRPCDirector(tlscfg))
+
+	return []grpc.ServerOption{
+		grpc.CustomCodec(grpc_proxy.Codec()),
+		grpc.UnknownServiceHandler(handler),
+		grpc.UnaryInterceptor(proxyInterceptor.Unary),
+		grpc.StreamInterceptor(proxyInterceptor.Stream),
+		grpc.StatsHandler(statsHandler),
+	}
 }
 
 func newHTTPProxy(cfg *config.Config) http.Handler {
diff --git a/proxy/grpc_handler.go b/proxy/grpc_handler.go
index 675f5a0ab..2f3ee1aa4 100644
--- a/proxy/grpc_handler.go
+++ b/proxy/grpc_handler.go
@@ -4,41 +4,42 @@ import (
 	"context"
 	"crypto/tls"
 	"fmt"
-	"net"
-	"net/http"
-	"net/url"
-
 	"github.com/fabiolb/fabio/config"
+	"github.com/fabiolb/fabio/metrics"
 	"github.com/fabiolb/fabio/route"
 	grpc_proxy "github.com/mwitkow/grpc-proxy/proxy"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/metadata"
+	"google.golang.org/grpc/stats"
+	"google.golang.org/grpc/status"
+	"log"
+	"net"
+	"net/http"
+	"net/url"
+	"strings"
+	"time"
 )
 
-type GRPCServer struct {
+type gRPCServer struct {
 	server *grpc.Server
 }
 
-func (s *GRPCServer) Close() error {
+func (s *gRPCServer) Close() error {
 	s.server.Stop()
 	return nil
 }
 
-func (s *GRPCServer) Shutdown(ctx context.Context) error {
+func (s *gRPCServer) Shutdown(ctx context.Context) error {
 	s.server.GracefulStop()
 	return nil
 }
 
-func (s *GRPCServer) Serve(lis net.Listener) error {
+func (s *gRPCServer) Serve(lis net.Listener) error {
 	return s.server.Serve(lis)
 }
 
-func GetGRPCDirector(cfg *config.Config, tlscfg *tls.Config) func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
-
-	pick := route.Picker[cfg.Proxy.Strategy]
-	match := route.Matcher[cfg.Proxy.Matcher]
-
+func GetGRPCDirector(tlscfg *tls.Config) func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
 	return func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
 		md, ok := metadata.FromIncomingContext(ctx)
 
@@ -46,30 +47,11 @@ func GetGRPCDirector(cfg *config.Config, tlscfg *tls.Config) func(ctx context.Co
 			return ctx, nil, fmt.Errorf("error extracting metadata from request")
 		}
 
-		reqUrl, err := url.ParseRequestURI(fullMethodName)
-
-		if err != nil {
-			return ctx, nil, fmt.Errorf("error parsing request url")
-		}
-
-		headers := http.Header{}
-
-		for k, v := range md {
-			for _, h := range v {
-				headers.Add(k, h)
-			}
-		}
-
-		req := &http.Request{
-			Host:   "",
-			URL:    reqUrl,
-			Header: headers,
-		}
-
-		target := route.GetTable().Lookup(req, req.Header.Get("trace"), pick, match, cfg.GlobMatchingDisabled)
+		target, _ := ctx.Value(targetKey{}).(*route.Target)
 
 		if target == nil {
-			return nil, nil, fmt.Errorf("no route found")
+			log.Println("[WARN] grpc: no route for ", fullMethodName)
+			return ctx, nil, fmt.Errorf("no route found")
 		}
 
 		opts := []grpc.DialOption{
@@ -90,8 +72,180 @@ func GetGRPCDirector(cfg *config.Config, tlscfg *tls.Config) func(ctx context.Co
 
 		newCtx := context.Background()
 		newCtx = metadata.NewOutgoingContext(newCtx, md)
+
 		conn, err := grpc.DialContext(newCtx, target.URL.Host, opts...)
 
 		return newCtx, conn, err
 	}
+
+}
+
+// GrpcProxyInterceptor resolves the route target for each incoming gRPC call,
+// stores it in the call context and records per-target timing.
+type GrpcProxyInterceptor struct {
+	Config       *config.Config
+	StatsHandler *GrpcStatsHandler
+}
+
+// targetKey is the context key under which the resolved *route.Target is stored.
+type targetKey struct{}
+
+// proxyStream wraps a grpc.ServerStream so that Context returns ctx instead
+// of the wrapped stream's own context.
+type proxyStream struct {
+	grpc.ServerStream
+	ctx context.Context
+}
+
+// Context returns the context carried by the wrapper.
+func (p proxyStream) Context() context.Context {
+	return p.ctx
+}
+
+// Unary looks up the route target for the call, passes it to handler through
+// the context and updates the target timer with the handler duration.
+func (g GrpcProxyInterceptor) Unary(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+	target, err := g.lookup(ctx, info.FullMethod)
+
+	if err != nil {
+		return nil, err
+	}
+
+	ctx = context.WithValue(ctx, targetKey{}, target)
+
+	start := time.Now()
+
+	res, err := handler(ctx, req)
+
+	end := time.Now()
+	dur := end.Sub(start)
+
+	// lookup returns a nil target when no route matches; count the miss
+	// instead of dereferencing nil.
+	if target != nil {
+		target.Timer.Update(dur)
+	} else {
+		g.StatsHandler.NoRoute.Inc(1)
+	}
+
+	return res, err
+}
+
+// Stream is the streaming counterpart of Unary. The target is handed to the
+// handler through a proxyStream whose context carries it.
+func (g GrpcProxyInterceptor) Stream(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+	ctx := stream.Context()
+
+	target, err := g.lookup(ctx, info.FullMethod)
+
+	if err != nil {
+		return err
+	}
+
+	ctx = context.WithValue(ctx, targetKey{}, target)
+
+	proxyStream := proxyStream{
+		ServerStream: stream,
+		ctx:          ctx,
+	}
+
+	start := time.Now()
+
+	err = handler(srv, proxyStream)
+
+	end := time.Now()
+	dur := end.Sub(start)
+
+	if target != nil {
+		target.Timer.Update(dur)
+	} else {
+		g.StatsHandler.NoRoute.Inc(1)
+	}
+
+	return err
+}
+
+// lookup builds an http.Request from the gRPC method name and the incoming
+// metadata and resolves it against the routing table. The returned target is
+// nil when no route matches.
+func (g GrpcProxyInterceptor) lookup(ctx context.Context, fullMethodName string) (*route.Target, error) {
+	pick := route.Picker[g.Config.Proxy.Strategy]
+	match := route.Matcher[g.Config.Proxy.Matcher]
+
+	md, ok := metadata.FromIncomingContext(ctx)
+
+	if !ok {
+		return nil, fmt.Errorf("error extracting metadata from request")
+	}
+
+	reqUrl, err := url.ParseRequestURI(fullMethodName)
+
+	if err != nil {
+		log.Print("[WARN] Error parsing grpc request url ", fullMethodName)
+		return nil, fmt.Errorf("error parsing request url")
+	}
+
+	headers := http.Header{}
+
+	for k, v := range md {
+		for _, h := range v {
+			headers.Add(k, h)
+		}
+	}
+
+	req := &http.Request{
+		Host:   "",
+		URL:    reqUrl,
+		Header: headers,
+	}
+
+	return route.GetTable().Lookup(req, req.Header.Get("trace"), pick, match, g.Config.GlobMatchingDisabled), nil
+}
+
+// GrpcStatsHandler records connection and request metrics from the callbacks
+// of the grpc stats.Handler interface.
+type GrpcStatsHandler struct {
+	Connect metrics.Counter
+	Request metrics.Timer
+	NoRoute metrics.Counter
+}
+
+// connCtxKey and rpcCtxKey are the context keys used by TagConn and TagRPC.
+type connCtxKey struct{}
+type rpcCtxKey struct{}
+
+// TagConn stores the connection tag info in the returned context.
+func (h *GrpcStatsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
+	return context.WithValue(ctx, connCtxKey{}, info)
+}
+
+// TagRPC stores the RPC tag info in the returned context.
+func (h *GrpcStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
+	return context.WithValue(ctx, rpcCtxKey{}, info)
+}
+
+// HandleRPC updates the request timer and the per-status-code timer when an
+// RPC ends; all other stats events are ignored.
+func (h *GrpcStatsHandler) HandleRPC(ctx context.Context, rpc stats.RPCStats) {
+	rpcStats, _ := rpc.(*stats.End)
+
+	if rpcStats == nil {
+		return
+	}
+
+	dur := rpcStats.EndTime.Sub(rpcStats.BeginTime)
+
+	h.Request.Update(dur)
+
+	s, _ := status.FromError(rpcStats.Error)
+	metrics.DefaultRegistry.GetTimer(fmt.Sprintf("grpc.status.%s", strings.ToLower(s.Code().String()))).Update(dur)
+}
+
+// HandleConn processes the Conn stats.
+func (h *GrpcStatsHandler) HandleConn(ctx context.Context, conn stats.ConnStats) {
+	connBegin, _ := conn.(*stats.ConnBegin)
+
+	if connBegin != nil {
+		h.Connect.Inc(1)
+	}
 }
diff --git a/proxy/serve.go b/proxy/serve.go
index e115d9937..15b72eeef 100644
--- a/proxy/serve.go
+++ b/proxy/serve.go
@@ -11,7 +11,6 @@ import (
 
 	"github.com/fabiolb/fabio/config"
 	"github.com/fabiolb/fabio/proxy/tcp"
-	grpc_proxy "github.com/mwitkow/grpc-proxy/proxy"
 )
 
 type Server interface {
@@ -71,17 +70,14 @@ func ListenAndServeHTTP(l config.Listen, h http.Handler, cfg *tls.Config) error
 	return serve(ln, srv)
 }
 
-func ListenAndServeGRPC(l config.Listen, h grpc.StreamHandler, cfg *tls.Config) error {
+func ListenAndServeGRPC(l config.Listen, opts []grpc.ServerOption, cfg *tls.Config) error {
 	ln, err := ListenTCP(l.Addr, cfg)
 	if err != nil {
 		return err
 	}
 
-	srv := &GRPCServer{
-		server: grpc.NewServer(
-			grpc.CustomCodec(grpc_proxy.Codec()),
-			grpc.UnknownServiceHandler(h),
-		),
+	srv := &gRPCServer{
+		server: grpc.NewServer(opts...),
 	}
 
 	return serve(ln, srv)