Skip to content

Commit

Permalink
core/tracker: improve participation (#2112)
Browse files Browse the repository at this point in the history
Improves participation metric to count multiple duties per slot. For example, we can have multiple attester duties per slot.

category: feature
ticket: #2034
  • Loading branch information
dB2510 committed Apr 21, 2023
1 parent 0ba6c9a commit fb085b2
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 84 deletions.
56 changes: 36 additions & 20 deletions core/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ type Tracker struct {
failedDutyReporter func(ctx context.Context, duty core.Duty, failed bool, step step, reason string, err error)

// participationReporter instruments duty peer participation.
participationReporter func(ctx context.Context, duty core.Duty, failed bool, participatedShares map[int]bool, unexpectedPeers map[int]bool)
participationReporter func(ctx context.Context, duty core.Duty, failed bool, participatedShares map[int]int, unexpectedPeers map[int]int, expectedPerPeer int)
}

// New returns a new Tracker. The deleter deadliner must return well after analyser deadliner since duties of the same slot are often analysed together.
Expand Down Expand Up @@ -292,8 +292,8 @@ func (t *Tracker) Run(ctx context.Context) error {
t.failedDutyReporter(ctx, duty, failed, failedStep, failedMsg, failedErr)

// Analyse peer participation
participatedShares, unexpectedShares := analyseParticipation(duty, t.events)
t.participationReporter(ctx, duty, failed, participatedShares, unexpectedShares)
participatedShares, unexpectedShares, expectedPerPeer := analyseParticipation(duty, t.events)
t.participationReporter(ctx, duty, failed, participatedShares, unexpectedShares, expectedPerPeer)
case duty := <-t.deleter.C():
delete(t.events, duty)
}
Expand Down Expand Up @@ -645,26 +645,42 @@ func newUnsupportedIgnorer() func(ctx context.Context, duty core.Duty, failed bo
}
}

