Skip to content

Commit 2ee1084

Browse files
committed
[FAB-12915] gossip leader election metrics
This change set introduces metrics for gossip leader election. - Leader declaration event - gauge, 0 or 1 according to whether the peer is the leader or not Change-Id: I14e3ffbc77c425fbf9a5575e1a07802eda586dda Signed-off-by: Hagar Meir <hagar.meir@ibm.com> Signed-off-by: yacovm <yacovm@il.ibm.com>
1 parent f7268d8 commit 2ee1084

File tree

10 files changed

+164
-16
lines changed

10 files changed

+164
-16
lines changed

docs/source/metrics_reference.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ The following metrics are currently exported for consumption by Prometheus.
119119
+-----------------------------------------------------+-----------+------------------------------------------------------------+--------------------+
120120
| fabric_version | gauge | The active version of Fabric. | version |
121121
+-----------------------------------------------------+-----------+------------------------------------------------------------+--------------------+
122+
| gossip_leader_election_leader | gauge | Peer is leader (1) or follower (0) | channel |
123+
+-----------------------------------------------------+-----------+------------------------------------------------------------+--------------------+
122124
| gossip_payload_buffer_size | gauge | Size of the payload buffer | channel |
123125
+-----------------------------------------------------+-----------+------------------------------------------------------------+--------------------+
124126
| gossip_state_commit_duration | histogram | Time it takes to commit a block in seconds | channel |
@@ -278,6 +280,8 @@ associated with the metric.
278280
+-----------------------------------------------------------------------------------------+-----------+------------------------------------------------------------+
279281
| fabric_version.%{version} | gauge | The active version of Fabric. |
280282
+-----------------------------------------------------------------------------------------+-----------+------------------------------------------------------------+
283+
| gossip.leader_election.leader.%{channel} | gauge | Peer is leader (1) or follower (0) |
284+
+-----------------------------------------------------------------------------------------+-----------+------------------------------------------------------------+
281285
| gossip.payload_buffer.size.%{channel} | gauge | Size of the payload buffer |
282286
+-----------------------------------------------------------------------------------------+-----------+------------------------------------------------------------+
283287
| gossip.state.commit_duration.%{channel} | histogram | Time it takes to commit a block in seconds |

gossip/election/adapter.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/hyperledger/fabric/gossip/common"
1515
"github.com/hyperledger/fabric/gossip/discovery"
16+
"github.com/hyperledger/fabric/gossip/metrics"
1617
"github.com/hyperledger/fabric/gossip/util"
1718
proto "github.com/hyperledger/fabric/protos/gossip"
1819
)
@@ -68,10 +69,12 @@ type adapterImpl struct {
6869

6970
doneCh chan struct{}
7071
stopOnce *sync.Once
72+
metrics *metrics.ElectionMetrics
7173
}
7274

