Skip to content

Commit

Permalink
[FAB-12916] gossip state metrics
Browse files Browse the repository at this point in the history
This change set adds reporting for the following events:

1) Gossip's height is incremented - gauge metric
2) A block commit is made - histogram metric with elapsed time,
   with a label of the sequence of the block.
3) A block is pushed or popped from the payload buffer - gauge metric
   with the block sequence as the value, and also a gauge metric
   of the payload buffer size at time of push/pop.

Change-Id: I542799097aa89cef32b81affa4bbd32f50b39ef7
Signed-off-by: Hagar Meir <hagar.meir@ibm.com>
  • Loading branch information
HagarMeir committed Jan 22, 2019
1 parent c31d94e commit c61266c
Show file tree
Hide file tree
Showing 12 changed files with 363 additions and 24 deletions.
3 changes: 1 addition & 2 deletions core/peer/peer_test.go
Expand Up @@ -118,8 +118,7 @@ func TestCreateChainFromBlock(t *testing.T) {
return dialOpts
}
err = service.InitGossipServiceCustomDeliveryFactory(
identity, socket.Addr().String(), grpcServer, nil,
&mockDeliveryClientFactory{},
identity, &disabled.Provider{}, socket.Addr().String(), grpcServer, nil, &mockDeliveryClientFactory{},
messageCryptoService, secAdv, defaultSecureDialOpts)

