Skip to content

Commit

Permalink
scheduler: fix duplicate slot (#1392)
Browse files Browse the repository at this point in the history
Fixes issues due to sporadic duplicate slot scheduling.  

category: bug
ticket: #1389
  • Loading branch information
corverroos committed Nov 7, 2022
1 parent 13cf754 commit 76dd3d5
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 20 deletions.
18 changes: 9 additions & 9 deletions core/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,33 +508,33 @@ func newSlotTicker(ctx context.Context, eth2Cl eth2wrap.Client, clock clockwork.
}
}

var (
resp = make(chan core.Slot)
slot = currentSlot()
)
resp := make(chan core.Slot)
go func() {
slot := currentSlot()
for {
select {
case <-ctx.Done():
return
case <-clock.After(slot.Time.Sub(clock.Now())):
}

// Recalculate slot to avoid "thundering herd" problem by skipping slots if missed due
// Avoid "thundering herd" problem by skipping slots if missed due
// to pause-the-world events (i.e. resources are already constrained).
actual := currentSlot()
if actual.Slot != slot.Slot {
if clock.Now().After(slot.Next().Time) {
actual := currentSlot()
log.Warn(ctx, "Slot(s) skipped", nil, z.I64("actual_slot", actual.Slot), z.I64("expect_slot", slot.Slot))
skipCounter.Inc()

slot = actual
}

select {
case <-ctx.Done():
return
case resp <- actual:
case resp <- slot:
}

slot = actual.Next()
slot = slot.Next()
}
}()

Expand Down
24 changes: 13 additions & 11 deletions testutil/validatormock/synccomm.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewSyncCommMember(eth2Cl eth2wrap.Client, epoch eth2p0.Epoch, signFunc Sign
dutiesOK: make(chan struct{}),
selections: make(map[eth2p0.Slot]syncSelections),
selectionsOK: make(map[eth2p0.Slot]chan struct{}),
blockRoot: make(map[eth2p0.Slot]*eth2p0.Root),
blockRoot: make(map[eth2p0.Slot]eth2p0.Root),
blockRootOK: make(map[eth2p0.Slot]chan struct{}),
}
}
Expand All @@ -67,7 +67,7 @@ type SyncCommMember struct {
dutiesOK chan struct{}
selections map[eth2p0.Slot]syncSelections // Sync committee selections per slot
selectionsOK map[eth2p0.Slot]chan struct{}
blockRoot map[eth2p0.Slot]*eth2p0.Root // Beacon block root per slot
blockRoot map[eth2p0.Slot]eth2p0.Root // Beacon block root per slot
blockRootOK map[eth2p0.Slot]chan struct{}
}

Expand Down Expand Up @@ -114,7 +114,7 @@ func (s *SyncCommMember) getSelectionsOK(slot eth2p0.Slot) chan struct{} {
}

// setBlockRoot sets block root for the slot.
func (s *SyncCommMember) setBlockRoot(slot eth2p0.Slot, blockRoot *eth2p0.Root) {
func (s *SyncCommMember) setBlockRoot(slot eth2p0.Slot, blockRoot eth2p0.Root) {
s.mu.Lock()
defer s.mu.Unlock()

Expand All @@ -131,7 +131,7 @@ func (s *SyncCommMember) setBlockRoot(slot eth2p0.Slot, blockRoot *eth2p0.Root)
}

// getBlockRoot returns the beacon block root for the provided slot.
func (s *SyncCommMember) getBlockRoot(slot eth2p0.Slot) *eth2p0.Root {
func (s *SyncCommMember) getBlockRoot(slot eth2p0.Slot) eth2p0.Root {
s.mu.Lock()
defer s.mu.Unlock()

Expand Down Expand Up @@ -197,12 +197,12 @@ func (s *SyncCommMember) Message(ctx context.Context, slot eth2p0.Slot) error {
return err
}

err = submitSyncMessages(ctx, s.eth2Cl, slot, blockRoot, s.signFunc, s.duties)
err = submitSyncMessages(ctx, s.eth2Cl, slot, *blockRoot, s.signFunc, s.duties)
if err != nil {
return err
}

s.setBlockRoot(slot, blockRoot)
s.setBlockRoot(slot, *blockRoot)

return nil
}
Expand Down Expand Up @@ -348,7 +348,7 @@ func getSubcommittees(ctx context.Context, eth2Cl eth2client.SpecProvider, duty
}

// submitSyncMessages submits signed sync committee messages for desired slot.
func submitSyncMessages(ctx context.Context, eth2Cl eth2wrap.Client, slot eth2p0.Slot, blockRoot *eth2p0.Root, signFunc SignFunc, duties syncDuties) error {
func submitSyncMessages(ctx context.Context, eth2Cl eth2wrap.Client, slot eth2p0.Slot, blockRoot eth2p0.Root, signFunc SignFunc, duties syncDuties) error {
if len(duties) == 0 {
return nil
}
Expand All @@ -358,7 +358,7 @@ func submitSyncMessages(ctx context.Context, eth2Cl eth2wrap.Client, slot eth2p0
return err
}

sigData, err := signing.GetDataRoot(ctx, eth2Cl, signing.DomainSyncCommittee, epoch, *blockRoot)
sigData, err := signing.GetDataRoot(ctx, eth2Cl, signing.DomainSyncCommittee, epoch, blockRoot)
if err != nil {
return err
}
Expand All @@ -372,7 +372,7 @@ func submitSyncMessages(ctx context.Context, eth2Cl eth2wrap.Client, slot eth2p0

msgs = append(msgs, &altair.SyncCommitteeMessage{
Slot: slot,
BeaconBlockRoot: *blockRoot,
BeaconBlockRoot: blockRoot,
ValidatorIndex: duty.ValidatorIndex,
Signature: sig,
})
Expand All @@ -389,7 +389,9 @@ func submitSyncMessages(ctx context.Context, eth2Cl eth2wrap.Client, slot eth2p0
}

// aggContributions submits aggregate altair.SignedContributionAndProof. It returns false if contribution aggregation is not required.
func aggContributions(ctx context.Context, eth2Cl eth2wrap.Client, signFunc SignFunc, slot eth2p0.Slot, vals validators, selections syncSelections, blockRoot *eth2p0.Root) (bool, error) {
func aggContributions(ctx context.Context, eth2Cl eth2wrap.Client, signFunc SignFunc, slot eth2p0.Slot, vals validators,
selections syncSelections, blockRoot eth2p0.Root,
) (bool, error) {
if len(selections) == 0 {
return false, nil
}
Expand All @@ -402,7 +404,7 @@ func aggContributions(ctx context.Context, eth2Cl eth2wrap.Client, signFunc Sign
var signedContribAndProofs []*altair.SignedContributionAndProof
for _, selection := range selections {
// Query BN to get sync committee contribution.
contrib, err := eth2Cl.SyncCommitteeContribution(ctx, selection.Slot, uint64(selection.SubcommitteeIndex), *blockRoot)
contrib, err := eth2Cl.SyncCommitteeContribution(ctx, selection.Slot, uint64(selection.SubcommitteeIndex), blockRoot)
if err != nil {
return false, err
}
Expand Down

0 comments on commit 76dd3d5

Please sign in to comment.