7375
// NewAdapter creates new leader election adapter
74-
func NewAdapter(gossip gossip, pkiid common.PKIidType, channel common.ChainID) LeaderElectionAdapter {
76+
func NewAdapter(gossip gossip, pkiid common.PKIidType, channel common.ChainID,
77+
metrics *metrics.ElectionMetrics) LeaderElectionAdapter {
7578
return &adapterImpl{
7679
gossip: gossip,
7780
selfPKIid: pkiid,
@@ -85,6 +88,7 @@ func NewAdapter(gossip gossip, pkiid common.PKIidType, channel common.ChainID) L
8588

8689
doneCh: make(chan struct{}),
8790
stopOnce: &sync.Once{},
91+
metrics: metrics,
8892
}
8993
}
9094

@@ -152,6 +156,14 @@ func (ai *adapterImpl) Peers() []Peer {
152156
return res
153157
}
154158

159+
func (ai *adapterImpl) ReportMetrics(isLeader bool) {
160+
var leadershipBit float64
161+
if isLeader {
162+
leadershipBit = 1
163+
}
164+
ai.metrics.Declaration.With("channel", string(ai.channel)).Set(leadershipBit)
165+
}
166+
155167
func (ai *adapterImpl) Stop() {
156168
stopFunc := func() {
157169
close(ai.doneCh)

gossip/election/adapter_test.go

Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,15 @@ import (
1414
"testing"
1515
"time"
1616

17+
"github.com/hyperledger/fabric/common/metrics"
18+
"github.com/hyperledger/fabric/common/metrics/disabled"
19+
"github.com/hyperledger/fabric/common/metrics/metricsfakes"
1720
"github.com/hyperledger/fabric/gossip/common"
1821
"github.com/hyperledger/fabric/gossip/discovery"
22+
gossipMetrics "github.com/hyperledger/fabric/gossip/metrics"
1923
"github.com/hyperledger/fabric/gossip/util"
2024
proto "github.com/hyperledger/fabric/protos/gossip"
25+
"github.com/stretchr/testify/assert"
2126
)
2227

2328
func init() {
@@ -35,7 +40,8 @@ func TestNewAdapter(t *testing.T) {
3540
peersCluster := newClusterOfPeers("0")
3641
peersCluster.addPeer("peer0", mockGossip)
3742

38-
NewAdapter(mockGossip, selfNetworkMember.PKIid, []byte("channel0"))
43+
NewAdapter(mockGossip, selfNetworkMember.PKIid, []byte("channel0"),
44+
gossipMetrics.NewGossipMetrics(&disabled.Provider{}).ElectionMetrics)
3945
}
4046

4147
func TestAdapterImpl_CreateMessage(t *testing.T) {
@@ -46,7 +52,8 @@ func TestAdapterImpl_CreateMessage(t *testing.T) {
4652
}
4753
mockGossip := newGossip("peer0", selfNetworkMember)
4854

49-
adapter := NewAdapter(mockGossip, selfNetworkMember.PKIid, []byte("channel0"))
55+
adapter := NewAdapter(mockGossip, selfNetworkMember.PKIid, []byte("channel0"),
56+
gossipMetrics.NewGossipMetrics(&disabled.Provider{}).ElectionMetrics)
5057
msg := adapter.CreateMessage(true)
5158

5259
if !msg.(*msgImpl).msg.IsLeadershipMsg() {
@@ -281,10 +288,78 @@ func createCluster(peers ...int) (*clusterOfPeers, map[string]*adapterImpl) {
281288
}
282289

283290
mockGossip := newGossip(peerEndpoint, peerMember)
284-
adapter := NewAdapter(mockGossip, peerMember.PKIid, []byte("channel0"))
291+
adapter := NewAdapter(mockGossip, peerMember.PKIid, []byte("channel0"),
292+
gossipMetrics.NewGossipMetrics(&disabled.Provider{}).ElectionMetrics)
285293
adapters[peerEndpoint] = adapter.(*adapterImpl)
286294
cluster.addPeer(peerEndpoint, mockGossip)
287295
}
288296

289297
return cluster, adapters
290298
}
299+
300+
type testMetricProvider struct {
301+
fakeProvider *metricsfakes.Provider
302+
fakeDeclarationGauge *metricsfakes.Gauge
303+
}
304+
305+
func testUtilConstructMetricProvider() *testMetricProvider {
306+
fakeProvider := &metricsfakes.Provider{}
307+
fakeDeclarationGauge := testUtilConstructGauge()
308+
309+
fakeProvider.NewCounterStub = func(opts metrics.CounterOpts) metrics.Counter {
310+
return nil
311+
}
312+
fakeProvider.NewHistogramStub = func(opts metrics.HistogramOpts) metrics.Histogram {
313+
return nil
314+
}
315+
fakeProvider.NewGaugeStub = func(opts metrics.GaugeOpts) metrics.Gauge {
316+
switch opts.Name {
317+
case gossipMetrics.LeaderDeclerationOpts.Name:
318+
return fakeDeclarationGauge
319+
}
320+
return nil
321+
}
322+
323+
return &testMetricProvider{
324+
fakeProvider,
325+
fakeDeclarationGauge,
326+
}
327+
}
328+
329+
func testUtilConstructGauge() *metricsfakes.Gauge {
330+
fakeGauge := &metricsfakes.Gauge{}
331+
fakeGauge.WithReturns(fakeGauge)
332+
return fakeGauge
333+
}
334+
335+
func TestReportMetrics(t *testing.T) {
336+
337+
testMetricProvider := testUtilConstructMetricProvider()
338+
electionMetrics := gossipMetrics.NewGossipMetrics(testMetricProvider.fakeProvider).ElectionMetrics
339+
340+
mockGossip := newGossip("", &discovery.NetworkMember{})
341+
adapter := NewAdapter(mockGossip, nil, []byte("channel0"), electionMetrics)
342+
343+
adapter.ReportMetrics(true)
344+
345+
assert.Equal(t,
346+
[]string{"channel", "channel0"},
347+
testMetricProvider.fakeDeclarationGauge.WithArgsForCall(0),
348+
)
349+
assert.EqualValues(t,
350+
1,
351+
testMetricProvider.fakeDeclarationGauge.SetArgsForCall(0),
352+
)
353+
354+
adapter.ReportMetrics(false)
355+
356+
assert.Equal(t,
357+
[]string{"channel", "channel0"},
358+
testMetricProvider.fakeDeclarationGauge.WithArgsForCall(1),
359+
)
360+
assert.EqualValues(t,
361+
0,
362+
testMetricProvider.fakeDeclarationGauge.SetArgsForCall(1),
363+
)
364+
365+
}

gossip/election/election.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ type LeaderElectionAdapter interface {
8282

8383
// Peers returns a list of peers considered alive
8484
Peers() []Peer
85+
86+
// ReportMetrics sends a report to the metrics server about a leadership status
87+
ReportMetrics(isLeader bool)
8588
}
8689

8790
type leadershipCallback func(isLeader bool)
@@ -319,6 +322,7 @@ func (le *leaderElectionSvcImpl) follower() {
319322

320323
le.proposals.Clear()
321324
atomic.StoreInt32(&le.leaderExists, int32(0))
325+
le.adapter.ReportMetrics(false)
322326
select {
323327
case <-time.After(getLeaderAliveThreshold()):
324328
case <-le.stopChan:
@@ -329,6 +333,7 @@ func (le *leaderElectionSvcImpl) follower() {
329333
func (le *leaderElectionSvcImpl) leader() {
330334
leaderDeclaration := le.adapter.CreateMessage(true)
331335
le.adapter.Gossip(leaderDeclaration)
336+
le.adapter.ReportMetrics(true)
332337
le.waitForInterrupt(getLeadershipDeclarationInterval())
333338
}
334339

gossip/election/election_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,10 @@ func (p *peer) Peers() []Peer {
123123
return peers
124124
}
125125

126+
func (p *peer) ReportMetrics(isLeader bool) {
127+
p.Mock.Called(isLeader)
128+
}
129+
126130
func (p *peer) leaderCallback(isLeader bool) {
127131
p.lock.Lock()
128132
defer p.lock.Unlock()
@@ -160,6 +164,7 @@ func createPeer(id int, peerMap map[string]*peer, l *sync.RWMutex) *peer {
160164
idStr := fmt.Sprintf("p%d", id)
161165
c := make(chan Msg, 100)
162166
p := &peer{id: idStr, peers: peerMap, sharedLock: l, msgChan: c, mockedMethods: make(map[string]struct{}), leaderFromCallback: false, callbackInvoked: false}
167+
p.On("ReportMetrics", mock.Anything, mock.Anything)
163168
p.LeaderElectionService = NewLeaderElectionService(p, idStr, p.leaderCallback)
164169
l.Lock()
165170
peerMap[idStr] = p
@@ -190,6 +195,21 @@ func waitForLeaderElection(t *testing.T, peers []*peer) []string {
190195
return waitForMultipleLeadersElection(t, peers, 1)
191196
}
192197

198+
func TestMetrics(t *testing.T) {
199+
t.Parallel()
200+
// Scenario: spawn a single peer and ensure it reports being a leader after some time.
201+
// Then, make it relinquish its leadership and then ensure it reports not being a leader.
202+
p := createPeer(0, make(map[string]*peer), &sync.RWMutex{})
203+
waitForLeaderElection(t, []*peer{p})
204+
// Ensure we sent a leadership declaration during the time of leadership acquisition.
205+
p.AssertCalled(t, "ReportMetrics", true)
206+
p.Yield()
207+
assert.False(t, p.IsLeader())
208+
waitForLeaderElection(t, []*peer{p})
209+
// Ensure declaration for not being a leader was sent
210+
p.AssertCalled(t, "ReportMetrics", false)
211+
}
212+
193213
func TestInitPeersAtSameTime(t *testing.T) {
194214
t.Parallel()
195215
// Scenario: Peers are spawned at the same time

gossip/metrics/metrics.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,14 @@ import "github.com/hyperledger/fabric/common/metrics"
1010

1111
// GossipMetrics encapsulates all of gossip metrics
1212
type GossipMetrics struct {
13-
StateMetrics *StateMetrics
13+
StateMetrics *StateMetrics
14+
ElectionMetrics *ElectionMetrics
1415
}
1516

1617
func NewGossipMetrics(p metrics.Provider) *GossipMetrics {
1718
return &GossipMetrics{
18-
StateMetrics: newStateMetrics(p),
19+
StateMetrics: newStateMetrics(p),
20+
ElectionMetrics: newElectionMetrics(p),
1921
}
2022
}
2123

@@ -62,3 +64,25 @@ var (
6264
StatsdFormat: "%{#fqname}.%{channel}",
6365
}
6466
)
67+
68+
// ElectionMetrics encapsulates gossip leader election related metrics
69+
type ElectionMetrics struct {
70+
Declaration metrics.Gauge
71+
}
72+
73+
func newElectionMetrics(p metrics.Provider) *ElectionMetrics {
74+
return &ElectionMetrics{
75+
Declaration: p.NewGauge(LeaderDeclerationOpts),
76+
}
77+
}
78+
79+
var (
80+
LeaderDeclerationOpts = metrics.GaugeOpts{
81+
Namespace: "gossip",
82+
Subsystem: "leader_election",
83+
Name: "leader",
84+
Help: "Peer is leader (1) or follower (0)",
85+
LabelNames: []string{"channel"},
86+
StatsdFormat: "%{#fqname}.%{channel}",
87+
}
88+
)

gossip/metrics/metrics_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,13 @@ func TestMetrics(t *testing.T) {
2929

3030
// make sure all metrics were created
3131
assert.NotNil(t, gossipMetrics)
32+
3233
assert.NotNil(t, gossipMetrics.StateMetrics)
3334
assert.NotNil(t, gossipMetrics.StateMetrics.Height)
3435
assert.NotNil(t, gossipMetrics.StateMetrics.CommitDuration)
3536
assert.NotNil(t, gossipMetrics.StateMetrics.PayloadBufferSize)
3637

38+
assert.NotNil(t, gossipMetrics.ElectionMetrics)
39+
assert.NotNil(t, gossipMetrics.ElectionMetrics.Declaration)
40+
3741
}

gossip/service/gossip_service.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,8 @@ func (g *gossipServiceImpl) InitializeChannel(chainID string, endpoints []string
294294

295295
if leaderElection {
296296
logger.Debug("Delivery uses dynamic leader election mechanism, channel", chainID)
297-
g.leaderElection[chainID] = g.newLeaderElectionComponent(chainID, g.onStatusChangeFactory(chainID, support.Committer))
297+
g.leaderElection[chainID] = g.newLeaderElectionComponent(chainID, g.onStatusChangeFactory(chainID,
298+
support.Committer), g.metrics.ElectionMetrics)
298299
} else if isStaticOrgLeader {
299300
logger.Debug("This peer is configured to connect to ordering service for blocks delivery, channel", chainID)
300301
g.deliveryService[chainID].StartDeliverForChannel(chainID, support.Committer, func() {})
@@ -384,9 +385,10 @@ func (g *gossipServiceImpl) Stop() {
384385
g.gossipSvc.Stop()
385386
}
386387

387-
func (g *gossipServiceImpl) newLeaderElectionComponent(chainID string, callback func(bool)) election.LeaderElectionService {
388+
func (g *gossipServiceImpl) newLeaderElectionComponent(chainID string, callback func(bool),
389+
electionMetrics *gossipMetrics.ElectionMetrics) election.LeaderElectionService {
388390
PKIid := g.mcs.GetPKIidOfCert(g.peerIdentity)
389-
adapter := election.NewAdapter(g, PKIid, gossipCommon.ChainID(chainID))
391+
adapter := election.NewAdapter(g, PKIid, gossipCommon.ChainID(chainID), electionMetrics)
390392
return election.NewLeaderElectionService(adapter, string(PKIid), callback)
391393
}
392394

gossip/service/gossip_service_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -409,9 +409,12 @@ func TestLeaderElectionWithRealGossip(t *testing.T) {
409409
//Starting leader election services
410410
services := make([]*electionService, n)
411411

412+
electionMetrics := gossipMetrics.NewGossipMetrics(&disabled.Provider{}).ElectionMetrics
413+
412414
for i := 0; i < n; i++ {
413415
services[i] = &electionService{nil, false, 0}
414-
services[i].LeaderElectionService = gossips[i].(*gossipServiceImpl).newLeaderElectionComponent(channelName, services[i].callback)
416+
services[i].LeaderElectionService = gossips[i].(*gossipServiceImpl).newLeaderElectionComponent(channelName,
417+
services[i].callback, electionMetrics)
415418
}
416419

417420
logger.Warning("Waiting for leader election")
@@ -437,7 +440,9 @@ func TestLeaderElectionWithRealGossip(t *testing.T) {
437440

438441
for idx, i := range secondChannelPeerIndexes {
439442
secondChannelServices[idx] = &electionService{nil, false, 0}
440-
secondChannelServices[idx].LeaderElectionService = gossips[i].(*gossipServiceImpl).newLeaderElectionComponent(secondChannelName, secondChannelServices[idx].callback)
443+
secondChannelServices[idx].LeaderElectionService =
444+
gossips[i].(*gossipServiceImpl).newLeaderElectionComponent(secondChannelName,
445+
secondChannelServices[idx].callback, electionMetrics)
441446
}
442447

443448
assert.True(t, waitForLeaderElection(t, secondChannelServices, time.Second*30, time.Second*2), "One leader should be selected for chanB")

gossip/state/metrics_test.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,22 +29,20 @@ type testMetricProvider struct {
2929
fakePayloadBufferSizeGauge *metricsfakes.Gauge
3030
}
3131

32-
func testUtilConstructMetricProvider(t *testing.T) *testMetricProvider {
32+
func testUtilConstructMetricProvider() *testMetricProvider {
3333
fakeProvider := &metricsfakes.Provider{}
3434
fakeHeightGauge := testUtilConstructGuage()
3535
fakeCommitDurationHist := testUtilConstructHist()
3636
fakePayloadBufferSizeGauge := testUtilConstructGuage()
3737

3838
fakeProvider.NewCounterStub = func(opts metrics.CounterOpts) metrics.Counter {
39-
assert.Fail(t, "Unknown counter name: ", opts.Name)
4039
return nil
4140
}
4241
fakeProvider.NewHistogramStub = func(opts metrics.HistogramOpts) metrics.Histogram {
4342
switch opts.Name {
4443
case gossipMetrics.CommitDurationOpts.Name:
4544
return fakeCommitDurationHist
4645
}
47-
assert.Fail(t, "Unknown histogram name: ", opts.Name)
4846
return nil
4947
}
5048
fakeProvider.NewGaugeStub = func(opts metrics.GaugeOpts) metrics.Gauge {
@@ -54,7 +52,6 @@ func testUtilConstructMetricProvider(t *testing.T) *testMetricProvider {
5452
case gossipMetrics.HeightOpts.Name:
5553
return fakeHeightGauge
5654
}
57-
assert.Fail(t, "Unknown gauge name: ", opts.Name)
5855
return nil
5956
}
6057

@@ -93,7 +90,7 @@ func TestMetrics(t *testing.T) {
9390
committedDurationWG := sync.WaitGroup{}
9491
committedDurationWG.Add(1)
9592

96-
testMetricProvider := testUtilConstructMetricProvider(t)
93+
testMetricProvider := testUtilConstructMetricProvider()
9794

9895
testMetricProvider.fakeHeightGauge.SetStub = func(delta float64) {
9996
heightWG.Done()

0 commit comments

Comments
 (0)