assert.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion core/scc/cscc/configure_test.go
Expand Up @@ -266,7 +266,8 @@ func TestConfigerInvokeJoinChainCorrectParams(t *testing.T) {
identity, _ := mgmt.GetLocalSigningIdentityOrPanic().Serialize()
messageCryptoService := peergossip.NewMCS(&mocks.ChannelPolicyManagerGetter{}, localmsp.NewSigner(), mgmt.NewDeserializersManager())
secAdv := peergossip.NewSecurityAdvisor(mgmt.NewDeserializersManager())
err := service.InitGossipServiceCustomDeliveryFactory(identity, peerEndpoint, nil, nil, &mockDeliveryClientFactory{}, messageCryptoService, secAdv, nil)
err := service.InitGossipServiceCustomDeliveryFactory(identity, &disabled.Provider{}, peerEndpoint, nil, nil,
&mockDeliveryClientFactory{}, messageCryptoService, secAdv, nil)
assert.NoError(t, err)

// Successful path for JoinChain
Expand Down
12 changes: 12 additions & 0 deletions docs/source/metrics_reference.rst
Expand Up @@ -95,6 +95,12 @@ The following metrics are currently exported for consumption by Prometheus.
+-----------------------------------------------------+-----------+------------------------------------------------------------+--------------------+
| fabric_version | gauge | The active version of Fabric. | version |
+-----------------------------------------------------+-----------+------------------------------------------------------------+--------------------+
| gossip_payload_buffer_size | gauge | Size of the payload buffer | channel |
+-----------------------------------------------------+-----------+------------------------------------------------------------+--------------------+
| gossip_state_commit_duration | histogram | Time it takes to commit a block in seconds | channel |
+-----------------------------------------------------+-----------+------------------------------------------------------------+--------------------+
| gossip_state_height | gauge | Current ledger height | channel |
+-----------------------------------------------------+-----------+------------------------------------------------------------+--------------------+
| grpc_comm_conn_closed | counter | gRPC connections closed. Open minus closed is the active | |
| | | number of connections. | |
+-----------------------------------------------------+-----------+------------------------------------------------------------+--------------------+
Expand Down Expand Up @@ -224,6 +230,12 @@ associated with the metric.
+-----------------------------------------------------------------------------------------+-----------+------------------------------------------------------------+
| fabric_version.%{version} | gauge | The active version of Fabric. |
+-----------------------------------------------------------------------------------------+-----------+------------------------------------------------------------+
| gossip.payload_buffer.size.%{channel} | gauge | Size of the payload buffer |
+-----------------------------------------------------------------------------------------+-----------+------------------------------------------------------------+
| gossip.state.commit_duration.%{channel} | histogram | Time it takes to commit a block in seconds |
+-----------------------------------------------------------------------------------------+-----------+------------------------------------------------------------+
| gossip.state.height.%{channel} | gauge | Current ledger height |
+-----------------------------------------------------------------------------------------+-----------+------------------------------------------------------------+
| grpc.comm.conn_closed | counter | gRPC connections closed. Open minus closed is the active |
| | | number of connections. |
+-----------------------------------------------------------------------------------------+-----------+------------------------------------------------------------+
Expand Down
64 changes: 64 additions & 0 deletions gossip/metrics/metrics.go
@@ -0,0 +1,64 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package metrics

import "github.com/hyperledger/fabric/common/metrics"

// GossipMetrics encapsulates all of gossip metrics
type GossipMetrics struct {
StateMetrics *StateMetrics
}

func NewGossipMetrics(p metrics.Provider) *GossipMetrics {
return &GossipMetrics{
StateMetrics: newStateMetrics(p),
}
}

// StateMetrics encapsulates gossip state related metrics
type StateMetrics struct {
Height metrics.Gauge
CommitDuration metrics.Histogram
PayloadBufferSize metrics.Gauge
}

func newStateMetrics(p metrics.Provider) *StateMetrics {
return &StateMetrics{
Height: p.NewGauge(HeightOpts),
CommitDuration: p.NewHistogram(CommitDurationOpts),
PayloadBufferSize: p.NewGauge(PayloadBufferSizeOpts),
}
}

var (
HeightOpts = metrics.GaugeOpts{
Namespace: "gossip",
Subsystem: "state",
Name: "height",
Help: "Current ledger height",
LabelNames: []string{"channel"},
StatsdFormat: "%{#fqname}.%{channel}",
}

CommitDurationOpts = metrics.HistogramOpts{
Namespace: "gossip",
Subsystem: "state",
Name: "commit_duration",
Help: "Time it takes to commit a block in seconds",
LabelNames: []string{"channel"},
StatsdFormat: "%{#fqname}.%{channel}",
}

PayloadBufferSizeOpts = metrics.GaugeOpts{
Namespace: "gossip",
Subsystem: "payload_buffer",
Name: "size",
Help: "Size of the payload buffer",
LabelNames: []string{"channel"},
StatsdFormat: "%{#fqname}.%{channel}",
}
)
37 changes: 37 additions & 0 deletions gossip/metrics/metrics_test.go
@@ -0,0 +1,37 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package metrics

import (
"testing"

"github.com/hyperledger/fabric/common/metrics/metricsfakes"
"github.com/stretchr/testify/assert"
)

func TestMetrics(t *testing.T) {

provider := &metricsfakes.Provider{}

counter := &metricsfakes.Counter{}
gauge := &metricsfakes.Gauge{}
histogram := &metricsfakes.Histogram{}

provider.NewCounterReturns(counter)
provider.NewGaugeReturns(gauge)
provider.NewHistogramReturns(histogram)

gossipMetrics := NewGossipMetrics(provider)

// make sure all metrics were created
assert.NotNil(t, gossipMetrics)
assert.NotNil(t, gossipMetrics.StateMetrics)
assert.NotNil(t, gossipMetrics.StateMetrics.Height)
assert.NotNil(t, gossipMetrics.StateMetrics.CommitDuration)
assert.NotNil(t, gossipMetrics.StateMetrics.PayloadBufferSize)

}
17 changes: 11 additions & 6 deletions gossip/service/gossip_service.go
Expand Up @@ -9,6 +9,7 @@ package service
import (
"sync"

"github.com/hyperledger/fabric/common/metrics"
"github.com/hyperledger/fabric/core/committer"
"github.com/hyperledger/fabric/core/committer/txvalidator"
"github.com/hyperledger/fabric/core/common/privdata"
Expand All @@ -19,6 +20,7 @@ import (
"github.com/hyperledger/fabric/gossip/election"
"github.com/hyperledger/fabric/gossip/gossip"
"github.com/hyperledger/fabric/gossip/integration"
gossipMetrics "github.com/hyperledger/fabric/gossip/metrics"
privdata2 "github.com/hyperledger/fabric/gossip/privdata"
"github.com/hyperledger/fabric/gossip/state"
"github.com/hyperledger/fabric/gossip/util"
Expand Down Expand Up @@ -95,6 +97,7 @@ type gossipServiceImpl struct {
mcs api.MessageCryptoService
peerIdentity []byte
secAdv api.SecurityAdvisor
metrics *gossipMetrics.GossipMetrics
}

// This is an implementation of api.JoinChannelMessage.
Expand Down Expand Up @@ -126,20 +129,21 @@ func (jcm *joinChannelMessage) AnchorPeersOf(org api.OrgIdentityType) []api.Anch
var logger = util.GetLogger(util.ServiceLogger, "")

// InitGossipService initialize gossip service
func InitGossipService(peerIdentity []byte, endpoint string, s *grpc.Server, certs *gossipCommon.TLSCertificates,
mcs api.MessageCryptoService, secAdv api.SecurityAdvisor, secureDialOpts api.PeerSecureDialOpts, bootPeers ...string) error {
func InitGossipService(peerIdentity []byte, metricsProvider metrics.Provider, endpoint string, s *grpc.Server,
certs *gossipCommon.TLSCertificates, mcs api.MessageCryptoService, secAdv api.SecurityAdvisor,
secureDialOpts api.PeerSecureDialOpts, bootPeers ...string) error {
// TODO: Remove this.
// TODO: This is a temporary work-around to make the gossip leader election module load its logger at startup
// TODO: in order for the flogging package to register this logger in time so it can set the log levels as requested in the config
util.GetLogger(util.ElectionLogger, "")
return InitGossipServiceCustomDeliveryFactory(peerIdentity, endpoint, s, certs, &deliveryFactoryImpl{},
return InitGossipServiceCustomDeliveryFactory(peerIdentity, metricsProvider, endpoint, s, certs, &deliveryFactoryImpl{},
mcs, secAdv, secureDialOpts, bootPeers...)
}

// InitGossipServiceCustomDeliveryFactory initialize gossip service with customize delivery factory
// implementation, might be useful for testing and mocking purposes
func InitGossipServiceCustomDeliveryFactory(peerIdentity []byte, endpoint string, s *grpc.Server,
certs *gossipCommon.TLSCertificates, factory DeliveryServiceFactory, mcs api.MessageCryptoService,
func InitGossipServiceCustomDeliveryFactory(peerIdentity []byte, metricsProvider metrics.Provider, endpoint string,
s *grpc.Server, certs *gossipCommon.TLSCertificates, factory DeliveryServiceFactory, mcs api.MessageCryptoService,
secAdv api.SecurityAdvisor, secureDialOpts api.PeerSecureDialOpts, bootPeers ...string) error {
var err error
var gossip gossip.Gossip
Expand All @@ -162,6 +166,7 @@ func InitGossipServiceCustomDeliveryFactory(peerIdentity []byte, endpoint string
deliveryFactory: factory,
peerIdentity: peerIdentity,
secAdv: secAdv,
metrics: gossipMetrics.NewGossipMetrics(metricsProvider),
}
})
return errors.WithStack(err)
Expand Down Expand Up @@ -262,7 +267,7 @@ func (g *gossipServiceImpl) InitializeChannel(chainID string, endpoints []string
}
g.privateHandlers[chainID].reconciler.Start()

g.chains[chainID] = state.NewGossipStateProvider(chainID, servicesAdapter, coordinator)
g.chains[chainID] = state.NewGossipStateProvider(chainID, servicesAdapter, coordinator, g.metrics.StateMetrics)
if g.deliveryService[chainID] == nil {
var err error
g.deliveryService[chainID], err = g.deliveryFactory.Service(g, endpoints, g.mcs)
Expand Down
11 changes: 7 additions & 4 deletions gossip/service/gossip_service_test.go
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/localmsp"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/core/deliverservice"
"github.com/hyperledger/fabric/core/deliverservice/blocksprovider"
"github.com/hyperledger/fabric/core/ledger"
Expand All @@ -24,6 +25,7 @@ import (
gossipCommon "github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/election"
"github.com/hyperledger/fabric/gossip/gossip"
gossipMetrics "github.com/hyperledger/fabric/gossip/metrics"
"github.com/hyperledger/fabric/gossip/state"
"github.com/hyperledger/fabric/gossip/util"
"github.com/hyperledger/fabric/msp/mgmt"
Expand Down Expand Up @@ -81,8 +83,8 @@ func TestInitGossipService(t *testing.T) {
defer wg.Done()
messageCryptoService := peergossip.NewMCS(&mocks.ChannelPolicyManagerGetter{}, localmsp.NewSigner(), mgmt.NewDeserializersManager())
secAdv := peergossip.NewSecurityAdvisor(mgmt.NewDeserializersManager())
err := InitGossipService(identity, "localhost:5611", grpcServer, nil, messageCryptoService,
secAdv, nil)
err := InitGossipService(identity, &disabled.Provider{}, "localhost:5611", grpcServer, nil,
messageCryptoService, secAdv, nil)
assert.NoError(t, err)
}()
}
Expand Down Expand Up @@ -681,6 +683,7 @@ func newGossipInstance(portPrefix int, id int, maxMsgCount int, boot ...int) Gos
deliveryService: make(map[string]deliverclient.DeliverService),
deliveryFactory: &deliveryFactoryImpl{},
peerIdentity: api.PeerIdentityType(conf.InternalEndpoint),
metrics: gossipMetrics.NewGossipMetrics(&disabled.Provider{}),
}

return gossipService
Expand Down Expand Up @@ -766,7 +769,7 @@ func TestInvalidInitialization(t *testing.T) {
defer grpcServer.Stop()

secAdv := peergossip.NewSecurityAdvisor(mgmt.NewDeserializersManager())
err := InitGossipService(api.PeerIdentityType("IDENTITY"), "localhost:7611", grpcServer, nil,
err := InitGossipService(api.PeerIdentityType("IDENTITY"), &disabled.Provider{}, "localhost:7611", grpcServer, nil,
&naiveCryptoService{}, secAdv, nil)
assert.NoError(t, err)
gService := GetGossipService().(*gossipServiceImpl)
Expand All @@ -791,7 +794,7 @@ func TestChannelConfig(t *testing.T) {
defer grpcServer.Stop()

secAdv := peergossip.NewSecurityAdvisor(mgmt.NewDeserializersManager())
error = InitGossipService(api.PeerIdentityType("IDENTITY"), "localhost:6611", grpcServer, nil,
error = InitGossipService(api.PeerIdentityType("IDENTITY"), &disabled.Provider{}, "localhost:6611", grpcServer, nil,
&naiveCryptoService{}, secAdv, nil)
assert.NoError(t, error)
gService := GetGossipService().(*gossipServiceImpl)
Expand Down

0 comments on commit c61266c

Please sign in to comment.