Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/tracker: fix inconsistent message logic #1356

Merged
merged 1 commit into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 62 additions & 32 deletions core/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,19 @@ const (
msgSigAgg = "bug: threshold aggregation of partial signatures failed due to inconsistent signed data"
)

// parsigsByMsg contains partial signatures grouped by message root.
type parsigsByMsg map[[32]byte][]core.ParSignedData
// parsigsByMsg contains partial signatures grouped by message root grouped by pubkey.
type parsigsByMsg map[core.PubKey]map[[32]byte][]core.ParSignedData

// MsgRootsConsistent returns true if the all partial signatures have consistent message roots.
func (m parsigsByMsg) MsgRootsConsistent() bool {
for _, inner := range m {
if len(inner) > 1 {
return false
}
}

return true
}

// event represents an event emitted by a core workflow step.
type event struct {
Expand Down Expand Up @@ -245,7 +256,7 @@ func (t *Tracker) Run(ctx context.Context) error {
t.parSigReporter(ctx, duty, parsigs)

// Analyse failed duties
failed, failedStep, failedMsg := analyseDutyFailed(duty, t.events, parsigs)
failed, failedStep, failedMsg := analyseDutyFailed(duty, t.events, parsigs.MsgRootsConsistent())
t.failedDutyReporter(ctx, duty, failed, failedStep, failedMsg)

// Analyse peer participation
Expand Down Expand Up @@ -287,7 +298,7 @@ func dutyFailedStep(es []event) (bool, step) {
//
// It returns false if the duty didn't fail, i.e., the duty
// didn't get stuck and completed the sigAgg step.
func analyseDutyFailed(duty core.Duty, allEvents map[core.Duty][]event, parsigMsgs parsigsByMsg) (bool, step, string) {
func analyseDutyFailed(duty core.Duty, allEvents map[core.Duty][]event, msgRootConsistent bool) (bool, step, string) {
failed, step := dutyFailedStep(allEvents[duty])
if !failed {
return false, zero, ""
Expand All @@ -306,7 +317,7 @@ func analyseDutyFailed(duty core.Duty, allEvents map[core.Duty][]event, parsigMs
case parSigEx:
msg = msgParSigEx
case parSigDBThreshold:
if len(parsigMsgs) <= 1 {
if msgRootConsistent {
msg = msgParSigDBInsufficient
} else {
msg = msgParSigDBInconsistent
Expand All @@ -327,29 +338,39 @@ func analyseDutyFailed(duty core.Duty, allEvents map[core.Duty][]event, parsigMs
return true, step, msg
}

// analyseParSigs returns a mapping of partial signed data messages by peers (share index).
// analyseParSigs returns a mapping of partial signed data messages by peers (share index) by validator (pubkey).
func analyseParSigs(events []event) parsigsByMsg {
type dedupKey struct {
Pubkey core.PubKey
ShareIdx int
}
var (
dedup = make(map[int]bool)
datas = make(map[[32]byte][]core.ParSignedData)
dedup = make(map[dedupKey]bool)
datas = make(map[core.PubKey]map[[32]byte][]core.ParSignedData)
)

for _, e := range events {
if e.parSig == nil {
continue
}
if dedup[e.parSig.ShareIdx] {
key := dedupKey{Pubkey: e.pubkey, ShareIdx: e.parSig.ShareIdx}
if dedup[key] {
continue
}
dedup[e.parSig.ShareIdx] = true
dedup[key] = true

root, err := e.parSig.MessageRoot()
if err != nil {
log.Warn(context.Background(), "Parsig message root", err)
continue // Just log and ignore as this is highly unlikely and non-critical code.
}

datas[root] = append(datas[root], *e.parSig)
inner, ok := datas[e.pubkey]
if !ok {
inner = make(map[[32]byte][]core.ParSignedData)
}
inner[root] = append(inner[root], *e.parSig)
datas[e.pubkey] = inner
}

return datas
Expand Down Expand Up @@ -762,36 +783,45 @@ func (t *Tracker) SigAggEvent(ctx context.Context, duty core.Duty, pubkey core.P
}

func reportParSigs(ctx context.Context, duty core.Duty, parsigMsgs parsigsByMsg) {
if len(parsigMsgs) <= 1 {
if parsigMsgs.MsgRootsConsistent() {
return // Nothing to report.
}

inconsistentCounter.WithLabelValues(duty.Type.String()).Inc()

// Group indexes by json for logging.
indexesByJSON := make(map[string][]int)
for _, parsigs := range parsigMsgs {
var key string
for _, parsig := range parsigs {
if key == "" {
bytes, err := json.Marshal(parsig)
if err != nil {
continue // Ignore error, just skip it.
for pubkey, parsigsByMsg := range parsigMsgs {
if len(parsigMsgs) <= 1 {
continue // Nothing to report for this pubkey.
}

// Group indexes by json for logging.
indexesByJSON := make(map[string][]int)

for _, parsigs := range parsigsByMsg {
var key string
for _, parsig := range parsigs {
if key == "" {
bytes, err := json.Marshal(parsig)
if err != nil {
continue // Ignore error, just skip it.
}
key = string(bytes)
}
key = string(bytes)
indexesByJSON[key] = append(indexesByJSON[key], parsig.ShareIdx)
}
indexesByJSON[key] = append(indexesByJSON[key], parsig.ShareIdx)
}
}

if expectInconsistentParSigs(duty.Type) {
log.Debug(ctx, "Inconsistent sync committee partial signed data",
z.Any("duty", duty),
z.Any("data", indexesByJSON))
} else {
log.Warn(ctx, "Inconsistent partial signed data", nil,
z.Any("duty", duty),
z.Any("data", indexesByJSON))
if expectInconsistentParSigs(duty.Type) {
log.Debug(ctx, "Inconsistent sync committee partial signed data",
z.Any("pubkey", pubkey),
z.Any("duty", duty),
z.Any("data", indexesByJSON))
} else {
log.Warn(ctx, "Inconsistent partial signed data", nil,
z.Any("pubkey", pubkey),
z.Any("duty", duty),
z.Any("data", indexesByJSON))
}
}
}

Expand Down
57 changes: 28 additions & 29 deletions core/tracker/tracker_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestAnalyseDutyFailed(t *testing.T) {
},
}

failed, step, msg := analyseDutyFailed(attDuty, events, nil)
failed, step, msg := analyseDutyFailed(attDuty, events, true)
require.True(t, failed)
require.Equal(t, step, fetcher)
require.Equal(t, msg, msgFetcher)
Expand All @@ -157,7 +157,7 @@ func TestAnalyseDutyFailed(t *testing.T) {
step: fetcher,
})

failed, step, msg = analyseDutyFailed(attDuty, events, nil)
failed, step, msg = analyseDutyFailed(attDuty, events, true)
require.True(t, failed)
require.Equal(t, step, consensus)
require.Equal(t, msg, msgConsensus)
Expand All @@ -168,7 +168,7 @@ func TestAnalyseDutyFailed(t *testing.T) {
step: consensus,
})

failed, step, msg = analyseDutyFailed(attDuty, events, nil)
failed, step, msg = analyseDutyFailed(attDuty, events, true)
require.True(t, failed)
require.Equal(t, step, validatorAPI)
require.Equal(t, msg, msgValidatorAPI)
Expand All @@ -179,7 +179,7 @@ func TestAnalyseDutyFailed(t *testing.T) {
step: validatorAPI,
})

failed, step, msg = analyseDutyFailed(attDuty, events, nil)
failed, step, msg = analyseDutyFailed(attDuty, events, true)
require.True(t, failed)
require.Equal(t, step, parSigDBInternal)
require.Equal(t, msg, msgParSigDBInternal)
Expand All @@ -190,7 +190,7 @@ func TestAnalyseDutyFailed(t *testing.T) {
step: parSigDBInternal,
})

failed, step, msg = analyseDutyFailed(attDuty, events, nil)
failed, step, msg = analyseDutyFailed(attDuty, events, true)
require.True(t, failed)
require.Equal(t, step, parSigEx)
require.Equal(t, msg, msgParSigEx)
Expand All @@ -201,23 +201,18 @@ func TestAnalyseDutyFailed(t *testing.T) {
step: parSigEx,
})

failed, step, msg = analyseDutyFailed(attDuty, events, nil)
failed, step, msg = analyseDutyFailed(attDuty, events, true)
require.True(t, failed)
require.Equal(t, step, parSigDBThreshold)
require.Equal(t, msg, msgParSigDBInsufficient)

inconsistentParsigs := parsigsByMsg{
testutil.RandomRoot(): nil,
testutil.RandomRoot(): nil,
}

failed, step, msg = analyseDutyFailed(attDuty, events, inconsistentParsigs)
failed, step, msg = analyseDutyFailed(attDuty, events, false)
require.True(t, failed)
require.Equal(t, step, parSigDBThreshold)
require.Equal(t, msg, msgParSigDBInconsistent)

events[syncMsgDuty] = events[attDuty]
failed, step, msg = analyseDutyFailed(syncMsgDuty, events, inconsistentParsigs)
failed, step, msg = analyseDutyFailed(syncMsgDuty, events, false)
require.True(t, failed)
require.Equal(t, step, parSigDBThreshold)
require.Equal(t, msg, msgParSigDBInconsistentSync)
Expand All @@ -244,7 +239,7 @@ func TestAnalyseDutyFailed(t *testing.T) {
},
}

failed, step, msg := analyseDutyFailed(proposerDuty, events, nil)
failed, step, msg := analyseDutyFailed(proposerDuty, events, true)
require.True(t, failed)
require.Equal(t, step, fetcher)
require.Equal(t, msg, msgFetcherProposerFailedRandao)
Expand All @@ -255,15 +250,15 @@ func TestAnalyseDutyFailed(t *testing.T) {
step: parSigEx,
})

failed, step, msg = analyseDutyFailed(proposerDuty, events, nil)
failed, step, msg = analyseDutyFailed(proposerDuty, events, true)
require.True(t, failed)
require.Equal(t, step, fetcher)
require.Equal(t, msg, msgFetcherProposerFewRandaos)

// No Randaos
events[randaoDuty] = nil

failed, step, msg = analyseDutyFailed(proposerDuty, events, nil)
failed, step, msg = analyseDutyFailed(proposerDuty, events, true)
require.True(t, failed)
require.Equal(t, step, fetcher)
require.Equal(t, msg, msgFetcherProposerZeroRandaos)
Expand All @@ -279,7 +274,7 @@ func TestAnalyseDutyFailed(t *testing.T) {
events[attDuty] = append(events[attDuty], event{step: step})
}

failed, step, msg := analyseDutyFailed(attDuty, events, nil)
failed, step, msg := analyseDutyFailed(attDuty, events, true)
require.False(t, failed)
require.Equal(t, zero, step)
require.Empty(t, msg)
Expand Down Expand Up @@ -859,7 +854,7 @@ func TestAnalyseFetcherFailed(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
failed, step, msg := analyseDutyFailed(test.duty, test.events, nil)
failed, step, msg := analyseDutyFailed(test.duty, test.events, true)
require.Equal(t, failed, test.failed)
require.Equal(t, test.msg, msg)
require.Equal(t, fetcher, step)
Expand Down Expand Up @@ -952,13 +947,14 @@ func TestAnalyseParSigs(t *testing.T) {

var events []event

makeEvents := func(n int) {
makeEvents := func(n int, pubkey string) {
data := testutil.RandomCoreVersionSignedBeaconBlock(t)
offset := len(events)
for i := 0; i < n; i++ {
data, err := data.SetSignature(testutil.RandomCoreSignature())
require.NoError(t, err)
events = append(events, event{
pubkey: core.PubKey(pubkey),
parSig: &core.ParSignedData{
ShareIdx: offset + i,
SignedData: data,
Expand All @@ -967,20 +963,23 @@ func TestAnalyseParSigs(t *testing.T) {
}
}

expect := map[int]bool{
4: true,
2: true,
expect := map[int]string{
4: "a",
2: "a",
6: "b",
}
for n := range expect {
makeEvents(n)
for n, pubkey := range expect {
makeEvents(n, pubkey)
}

parSigMsgs := analyseParSigs(events)
require.Len(t, parSigMsgs, len(expect))
allParSigMsgs := analyseParSigs(events)

lengths := make(map[int]bool)
for _, indexes := range parSigMsgs {
lengths[len(indexes)] = true
lengths := make(map[int]string)
for pubkey, parSigMsgs := range allParSigMsgs {
for _, indexes := range parSigMsgs {
lengths[len(indexes)] = string(pubkey)
}
}

require.Equal(t, expect, lengths)
}