// analyseParticipation returns a set of share indexes of participated peers.
func analyseParticipation(duty core.Duty, allEvents map[core.Duty][]event) (map[int]bool, map[int]bool) {
// analyseParticipation returns a count of partial signatures submitted (correct and unexpected) by share index
// and total expected partial signatures for the given duty.
func analyseParticipation(duty core.Duty, allEvents map[core.Duty][]event) (map[int]int, map[int]int, int) {
// Set of shareIdx of participated peers.
resp := make(map[int]bool)
unexpectedShares := make(map[int]bool)
resp := make(map[int]int)
unexpectedShares := make(map[int]int)

// Dedup participation for each validator per peer for the given duty. Each peer can submit any number of partial signatures.
type dedupKey struct {
shareIdx int
pubkey core.PubKey
}
dedup := make(map[dedupKey]bool)

// Set of validator keys which had the given duty scheduled.
pubkeyMap := make(map[core.PubKey]bool)
for _, e := range allEvents[duty] {
pubkeyMap[e.pubkey] = true

// If we get a parSigDBInternal event, then the current node participated successfully.
// If we get a parSigDBExternal event, then the corresponding peer with e.shareIdx participated successfully.
if e.step == parSigDBExternal || e.step == parSigDBInternal {
if !isParSigEventExpected(duty, e.pubkey, allEvents) {
unexpectedShares[e.parSig.ShareIdx] = true
unexpectedShares[e.parSig.ShareIdx]++
continue
}

resp[e.parSig.ShareIdx] = true
key := dedupKey{pubkey: e.pubkey, shareIdx: e.parSig.ShareIdx}
if !dedup[key] {
dedup[key] = true
resp[e.parSig.ShareIdx]++
}
}
}

return resp, unexpectedShares
return resp, unexpectedShares, len(pubkeyMap)
}

// isParSigEventExpected returns true if a partial signature event is expected for the given duty and pubkey.
Expand Down Expand Up @@ -707,7 +723,7 @@ func isParSigEventExpected(duty core.Duty, pubkey core.PubKey, allEvents map[cor

// newParticipationReporter returns a new participation reporter function which logs and instruments peer participation
// and unexpectedPeers.
func newParticipationReporter(peers []p2p.Peer) func(context.Context, core.Duty, bool, map[int]bool, map[int]bool) {
func newParticipationReporter(peers []p2p.Peer) func(context.Context, core.Duty, bool, map[int]int, map[int]int, int) {
// prevAbsent is the set of peers who didn't participate in the last duty per type.
prevAbsent := make(map[core.DutyType][]string)

Expand All @@ -722,27 +738,27 @@ func newParticipationReporter(peers []p2p.Peer) func(context.Context, core.Duty,
}
}

return func(ctx context.Context, duty core.Duty, failed bool, participatedShares map[int]bool, unexpectedShares map[int]bool) {
return func(ctx context.Context, duty core.Duty, failed bool, participatedShares map[int]int, unexpectedShares map[int]int, expectedPerPeer int) {
if len(participatedShares) == 0 && !failed {
// Ignore participation metrics and log for noop duties (like DutyAggregator)
return
}

var absentPeers []string
for _, peer := range peers {
if participatedShares[peer.ShareIdx()] {
participationSuccess.WithLabelValues(duty.Type.String(), peer.Name).Add(float64(participatedShares[peer.ShareIdx()]))
participationSuccessLegacy.WithLabelValues(duty.Type.String(), peer.Name).Add(float64(participatedShares[peer.ShareIdx()]))
participationExpect.WithLabelValues(duty.Type.String(), peer.Name).Add(float64(expectedPerPeer))
participationMissed.WithLabelValues(duty.Type.String(), peer.Name).Add(float64(expectedPerPeer - participatedShares[peer.ShareIdx()]))

if participatedShares[peer.ShareIdx()] > 0 {
participationGauge.WithLabelValues(duty.Type.String(), peer.Name).Set(1)
participationSuccess.WithLabelValues(duty.Type.String(), peer.Name).Inc()
participationSuccessLegacy.WithLabelValues(duty.Type.String(), peer.Name).Inc()
participationExpect.WithLabelValues(duty.Type.String(), peer.Name).Inc()
} else if unexpectedShares[peer.ShareIdx()] {
} else if unexpectedShares[peer.ShareIdx()] > 0 {
log.Warn(ctx, "Unexpected event found", nil, z.Str("peer", peer.Name), z.Str("duty", duty.String()))
unexpectedEventsCounter.WithLabelValues(peer.Name).Inc()
unexpectedEventsCounter.WithLabelValues(peer.Name).Add(float64(unexpectedShares[peer.ShareIdx()]))
} else {
absentPeers = append(absentPeers, peer.Name)
participationGauge.WithLabelValues(duty.Type.String(), peer.Name).Set(0)
participationMissed.WithLabelValues(duty.Type.String(), peer.Name).Inc()
participationExpect.WithLabelValues(duty.Type.String(), peer.Name).Inc()
}
}

Expand Down
114 changes: 50 additions & 64 deletions core/tracker/tracker_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestStepString(t *testing.T) {

func TestTrackerFailedDuty(t *testing.T) {
const slot = 1
testData, pubkeys := setupData(t, []int{slot})
testData, pubkeys := setupData(t, []int{slot}, 3)

t.Run("FailAtConsensus", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -53,7 +53,7 @@ func TestTrackerFailedDuty(t *testing.T) {

tr := New(analyser, deleter, []p2p.Peer{}, 0)
tr.failedDutyReporter = failedDutyReporter
tr.participationReporter = func(_ context.Context, _ core.Duty, failed bool, _ map[int]bool, _ map[int]bool) {
tr.participationReporter = func(_ context.Context, _ core.Duty, failed bool, _ map[int]int, _ map[int]int, _ int) {
require.True(t, failed)
}

Expand Down Expand Up @@ -93,7 +93,7 @@ func TestTrackerFailedDuty(t *testing.T) {

tr := New(analyser, deleter, []p2p.Peer{}, 0)
tr.failedDutyReporter = failedDutyReporter
tr.participationReporter = func(_ context.Context, _ core.Duty, failed bool, _ map[int]bool, _ map[int]bool) {
tr.participationReporter = func(_ context.Context, _ core.Duty, failed bool, _ map[int]int, _ map[int]int, _ int) {
require.False(t, failed)
}

Expand Down Expand Up @@ -336,7 +336,7 @@ func TestDutyFailedStep(t *testing.T) {
func TestTrackerParticipation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
slots := []int{1, 2, 3}
testData, pubkeys := setupData(t, slots)
testData, pubkeys := setupData(t, slots, 3)

// Assuming a DV with 4 nodes.
numPeers := 4
Expand All @@ -346,22 +346,22 @@ func TestTrackerParticipation(t *testing.T) {
}

// Participation set per duty for a cluster.
expectedParticipationPerDuty := map[core.Duty]map[int]bool{
expectedParticipationPerDuty := map[core.Duty]map[int]int{
testData[0].duty: {
1: true,
2: true,
3: true,
4: true,
1: len(pubkeys),
2: len(pubkeys),
3: len(pubkeys),
4: len(pubkeys),
},
testData[1].duty: {
1: true,
2: true,
4: true,
1: len(pubkeys),
2: len(pubkeys),
4: len(pubkeys),
},
testData[2].duty: {
1: true,
2: true,
4: true,
1: len(pubkeys),
2: len(pubkeys),
4: len(pubkeys),
},
}

Expand All @@ -373,7 +373,7 @@ func TestTrackerParticipation(t *testing.T) {
for _, p := range peers {
set := make(core.ParSignedDataSet)
for _, pk := range pubkeys {
if !expectedParticipationPerDuty[td.duty][p.ShareIdx()] {
if expectedParticipationPerDuty[td.duty][p.ShareIdx()] == 0 {
// This peer hasn't participated in this duty for this DV.
continue
}
Expand All @@ -393,9 +393,9 @@ func TestTrackerParticipation(t *testing.T) {

var (
count int
lastParticipation map[int]bool
lastParticipation map[int]int
)
tr.participationReporter = func(_ context.Context, actualDuty core.Duty, failed bool, actualParticipation map[int]bool, _ map[int]bool) {
tr.participationReporter = func(_ context.Context, actualDuty core.Duty, failed bool, actualParticipation map[int]int, _ map[int]int, _ int) {
require.Equal(t, testData[count].duty, actualDuty)
require.True(t, reflect.DeepEqual(actualParticipation, expectedParticipationPerDuty[testData[count].duty]))
require.False(t, failed)
Expand Down Expand Up @@ -453,7 +453,7 @@ func TestUnexpectedParticipation(t *testing.T) {
deleter := testDeadliner{deadlineChan: make(chan core.Duty)}
data := core.NewPartialSignature(testutil.RandomCoreSignature(), unexpectedPeer)
pubkey := testutil.RandomCorePubKey(t)
participation := make(map[int]bool)
participation := make(map[int]int)

duties := []core.Duty{
core.NewRandaoDuty(slot),
Expand All @@ -467,9 +467,9 @@ func TestUnexpectedParticipation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
tr := New(analyser, deleter, peers, 0)

tr.participationReporter = func(_ context.Context, duty core.Duty, failed bool, participatedShares map[int]bool, unexpectedPeers map[int]bool) {
tr.participationReporter = func(_ context.Context, duty core.Duty, failed bool, participatedShares map[int]int, unexpectedPeers map[int]int, _ int) {
require.Equal(t, d, duty)
require.True(t, reflect.DeepEqual(unexpectedPeers, map[int]bool{unexpectedPeer: true}))
require.True(t, reflect.DeepEqual(unexpectedPeers, map[int]int{unexpectedPeer: 1}))
require.True(t, reflect.DeepEqual(participatedShares, participation))
require.True(t, failed)
cancel()
Expand Down Expand Up @@ -501,13 +501,13 @@ func TestDutyRandaoUnexpected(t *testing.T) {

data := core.NewPartialSignature(testutil.RandomCoreSignature(), validPeer)
pubkey := testutil.RandomCorePubKey(t)
participation := make(map[int]bool)
unexpected := map[int]bool{1: true}
participation := make(map[int]int)
unexpected := map[int]int{1: 1}

ctx, cancel := context.WithCancel(context.Background())
tr := New(analyser, deleter, peers, 0)

tr.participationReporter = func(_ context.Context, duty core.Duty, failed bool, participatedShares map[int]bool, unexpectedPeers map[int]bool) {
tr.participationReporter = func(_ context.Context, duty core.Duty, failed bool, participatedShares map[int]int, unexpectedPeers map[int]int, totalParticipationExpected int) {
if duty.Type == core.DutyProposer {
return
}
Expand Down Expand Up @@ -548,17 +548,18 @@ func TestDutyRandaoExpected(t *testing.T) {

data := core.NewPartialSignature(testutil.RandomCoreSignature(), validPeer)
pubkey := testutil.RandomCorePubKey(t)
participation := map[int]bool{validPeer: true}
unexpected := make(map[int]bool)
participation := map[int]int{validPeer: 1}
unexpected := make(map[int]int)

ctx, cancel := context.WithCancel(context.Background())
tr := New(analyser, deleter, peers, 0)

tr.participationReporter = func(_ context.Context, duty core.Duty, failed bool, participatedShares map[int]bool, unexpectedPeers map[int]bool) {
tr.participationReporter = func(_ context.Context, duty core.Duty, failed bool, participatedShares map[int]int, unexpectedPeers map[int]int, totalParticipationExpected int) {
if duty.Type == core.DutyProposer {
return
}

require.Equal(t, 1, totalParticipationExpected)
require.Equal(t, dutyRandao, duty)
require.True(t, failed)
require.True(t, reflect.DeepEqual(unexpectedPeers, unexpected))
Expand Down Expand Up @@ -602,53 +603,38 @@ type testDutyData struct {
}

// setupData returns test duty data and pubkeys required to test tracker.
func setupData(t *testing.T, slots []int) ([]testDutyData, []core.PubKey) {
func setupData(t *testing.T, slots []int, numVals int) ([]testDutyData, []core.PubKey) {
t.Helper()

const (
vIdxA = 1
vIdxB = 2
notZero = 99 // Validation require non-zero values
)
const notZero = 99 // Validation require non-zero values

pubkeysByIdx := map[eth2p0.ValidatorIndex]core.PubKey{
vIdxA: testutil.RandomCorePubKey(t),
vIdxB: testutil.RandomCorePubKey(t),
var pubkeys []core.PubKey
pubkeysByIdx := make(map[eth2p0.ValidatorIndex]core.PubKey)
for i := 0; i < numVals; i++ {
pubkey := testutil.RandomCorePubKey(t)
pubkeysByIdx[eth2p0.ValidatorIndex(i)] = pubkey
pubkeys = append(pubkeys, pubkey)
}

var data []testDutyData

for _, slot := range slots {
duty := core.NewAttesterDuty(int64(slot))

dutyA := eth2v1.AttesterDuty{
Slot: eth2p0.Slot(slot),
ValidatorIndex: vIdxA,
CommitteeIndex: vIdxA,
CommitteeLength: notZero,
CommitteesAtSlot: notZero,
}

dutyB := eth2v1.AttesterDuty{
Slot: eth2p0.Slot(slot),
ValidatorIndex: vIdxB,
CommitteeIndex: vIdxB,
CommitteeLength: notZero,
CommitteesAtSlot: notZero,
}

defset := core.DutyDefinitionSet{
pubkeysByIdx[vIdxA]: core.NewAttesterDefinition(&dutyA),
pubkeysByIdx[vIdxB]: core.NewAttesterDefinition(&dutyB),
}

defset := make(core.DutyDefinitionSet)
unsignedset := make(core.UnsignedDataSet)
unsignedset[pubkeysByIdx[vIdxA]] = testutil.RandomCoreAttestationData(t)
unsignedset[pubkeysByIdx[vIdxB]] = testutil.RandomCoreAttestationData(t)

parsignedset := make(core.ParSignedDataSet)
parsignedset[pubkeysByIdx[vIdxA]] = core.NewPartialAttestation(testutil.RandomAttestation(), 1)
parsignedset[pubkeysByIdx[vIdxB]] = core.NewPartialAttestation(testutil.RandomAttestation(), 1)
for i := 0; i < numVals; i++ {
defset[pubkeysByIdx[eth2p0.ValidatorIndex(i)]] = core.NewAttesterDefinition(&eth2v1.AttesterDuty{
Slot: eth2p0.Slot(slot),
ValidatorIndex: eth2p0.ValidatorIndex(i),
CommitteeIndex: eth2p0.CommitteeIndex(i),
CommitteeLength: notZero,
CommitteesAtSlot: notZero,
})

unsignedset[pubkeysByIdx[eth2p0.ValidatorIndex(i)]] = testutil.RandomCoreAttestationData(t)
parsignedset[pubkeysByIdx[eth2p0.ValidatorIndex(i)]] = core.NewPartialAttestation(testutil.RandomAttestation(), 1)
}

data = append(data, testDutyData{
duty: duty,
Expand All @@ -658,7 +644,7 @@ func setupData(t *testing.T, slots []int) ([]testDutyData, []core.PubKey) {
})
}

return data, []core.PubKey{pubkeysByIdx[vIdxA], pubkeysByIdx[vIdxB]}
return data, pubkeys
}

func TestFromSlot(t *testing.T) {
Expand Down

0 comments on commit fb085b2

Please sign in to comment.