Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: fix duplicate slot #1392

Merged
merged 1 commit into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions core/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,33 +508,33 @@ func newSlotTicker(ctx context.Context, eth2Cl eth2wrap.Client, clock clockwork.
}
}

var (
resp = make(chan core.Slot)
slot = currentSlot()
)
resp := make(chan core.Slot)
go func() {
slot := currentSlot()
for {
select {
case <-ctx.Done():
return
case <-clock.After(slot.Time.Sub(clock.Now())):
}

// Recalculate slot to avoid "thundering herd" problem by skipping slots if missed due
// Avoid "thundering herd" problem by skipping slots if missed due
// to pause-the-world events (i.e. resources are already constrained).
actual := currentSlot()
if actual.Slot != slot.Slot {
if clock.Now().After(slot.Next().Time) {
actual := currentSlot()
log.Warn(ctx, "Slot(s) skipped", nil, z.I64("actual_slot", actual.Slot), z.I64("expect_slot", slot.Slot))
skipCounter.Inc()

slot = actual
}

select {
case <-ctx.Done():
return
case resp <- actual:
case resp <- slot:
}

slot = actual.Next()
slot = slot.Next()
}
}()

Expand Down
24 changes: 13 additions & 11 deletions testutil/validatormock/synccomm.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewSyncCommMember(eth2Cl eth2wrap.Client, epoch eth2p0.Epoch, signFunc Sign
dutiesOK: make(chan struct{}),
selections: make(map[eth2p0.Slot]syncSelections),
selectionsOK: make(map[eth2p0.Slot]chan struct{}),
blockRoot: make(map[eth2p0.Slot]*eth2p0.Root),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changing this it non-pointer since basic eth2 types aren't pointers and this causes a panic when duties time out.

blockRoot: make(map[eth2p0.Slot]eth2p0.Root),
blockRootOK: make(map[eth2p0.Slot]chan struct{}),
}
}
Expand All @@ -67,7 +67,7 @@ type SyncCommMember struct {
dutiesOK chan struct{}
selections map[eth2p0.Slot]syncSelections // Sync committee selections per slot
selectionsOK map[eth2p0.Slot]chan struct{}
blockRoot map[eth2p0.Slot]*eth2p0.Root // Beacon block root per slot
blockRoot map[eth2p0.Slot]eth2p0.Root // Beacon block root per slot
blockRootOK map[eth2p0.Slot]chan struct{}
}

Expand Down Expand Up @@ -114,7 +114,7 @@ func (s *SyncCommMember) getSelectionsOK(slot eth2p0.Slot) chan struct{} {
}

// setBlockRoot sets block root for the slot.
func (s *SyncCommMember) setBlockRoot(slot eth2p0.Slot, blockRoot *eth2p0.Root) {
func (s *SyncCommMember) setBlockRoot(slot eth2p0.Slot, blockRoot eth2p0.Root) {
s.mu.Lock()
defer s.mu.Unlock()

Expand All @@ -131,7 +131,7 @@ func (s *SyncCommMember) setBlockRoot(slot eth2p0.Slot, blockRoot *eth2p0.Root)
}

// getBlockRoot returns the beacon block root for the provided slot.
func (s *SyncCommMember) getBlockRoot(slot eth2p0.Slot) *eth2p0.Root {
func (s *SyncCommMember) getBlockRoot(slot eth2p0.Slot) eth2p0.Root {
s.mu.Lock()
defer s.mu.Unlock()

Expand Down Expand Up @@ -197,12 +197,12 @@ func (s *SyncCommMember) Message(ctx context.Context, slot eth2p0.Slot) error {
return err
}

err = submitSyncMessages(ctx, s.eth2Cl, slot, blockRoot, s.signFunc, s.duties)
err = submitSyncMessages(ctx, s.eth2Cl, slot, *blockRoot, s.signFunc, s.duties)
if err != nil {
return err
}

s.setBlockRoot(slot, blockRoot)
s.setBlockRoot(slot, *blockRoot)

return nil
}
Expand Down Expand Up @@ -348,7 +348,7 @@ func getSubcommittees(ctx context.Context, eth2Cl eth2client.SpecProvider, duty
}

// submitSyncMessages submits signed sync committee messages for desired slot.
func submitSyncMessages(ctx context.Context, eth2Cl eth2wrap.Client, slot eth2p0.Slot, blockRoot *eth2p0.Root, signFunc SignFunc, duties syncDuties) error {
func submitSyncMessages(ctx context.Context, eth2Cl eth2wrap.Client, slot eth2p0.Slot, blockRoot eth2p0.Root, signFunc SignFunc, duties syncDuties) error {
if len(duties) == 0 {
return nil
}
Expand All @@ -358,7 +358,7 @@ func submitSyncMessages(ctx context.Context, eth2Cl eth2wrap.Client, slot eth2p0
return err
}

sigData, err := signing.GetDataRoot(ctx, eth2Cl, signing.DomainSyncCommittee, epoch, *blockRoot)
sigData, err := signing.GetDataRoot(ctx, eth2Cl, signing.DomainSyncCommittee, epoch, blockRoot)
if err != nil {
return err
}
Expand All @@ -372,7 +372,7 @@ func submitSyncMessages(ctx context.Context, eth2Cl eth2wrap.Client, slot eth2p0

msgs = append(msgs, &altair.SyncCommitteeMessage{
Slot: slot,
BeaconBlockRoot: *blockRoot,
BeaconBlockRoot: blockRoot,
ValidatorIndex: duty.ValidatorIndex,
Signature: sig,
})
Expand All @@ -389,7 +389,9 @@ func submitSyncMessages(ctx context.Context, eth2Cl eth2wrap.Client, slot eth2p0
}

// aggContributions submits aggregate altair.SignedContributionAndProof. It returns false if contribution aggregation is not required.
func aggContributions(ctx context.Context, eth2Cl eth2wrap.Client, signFunc SignFunc, slot eth2p0.Slot, vals validators, selections syncSelections, blockRoot *eth2p0.Root) (bool, error) {
func aggContributions(ctx context.Context, eth2Cl eth2wrap.Client, signFunc SignFunc, slot eth2p0.Slot, vals validators,
selections syncSelections, blockRoot eth2p0.Root,
) (bool, error) {
if len(selections) == 0 {
return false, nil
}
Expand All @@ -402,7 +404,7 @@ func aggContributions(ctx context.Context, eth2Cl eth2wrap.Client, signFunc Sign
var signedContribAndProofs []*altair.SignedContributionAndProof
for _, selection := range selections {
// Query BN to get sync committee contribution.
contrib, err := eth2Cl.SyncCommitteeContribution(ctx, selection.Slot, uint64(selection.SubcommitteeIndex), *blockRoot)
contrib, err := eth2Cl.SyncCommitteeContribution(ctx, selection.Slot, uint64(selection.SubcommitteeIndex), blockRoot)
if err != nil {
return false, err
}
Expand Down