diff --git a/core/comm/config.go b/core/comm/config.go index 1340a078ddd..74098a01f61 100644 --- a/core/comm/config.go +++ b/core/comm/config.go @@ -11,6 +11,7 @@ import ( "crypto/x509" "time" + "github.com/go-kit/kit/metrics" "github.com/hyperledger/fabric/common/flogging" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -59,6 +60,8 @@ type ServerConfig struct { UnaryInterceptors []grpc.UnaryServerInterceptor // Logger specifies the logger the server will use Logger *flogging.FabricLogger + // Metrics specifies the different metrics that are tracked for the server + Metrics *Metrics } // ClientConfig defines the parameters for configuring a GRPCClient instance @@ -117,6 +120,13 @@ type KeepaliveOptions struct { ServerMinInterval time.Duration } +type Metrics struct { + // OpenConnCounter keeps track of number of open connections + OpenConnCounter metrics.Counter + // ClosedConnCounter keeps track of number connections closed + ClosedConnCounter metrics.Counter +} + // ServerKeepaliveOptions returns gRPC keepalive options for server. If // opts is nil, the default keepalive options are returned func ServerKeepaliveOptions(ka *KeepaliveOptions) []grpc.ServerOption { diff --git a/core/comm/server.go b/core/comm/server.go index 679193ae46a..b8c8f0d3634 100644 --- a/core/comm/server.go +++ b/core/comm/server.go @@ -148,6 +148,14 @@ func NewGRPCServerFromListener(listener net.Listener, serverConfig ServerConfig) ) } + if serverConfig.Metrics != nil { + sh := &ServerStatsHandler{ + OpenConnCounter: serverConfig.Metrics.OpenConnCounter, + ClosedConnCounter: serverConfig.Metrics.ClosedConnCounter, + } + serverOpts = append(serverOpts, grpc.StatsHandler(sh)) + } + grpcServer.server = grpc.NewServer(serverOpts...) return grpcServer, nil diff --git a/core/comm/serverstatshandler.go b/core/comm/serverstatshandler.go index 10ccf7be4f1..6318853674a 100644 --- a/core/comm/serverstatshandler.go +++ b/core/comm/serverstatshandler.go @@ -22,17 +22,17 @@ func (h *ServerStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) return ctx } -func (h *ServerStatsHandler) HandleRPC(ctx context.Context, s stats.RPCStats) { - switch s.(type) { - case *stats.Begin: - h.OpenConnCounter.Add(1) - case *stats.End: - h.ClosedConnCounter.Add(1) - } -} +func (h *ServerStatsHandler) HandleRPC(ctx context.Context, s stats.RPCStats) {} func (h *ServerStatsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { return ctx } -func (h *ServerStatsHandler) HandleConn(ctx context.Context, s stats.ConnStats) {} +func (h *ServerStatsHandler) HandleConn(ctx context.Context, s stats.ConnStats) { + switch s.(type) { + case *stats.ConnBegin: + h.OpenConnCounter.Add(1) + case *stats.ConnEnd: + h.ClosedConnCounter.Add(1) + } +} diff --git a/core/comm/serverstatshandler_test.go b/core/comm/serverstatshandler_test.go index d88922ecc51..3c9ae8cf681 100644 --- a/core/comm/serverstatshandler_test.go +++ b/core/comm/serverstatshandler_test.go @@ -8,15 +8,22 @@ package comm_test import ( "context" + "net" "testing" + "time" "github.com/hyperledger/fabric/common/metrics/metricsfakes" "github.com/hyperledger/fabric/core/comm" - "github.com/stretchr/testify/assert" + testpb "github.com/hyperledger/fabric/core/comm/testdata/grpc" + . "github.com/onsi/gomega" + "google.golang.org/grpc" "google.golang.org/grpc/stats" ) func TestConnectionCounters(t *testing.T) { + t.Parallel() + gt := NewGomegaWithT(t) + openConn := &metricsfakes.Counter{} closedConn := &metricsfakes.Counter{} sh := &comm.ServerStatsHandler{ @@ -25,13 +32,68 @@ func TestConnectionCounters(t *testing.T) { } for i := 1; i <= 10; i++ { - sh.HandleRPC(context.Background(), &stats.Begin{}) + sh.HandleConn(context.Background(), &stats.ConnBegin{}) + gt.Expect(openConn.AddCallCount()).To(Equal(i)) } - assert.Equal(t, 10, openConn.AddCallCount()) for i := 1; i <= 5; i++ { - sh.HandleRPC(context.Background(), &stats.End{}) + sh.HandleConn(context.Background(), &stats.ConnEnd{}) + gt.Expect(closedConn.AddCallCount()).To(Equal(i)) } - assert.Equal(t, 5, closedConn.AddCallCount()) +} + +func TestConnMetricsGRPCServer(t *testing.T) { + t.Parallel() + gt := NewGomegaWithT(t) + + openConn := &metricsfakes.Counter{} + closedConn := &metricsfakes.Counter{} + + listener, err := net.Listen("tcp", "localhost:0") + gt.Expect(err).NotTo(HaveOccurred()) + srv, err := comm.NewGRPCServerFromListener( + listener, + comm.ServerConfig{ + SecOpts: &comm.SecureOptions{UseTLS: false}, + Metrics: &comm.Metrics{ + OpenConnCounter: openConn, + ClosedConnCounter: closedConn, + }, + }, + ) + gt.Expect(err).NotTo(HaveOccurred()) + + // register the GRPC test server + testpb.RegisterEmptyServiceServer(srv.Server(), &emptyServiceServer{}) + // start the server + go srv.Start() + defer srv.Stop() + + // test grpc connection counts + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + gt.Expect(openConn.AddCallCount()).To(Equal(0)) + gt.Expect(closedConn.AddCallCount()).To(Equal(0)) + + //create GRPC client conn + var clientConns []*grpc.ClientConn + for i := 1; i <= 3; i++ { + clientConn, err := grpc.DialContext(ctx, listener.Addr().String(), grpc.WithInsecure()) + gt.Expect(err).NotTo(HaveOccurred()) + clientConns = append(clientConns, clientConn) + + //invoke service + client := testpb.NewEmptyServiceClient(clientConn) + _, err = client.EmptyCall(context.Background(), &testpb.Empty{}) + gt.Expect(err).NotTo(HaveOccurred()) + gt.Expect(openConn.AddCallCount()).To(Equal(i)) + } + + for i, conn := range clientConns { + gt.Expect(closedConn.AddCallCount()).Should(Equal(i)) + conn.Close() + gt.Eventually(closedConn.AddCallCount, time.Second).Should(Equal(i + 1)) + } }