diff --git a/core/tracker/tracker.go b/core/tracker/tracker.go index 7ce305e97..df450bf00 100644 --- a/core/tracker/tracker.go +++ b/core/tracker/tracker.go @@ -236,7 +236,7 @@ type Tracker struct { failedDutyReporter func(ctx context.Context, duty core.Duty, failed bool, step step, reason string, err error) // participationReporter instruments duty peer participation. - participationReporter func(ctx context.Context, duty core.Duty, failed bool, participatedShares map[int]bool, unexpectedPeers map[int]bool) + participationReporter func(ctx context.Context, duty core.Duty, failed bool, participatedShares map[int]int, unexpectedPeers map[int]int, expectedPerPeer int) } // New returns a new Tracker. The deleter deadliner must return well after analyser deadliner since duties of the same slot are often analysed together. @@ -292,8 +292,8 @@ func (t *Tracker) Run(ctx context.Context) error { t.failedDutyReporter(ctx, duty, failed, failedStep, failedMsg, failedErr) // Analyse peer participation - participatedShares, unexpectedShares := analyseParticipation(duty, t.events) - t.participationReporter(ctx, duty, failed, participatedShares, unexpectedShares) + participatedShares, unexpectedShares, expectedPerPeer := analyseParticipation(duty, t.events) + t.participationReporter(ctx, duty, failed, participatedShares, unexpectedShares, expectedPerPeer) case duty := <-t.deleter.C(): delete(t.events, duty) } @@ -645,26 +645,42 @@ func newUnsupportedIgnorer() func(ctx context.Context, duty core.Duty, failed bo } } -// analyseParticipation returns a set of share indexes of participated peers. -func analyseParticipation(duty core.Duty, allEvents map[core.Duty][]event) (map[int]bool, map[int]bool) { +// analyseParticipation returns a count of partial signatures submitted (correct and unexpected) by share index +// and total expected partial signatures for the given duty. 
+func analyseParticipation(duty core.Duty, allEvents map[core.Duty][]event) (map[int]int, map[int]int, int) { // Set of shareIdx of participated peers. - resp := make(map[int]bool) - unexpectedShares := make(map[int]bool) + resp := make(map[int]int) + unexpectedShares := make(map[int]int) + // Dedup participation for each validator per peer for the given duty. Each peer can submit any number of partial signatures. + type dedupKey struct { + shareIdx int + pubkey core.PubKey + } + dedup := make(map[dedupKey]bool) + + // Set of validator keys which had the given duty scheduled. + pubkeyMap := make(map[core.PubKey]bool) for _, e := range allEvents[duty] { + pubkeyMap[e.pubkey] = true + // If we get a parSigDBInternal event, then the current node participated successfully. // If we get a parSigDBExternal event, then the corresponding peer with e.shareIdx participated successfully. if e.step == parSigDBExternal || e.step == parSigDBInternal { if !isParSigEventExpected(duty, e.pubkey, allEvents) { - unexpectedShares[e.parSig.ShareIdx] = true + unexpectedShares[e.parSig.ShareIdx]++ continue } - resp[e.parSig.ShareIdx] = true + key := dedupKey{pubkey: e.pubkey, shareIdx: e.parSig.ShareIdx} + if !dedup[key] { + dedup[key] = true + resp[e.parSig.ShareIdx]++ + } } } - return resp, unexpectedShares + return resp, unexpectedShares, len(pubkeyMap) } // isParSigEventExpected returns true if a partial signature event is expected for the given duty and pubkey. @@ -707,7 +723,7 @@ func isParSigEventExpected(duty core.Duty, pubkey core.PubKey, allEvents map[cor // newParticipationReporter returns a new participation reporter function which logs and instruments peer participation // and unexpectedPeers. 
-func newParticipationReporter(peers []p2p.Peer) func(context.Context, core.Duty, bool, map[int]bool, map[int]bool) { +func newParticipationReporter(peers []p2p.Peer) func(context.Context, core.Duty, bool, map[int]int, map[int]int, int) { // prevAbsent is the set of peers who didn't participate in the last duty per type. prevAbsent := make(map[core.DutyType][]string) @@ -722,7 +738,7 @@ func newParticipationReporter(peers []p2p.Peer) func(context.Context, core.Duty, } } - return func(ctx context.Context, duty core.Duty, failed bool, participatedShares map[int]bool, unexpectedShares map[int]bool) { + return func(ctx context.Context, duty core.Duty, failed bool, participatedShares map[int]int, unexpectedShares map[int]int, expectedPerPeer int) { if len(participatedShares) == 0 && !failed { // Ignore participation metrics and log for noop duties (like DutyAggregator) return @@ -730,19 +746,19 @@ func newParticipationReporter(peers []p2p.Peer) func(context.Context, core.Duty, var absentPeers []string for _, peer := range peers { - if participatedShares[peer.ShareIdx()] { + participationSuccess.WithLabelValues(duty.Type.String(), peer.Name).Add(float64(participatedShares[peer.ShareIdx()])) + participationSuccessLegacy.WithLabelValues(duty.Type.String(), peer.Name).Add(float64(participatedShares[peer.ShareIdx()])) + participationExpect.WithLabelValues(duty.Type.String(), peer.Name).Add(float64(expectedPerPeer)) + participationMissed.WithLabelValues(duty.Type.String(), peer.Name).Add(float64(expectedPerPeer - participatedShares[peer.ShareIdx()])) + + if participatedShares[peer.ShareIdx()] > 0 { participationGauge.WithLabelValues(duty.Type.String(), peer.Name).Set(1) - participationSuccess.WithLabelValues(duty.Type.String(), peer.Name).Inc() - participationSuccessLegacy.WithLabelValues(duty.Type.String(), peer.Name).Inc() - participationExpect.WithLabelValues(duty.Type.String(), peer.Name).Inc() - } else if unexpectedShares[peer.ShareIdx()] { + } else if 
unexpectedShares[peer.ShareIdx()] > 0 { log.Warn(ctx, "Unexpected event found", nil, z.Str("peer", peer.Name), z.Str("duty", duty.String())) - unexpectedEventsCounter.WithLabelValues(peer.Name).Inc() + unexpectedEventsCounter.WithLabelValues(peer.Name).Add(float64(unexpectedShares[peer.ShareIdx()])) } else { absentPeers = append(absentPeers, peer.Name) participationGauge.WithLabelValues(duty.Type.String(), peer.Name).Set(0) - participationMissed.WithLabelValues(duty.Type.String(), peer.Name).Inc() - participationExpect.WithLabelValues(duty.Type.String(), peer.Name).Inc() } } diff --git a/core/tracker/tracker_internal_test.go b/core/tracker/tracker_internal_test.go index a4ca3c26e..80d195749 100644 --- a/core/tracker/tracker_internal_test.go +++ b/core/tracker/tracker_internal_test.go @@ -29,7 +29,7 @@ func TestStepString(t *testing.T) { func TestTrackerFailedDuty(t *testing.T) { const slot = 1 - testData, pubkeys := setupData(t, []int{slot}) + testData, pubkeys := setupData(t, []int{slot}, 3) t.Run("FailAtConsensus", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -53,7 +53,7 @@ func TestTrackerFailedDuty(t *testing.T) { tr := New(analyser, deleter, []p2p.Peer{}, 0) tr.failedDutyReporter = failedDutyReporter - tr.participationReporter = func(_ context.Context, _ core.Duty, failed bool, _ map[int]bool, _ map[int]bool) { + tr.participationReporter = func(_ context.Context, _ core.Duty, failed bool, _ map[int]int, _ map[int]int, _ int) { require.True(t, failed) } @@ -93,7 +93,7 @@ func TestTrackerFailedDuty(t *testing.T) { tr := New(analyser, deleter, []p2p.Peer{}, 0) tr.failedDutyReporter = failedDutyReporter - tr.participationReporter = func(_ context.Context, _ core.Duty, failed bool, _ map[int]bool, _ map[int]bool) { + tr.participationReporter = func(_ context.Context, _ core.Duty, failed bool, _ map[int]int, _ map[int]int, _ int) { require.False(t, failed) } @@ -336,7 +336,7 @@ func TestDutyFailedStep(t *testing.T) { func 
TestTrackerParticipation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) slots := []int{1, 2, 3} - testData, pubkeys := setupData(t, slots) + testData, pubkeys := setupData(t, slots, 3) // Assuming a DV with 4 nodes. numPeers := 4 @@ -346,22 +346,22 @@ func TestTrackerParticipation(t *testing.T) { } // Participation set per duty for a cluster. - expectedParticipationPerDuty := map[core.Duty]map[int]bool{ + expectedParticipationPerDuty := map[core.Duty]map[int]int{ testData[0].duty: { - 1: true, - 2: true, - 3: true, - 4: true, + 1: len(pubkeys), + 2: len(pubkeys), + 3: len(pubkeys), + 4: len(pubkeys), }, testData[1].duty: { - 1: true, - 2: true, - 4: true, + 1: len(pubkeys), + 2: len(pubkeys), + 4: len(pubkeys), }, testData[2].duty: { - 1: true, - 2: true, - 4: true, + 1: len(pubkeys), + 2: len(pubkeys), + 4: len(pubkeys), }, } @@ -373,7 +373,7 @@ func TestTrackerParticipation(t *testing.T) { for _, p := range peers { set := make(core.ParSignedDataSet) for _, pk := range pubkeys { - if !expectedParticipationPerDuty[td.duty][p.ShareIdx()] { + if expectedParticipationPerDuty[td.duty][p.ShareIdx()] == 0 { // This peer hasn't participated in this duty for this DV. 
continue } @@ -393,9 +393,9 @@ func TestTrackerParticipation(t *testing.T) { var ( count int - lastParticipation map[int]bool + lastParticipation map[int]int ) - tr.participationReporter = func(_ context.Context, actualDuty core.Duty, failed bool, actualParticipation map[int]bool, _ map[int]bool) { + tr.participationReporter = func(_ context.Context, actualDuty core.Duty, failed bool, actualParticipation map[int]int, _ map[int]int, _ int) { require.Equal(t, testData[count].duty, actualDuty) require.True(t, reflect.DeepEqual(actualParticipation, expectedParticipationPerDuty[testData[count].duty])) require.False(t, failed) @@ -453,7 +453,7 @@ func TestUnexpectedParticipation(t *testing.T) { deleter := testDeadliner{deadlineChan: make(chan core.Duty)} data := core.NewPartialSignature(testutil.RandomCoreSignature(), unexpectedPeer) pubkey := testutil.RandomCorePubKey(t) - participation := make(map[int]bool) + participation := make(map[int]int) duties := []core.Duty{ core.NewRandaoDuty(slot), @@ -467,9 +467,9 @@ func TestUnexpectedParticipation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) tr := New(analyser, deleter, peers, 0) - tr.participationReporter = func(_ context.Context, duty core.Duty, failed bool, participatedShares map[int]bool, unexpectedPeers map[int]bool) { + tr.participationReporter = func(_ context.Context, duty core.Duty, failed bool, participatedShares map[int]int, unexpectedPeers map[int]int, _ int) { require.Equal(t, d, duty) - require.True(t, reflect.DeepEqual(unexpectedPeers, map[int]bool{unexpectedPeer: true})) + require.True(t, reflect.DeepEqual(unexpectedPeers, map[int]int{unexpectedPeer: 1})) require.True(t, reflect.DeepEqual(participatedShares, participation)) require.True(t, failed) cancel() @@ -501,13 +501,13 @@ func TestDutyRandaoUnexpected(t *testing.T) { data := core.NewPartialSignature(testutil.RandomCoreSignature(), validPeer) pubkey := testutil.RandomCorePubKey(t) - participation := make(map[int]bool) - 
unexpected := map[int]bool{1: true} + participation := make(map[int]int) + unexpected := map[int]int{1: 1} ctx, cancel := context.WithCancel(context.Background()) tr := New(analyser, deleter, peers, 0) - tr.participationReporter = func(_ context.Context, duty core.Duty, failed bool, participatedShares map[int]bool, unexpectedPeers map[int]bool) { + tr.participationReporter = func(_ context.Context, duty core.Duty, failed bool, participatedShares map[int]int, unexpectedPeers map[int]int, totalParticipationExpected int) { if duty.Type == core.DutyProposer { return } @@ -548,17 +548,18 @@ func TestDutyRandaoExpected(t *testing.T) { data := core.NewPartialSignature(testutil.RandomCoreSignature(), validPeer) pubkey := testutil.RandomCorePubKey(t) - participation := map[int]bool{validPeer: true} - unexpected := make(map[int]bool) + participation := map[int]int{validPeer: 1} + unexpected := make(map[int]int) ctx, cancel := context.WithCancel(context.Background()) tr := New(analyser, deleter, peers, 0) - tr.participationReporter = func(_ context.Context, duty core.Duty, failed bool, participatedShares map[int]bool, unexpectedPeers map[int]bool) { + tr.participationReporter = func(_ context.Context, duty core.Duty, failed bool, participatedShares map[int]int, unexpectedPeers map[int]int, totalParticipationExpected int) { if duty.Type == core.DutyProposer { return } + require.Equal(t, 1, totalParticipationExpected) require.Equal(t, dutyRandao, duty) require.True(t, failed) require.True(t, reflect.DeepEqual(unexpectedPeers, unexpected)) @@ -602,53 +603,38 @@ type testDutyData struct { } // setupData returns test duty data and pubkeys required to test tracker. 
-func setupData(t *testing.T, slots []int) ([]testDutyData, []core.PubKey) { +func setupData(t *testing.T, slots []int, numVals int) ([]testDutyData, []core.PubKey) { t.Helper() - const ( - vIdxA = 1 - vIdxB = 2 - notZero = 99 // Validation require non-zero values - ) + const notZero = 99 // Validation require non-zero values - pubkeysByIdx := map[eth2p0.ValidatorIndex]core.PubKey{ - vIdxA: testutil.RandomCorePubKey(t), - vIdxB: testutil.RandomCorePubKey(t), + var pubkeys []core.PubKey + pubkeysByIdx := make(map[eth2p0.ValidatorIndex]core.PubKey) + for i := 0; i < numVals; i++ { + pubkey := testutil.RandomCorePubKey(t) + pubkeysByIdx[eth2p0.ValidatorIndex(i)] = pubkey + pubkeys = append(pubkeys, pubkey) } var data []testDutyData - for _, slot := range slots { duty := core.NewAttesterDuty(int64(slot)) - dutyA := eth2v1.AttesterDuty{ - Slot: eth2p0.Slot(slot), - ValidatorIndex: vIdxA, - CommitteeIndex: vIdxA, - CommitteeLength: notZero, - CommitteesAtSlot: notZero, - } - - dutyB := eth2v1.AttesterDuty{ - Slot: eth2p0.Slot(slot), - ValidatorIndex: vIdxB, - CommitteeIndex: vIdxB, - CommitteeLength: notZero, - CommitteesAtSlot: notZero, - } - - defset := core.DutyDefinitionSet{ - pubkeysByIdx[vIdxA]: core.NewAttesterDefinition(&dutyA), - pubkeysByIdx[vIdxB]: core.NewAttesterDefinition(&dutyB), - } - + defset := make(core.DutyDefinitionSet) unsignedset := make(core.UnsignedDataSet) - unsignedset[pubkeysByIdx[vIdxA]] = testutil.RandomCoreAttestationData(t) - unsignedset[pubkeysByIdx[vIdxB]] = testutil.RandomCoreAttestationData(t) - parsignedset := make(core.ParSignedDataSet) - parsignedset[pubkeysByIdx[vIdxA]] = core.NewPartialAttestation(testutil.RandomAttestation(), 1) - parsignedset[pubkeysByIdx[vIdxB]] = core.NewPartialAttestation(testutil.RandomAttestation(), 1) + for i := 0; i < numVals; i++ { + defset[pubkeysByIdx[eth2p0.ValidatorIndex(i)]] = core.NewAttesterDefinition(&eth2v1.AttesterDuty{ + Slot: eth2p0.Slot(slot), + ValidatorIndex: eth2p0.ValidatorIndex(i), +
CommitteeIndex: eth2p0.CommitteeIndex(i), + CommitteeLength: notZero, + CommitteesAtSlot: notZero, + }) + + unsignedset[pubkeysByIdx[eth2p0.ValidatorIndex(i)]] = testutil.RandomCoreAttestationData(t) + parsignedset[pubkeysByIdx[eth2p0.ValidatorIndex(i)]] = core.NewPartialAttestation(testutil.RandomAttestation(), 1) + } data = append(data, testDutyData{ duty: duty, @@ -658,7 +644,7 @@ func setupData(t *testing.T, slots []int) ([]testDutyData, []core.PubKey) { }) } - return data, []core.PubKey{pubkeysByIdx[vIdxA], pubkeysByIdx[vIdxB]} + return data, pubkeys } func TestFromSlot(t *testing.T) {