Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/tracker: update tracker for DutyAggregator #1164

Merged
merged 4 commits into from
Sep 26, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, defSet core.DutyDef
unsignedSet, err = f.fetchAggregatorData(ctx, duty.Slot, defSet)
if err != nil {
return errors.Wrap(err, "fetch aggregator data")
} else if len(unsignedSet) == 0 { // No aggregators found in this slot
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should add a test for this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to add tests for this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return nil
}
default:
return errors.New("unsupported duty type", z.Str("type", duty.Type.String()))
Expand Down
67 changes: 55 additions & 12 deletions core/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const (
// These constants are used for improving messages for why a duty failed.
const (
fetcherMsg = "couldn't fetch duty data from the beacon node"
fetcherAggAttMsg = "couldn't fetch aggregate attestation duty to attestation data unavailability"
fetcherPrepAggMsg = "couldn't fetch aggregate attestation due to insufficient partial committee subscriptions"
fetcherProposerThresholdMsg = "couldn't propose block due to insufficient partial randao signatures"
fetcherProposerMsg = "couldn't propose block since randao duty failed"
consensusMsg = "consensus algorithm didn't complete"
Expand Down Expand Up @@ -99,7 +101,7 @@ func New(analyser core.Deadliner, deleter core.Deadliner, peers []p2p.Peer, from
analyser: analyser,
deleter: deleter,
fromSlot: fromSlot,
failedDutyReporter: failedDutyReporter,
failedDutyReporter: newFailedDutyReporter(),
participationReporter: newParticipationReporter(peers),
}

Expand Down Expand Up @@ -141,8 +143,9 @@ func (t *Tracker) Run(ctx context.Context) error {
}
}

// dutyFailedComponent returns true if the duty failed. It also returns the component where the duty got stuck. If the duty didn't get stuck, it
// returns the sigAgg component. It assumes that all the input events are for a single duty.
// dutyFailedComponent returns true if the duty failed. It also returns the component where the duty got stuck.
// If the duty didn't get stuck, it returns the sigAgg component.
// It assumes that all the input events are for a single duty.
func dutyFailedComponent(es []event) (bool, component) {
events := make([]event, len(es))
copy(events, es)
Expand Down Expand Up @@ -193,6 +196,30 @@ func analyseDutyFailed(duty core.Duty, allEvents map[core.Duty][]event) (bool, c
}
}
}

if duty.Type == core.DutyAggregator {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When DuyAggregator fails in fetcher, it is impossible to distringuish between "no aggregators" and "couldn't fetch duty data from beacon node". We will need to emit some other kind of event for this. We could emit a "timedout" event from retryer maybe? Or just assume that it is "no aggregators" for now.

prepAggFailed, prepAggComp := dutyFailedComponent(allEvents[core.NewPrepareAggregatorDuty(duty.Slot)])
// case 1: DutyPrepareAggregator fails in validatorapi which means VC doesn't support v2 endpoint
// if DutyPrepareAggregator fails in validatorapi then we will not get any event for DutyPrepareAggregator.
if !prepAggFailed && prepAggComp == sentinel {
// Ignore this case by returning false.
return false, fetcher, ""
}

// case 2: DutyPrepareAggregator fails to get threshold number of signatures.
if prepAggFailed {
return true, comp, fetcherPrepAggMsg
}

// case 3: DutyAttester failed to store Attestation data in DutyDB
attFailed, attComp := dutyFailedComponent(allEvents[core.NewAttesterDuty(duty.Slot)])
if attFailed && attComp <= consensus {
return true, comp, fetcherAggAttMsg
}

// Ignore no aggregators found case, since it is not a failure.
return false, sigAgg, ""
}
case consensus:
msg = consensusMsg
case validatorAPI:
Expand All @@ -210,17 +237,33 @@ func analyseDutyFailed(duty core.Duty, allEvents map[core.Duty][]event) (bool, c
return true, comp, msg
}

// failedDutyReporter instruments failed duties.
func failedDutyReporter(ctx context.Context, duty core.Duty, failed bool, component component, reason string) {
if !failed {
return
}
// newFailedDutyReporter returns failed duty reporter which instruments failed duties.
func newFailedDutyReporter() func(ctx context.Context, duty core.Duty, failed bool, component component, reason string) {
failedDutyByComponent := make(map[core.DutyType]component)

return func(ctx context.Context, duty core.Duty, failed bool, component component, reason string) {
c, ok := failedDutyByComponent[duty.Type]
if !failed {
if !ok && duty.Type == core.DutyAggregator && component == fetcher {
log.Warn(ctx, "VCs do not seem to support v2 attestation aggregation, ignoring failures for"+
" DutyPrepareAggregator and DutyAggregator", nil)

log.Warn(ctx, "Duty failed", nil,
z.Any("component", component),
z.Str("reason", reason))
failedDutyByComponent[duty.Type] = component
}

return
}

failedCounter.WithLabelValues(duty.Type.String(), component.String()).Inc()
if !ok || c != component {
log.Warn(ctx, "Duty failed", nil,
z.Any("component", component),
z.Str("reason", reason))

failedDutyByComponent[duty.Type] = component
}

failedCounter.WithLabelValues(duty.Type.String(), component.String()).Inc()
}
}

// analyseParticipation returns a set of share indexes of participated peers.
Expand Down
58 changes: 58 additions & 0 deletions core/tracker/tracker_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,3 +627,61 @@ func TestFromSlot(t *testing.T) {
require.NoError(t, tr.SchedulerEvent(ctx, core.NewAggregatorDuty(thisSlot), nil))
require.Empty(t, tr.events)
}

func TestAnalyseDutyFailedAgg(t *testing.T) {
const slot = 123
dutyAgg := core.NewAggregatorDuty(slot)
dutyPrepAgg := core.NewPrepareAggregatorDuty(slot)
dutyAtt := core.NewAttesterDuty(slot)

t.Run("v2 endpoint not supported", func(t *testing.T) {
allEvents := make(map[core.Duty][]event)
allEvents[dutyAgg] = append(allEvents[dutyAgg], event{
duty: dutyAgg,
component: scheduler,
})

// No events for DutyPrepareAggregator
failed, comp, _ := analyseDutyFailed(dutyAgg, allEvents)
require.False(t, failed)
require.Equal(t, comp, fetcher)
})

t.Run("DutyPrepareAggregator failed", func(t *testing.T) {
allEvents := make(map[core.Duty][]event)
allEvents[dutyAgg] = append(allEvents[dutyAgg], event{
duty: dutyAgg,
component: scheduler,
})
allEvents[dutyPrepAgg] = append(allEvents[dutyPrepAgg], event{
duty: dutyPrepAgg,
component: validatorAPI,
})

failed, comp, msg := analyseDutyFailed(dutyAgg, allEvents)
require.True(t, failed)
require.Equal(t, fetcher, comp)
require.Equal(t, fetcherPrepAggMsg, msg)
})

t.Run("DutyAttester failed", func(t *testing.T) {
allEvents := make(map[core.Duty][]event)
allEvents[dutyAgg] = append(allEvents[dutyAgg], event{
duty: dutyAgg,
component: scheduler,
})
allEvents[dutyPrepAgg] = append(allEvents[dutyPrepAgg], event{
duty: dutyPrepAgg,
component: sigAgg,
})
allEvents[dutyAtt] = append(allEvents[dutyAtt], event{
duty: dutyAtt,
component: fetcher,
})

failed, comp, msg := analyseDutyFailed(dutyAgg, allEvents)
require.True(t, failed)
require.Equal(t, fetcher, comp)
require.Equal(t, fetcherAggAttMsg, msg)
})
}