From 0a77f33db6ae7e88fd4f0bac5a1df44b8c96344b Mon Sep 17 00:00:00 2001 From: Leo Lara Date: Thu, 7 Apr 2022 15:28:03 +0200 Subject: [PATCH 1/8] core/scheduler: add duty query method --- core/scheduler/scheduler.go | 63 +++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index bfb062449..87e4d05d6 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -18,6 +18,7 @@ package scheduler import ( "context" "fmt" + "sync" "testing" "time" @@ -80,6 +81,7 @@ type Scheduler struct { resolvedEpoch int64 duties map[core.Duty]core.FetchArgSet + dutiesMutex sync.Mutex subs []func(context.Context, core.Duty, core.FetchArgSet) error } @@ -123,8 +125,42 @@ func (s *Scheduler) Run() error { } } +func (s *Scheduler) GetDuty(ctx context.Context, duty core.Duty) (core.FetchArgSet, error) { + slotsPerEpoch, err := s.eth2Cl.SlotsPerEpoch(ctx) + if err != nil { + return nil, err + } + + epoch := uint64(duty.Slot) / slotsPerEpoch + if !s.isEpochResolved(epoch) { + slotInternal, err := newSlot(ctx, s.eth2Cl, uint64(duty.Slot)) + if err != nil { + return nil, errors.Wrap(err, "creating slot") + } + err = s.resolveDuties(ctx, slotInternal) + if err != nil { + return nil, errors.Wrap(err, "resolving duties") + } + } + + s.dutiesMutex.Lock() + defer s.dutiesMutex.Unlock() + + return s.duties[duty], nil +} + +func (s *Scheduler) isEpochResolved(epoch uint64) bool { + s.dutiesMutex.Lock() + defer s.dutiesMutex.Unlock() + + return uint64(s.resolvedEpoch) >= epoch +} + // scheduleSlot resolves upcoming duties and triggers resolved duties for the slot. func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) error { + s.dutiesMutex.Lock() + defer s.dutiesMutex.Unlock() + if s.resolvedEpoch != int64(slot.Epoch()) { err := s.resolveDuties(ctx, slot) if err != nil { @@ -173,6 +209,7 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) error { } // resolveDuties resolves the duties for the slot's epoch, caching the results. +// Do not call if you do not hold the dutiesMutex. func (s *Scheduler) resolveDuties(ctx context.Context, slot slot) error { vals, err := resolveActiveValidators(ctx, s.eth2Cl, s.pubkeys, slot.Slot) if err != nil { @@ -290,6 +327,32 @@ type slot struct { SlotDuration time.Duration } +func newSlot(ctx context.Context, eth2Cl eth2Provider, height uint64) (slot, error) { + genesis, err := eth2Cl.GenesisTime(ctx) + if err != nil { + return slot{}, err + } + + slotDuration, err := eth2Cl.SlotDuration(ctx) + if err != nil { + return slot{}, err + } + + slotsPerEpoch, err := eth2Cl.SlotsPerEpoch(ctx) + if err != nil { + return slot{}, err + } + + startTime := genesis.Add(time.Duration(height) * slotDuration) + + return slot{ + Slot: int64(height), + Time: startTime, + SlotsPerEpoch: int64(slotsPerEpoch), + SlotDuration: slotDuration, + }, nil +} + func (s slot) Next() slot { return slot{ Slot: s.Slot + 1, From b947d41007a32dd873bb4e909434269013911186 Mon Sep 17 00:00:00 2001 From: Leo Lara Date: Fri, 8 Apr 2022 11:44:32 +0200 Subject: [PATCH 2/8] fix concurrency issues --- core/scheduler/scheduler.go | 106 +++++++++++++++++++----------------- 1 file changed, 56 insertions(+), 50 deletions(-) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 87e4d05d6..24a9f7eef 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -18,6 +18,7 @@ package scheduler import ( "context" "fmt" + "math" "sync" "testing" "time" @@ -69,7 +70,7 @@ func New(pubkeys []core.PubKey, eth2Svc eth2client.Service) (*Scheduler, error) quit: make(chan struct{}), duties: make(map[core.Duty]core.FetchArgSet), clock: clockwork.NewRealClock(), - resolvedEpoch: -1, + resolvedEpoch: math.MaxUint64, }, nil } @@ -79,7 +80,7 @@ type Scheduler struct { quit chan struct{} clock clockwork.Clock - resolvedEpoch int64 + resolvedEpoch uint64 duties map[core.Duty]core.FetchArgSet dutiesMutex sync.Mutex subs []func(context.Context, core.Duty, core.FetchArgSet) error @@ -125,6 +126,7 @@ func (s *Scheduler) Run() error { } } +// GetDuty returns the argSet for a duty if resolved already, otherwise an error. func (s *Scheduler) GetDuty(ctx context.Context, duty core.Duty) (core.FetchArgSet, error) { slotsPerEpoch, err := s.eth2Cl.SlotsPerEpoch(ctx) if err != nil { @@ -133,35 +135,20 @@ func (s *Scheduler) GetDuty(ctx context.Context, duty core.Duty) (core.FetchArgS epoch := uint64(duty.Slot) / slotsPerEpoch if !s.isEpochResolved(epoch) { - slotInternal, err := newSlot(ctx, s.eth2Cl, uint64(duty.Slot)) - if err != nil { - return nil, errors.Wrap(err, "creating slot") - } - err = s.resolveDuties(ctx, slotInternal) - if err != nil { - return nil, errors.Wrap(err, "resolving duties") - } + return nil, errors.New("epoch not resolved yet") } - s.dutiesMutex.Lock() - defer s.dutiesMutex.Unlock() - - return s.duties[duty], nil -} - -func (s *Scheduler) isEpochResolved(epoch uint64) bool { - s.dutiesMutex.Lock() - defer s.dutiesMutex.Unlock() + argSet, ok := s.getFetchArgSet(duty) + if !ok { + return nil, errors.New("duty not resolved although epoch is marked as resolved") + } - return uint64(s.resolvedEpoch) >= epoch + return argSet, nil } // scheduleSlot resolves upcoming duties and triggers resolved duties for the slot. func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) error { - s.dutiesMutex.Lock() - defer s.dutiesMutex.Unlock() - - if s.resolvedEpoch != int64(slot.Epoch()) { + if s.getResolvedEpoch() != uint64(slot.Epoch()) { err := s.resolveDuties(ctx, slot) if err != nil { log.Warn(ctx, "Resolving duties error (retrying next slot)", z.Err(err)) @@ -314,43 +301,62 @@ func (s *Scheduler) resolveDuties(ctx context.Context, slot slot) error { } } - s.resolvedEpoch = int64(slot.Epoch()) + s.setResolvedEpoch(uint64(slot.Epoch())) return nil } -// slot is a beacon chain slot and includes chain metadata to infer epoch and next slot. -type slot struct { - Slot int64 - Time time.Time - SlotsPerEpoch int64 - SlotDuration time.Duration +func (s *Scheduler) getFetchArgSet(duty core.Duty) (core.FetchArgSet, bool) { + s.dutiesMutex.Lock() + defer s.dutiesMutex.Unlock() + + argSet, ok := s.duties[duty] + + return argSet, ok } -func newSlot(ctx context.Context, eth2Cl eth2Provider, height uint64) (slot, error) { - genesis, err := eth2Cl.GenesisTime(ctx) - if err != nil { - return slot{}, err - } +func (s *Scheduler) setFetchArg(duty core.Duty, pubkey core.PubKey, set core.FetchArg) bool { + s.dutiesMutex.Lock() + defer s.dutiesMutex.Unlock() - slotDuration, err := eth2Cl.SlotDuration(ctx) - if err != nil { - return slot{}, err + argSet, ok := s.duties[duty] + if !ok { + argSet = make(core.FetchArgSet) } - - slotsPerEpoch, err := eth2Cl.SlotsPerEpoch(ctx) - if err != nil { - return slot{}, err + if _, ok := argSet[pubkey]; ok { + return false } - startTime := genesis.Add(time.Duration(height) * slotDuration) + argSet[pubkey] = set + s.duties[duty] = argSet - return slot{ - Slot: int64(height), - Time: startTime, - SlotsPerEpoch: int64(slotsPerEpoch), - SlotDuration: slotDuration, - }, nil + return true +} + +func (s *Scheduler) getResolvedEpoch() uint64 { + s.dutiesMutex.Lock() + defer s.dutiesMutex.Unlock() + + return s.resolvedEpoch +} + +func (s *Scheduler) setResolvedEpoch(epoch uint64) { + s.dutiesMutex.Lock() + defer s.dutiesMutex.Unlock() + + s.resolvedEpoch = epoch +} + +func (s *Scheduler) isEpochResolved(epoch uint64) bool { + return s.getResolvedEpoch() >= epoch +} + +// slot is a beacon chain slot and includes chain metadata to infer epoch and next slot. +type slot struct { + Slot int64 + Time time.Time + SlotsPerEpoch int64 + SlotDuration time.Duration } func (s slot) Next() slot { From 3bae9519caec2d899be11a76e7dff110dba8bc73 Mon Sep 17 00:00:00 2001 From: Leo Lara Date: Fri, 8 Apr 2022 14:16:23 +0200 Subject: [PATCH 3/8] Fix call to private mutexed methods everywhere --- core/scheduler/scheduler.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 24a9f7eef..44f0cb4d8 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -161,7 +161,7 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) error { Type: dutyType, } - argSet, ok := s.duties[duty] + argSet, ok := s.getFetchArgSet(duty) if !ok { // Nothing for this duty. continue @@ -182,7 +182,7 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) error { } span.End() - delete(s.duties, duty) + s.deleteDuty(duty) } if slot.IsLastInEpoch() { @@ -230,7 +230,7 @@ func (s *Scheduler) resolveDuties(ctx context.Context, slot slot) error { duty := core.Duty{Slot: int64(attDuty.Slot), Type: core.DutyAttester} - argSet, ok := s.duties[duty] + argSet, ok := s.getFetchArgSet(duty) if !ok { argSet = make(core.FetchArgSet) } @@ -245,7 +245,7 @@ func (s *Scheduler) resolveDuties(ctx context.Context, slot slot) error { } argSet[pubkey] = arg - s.duties[duty] = argSet + s.setFetchArg(duty, pubkey, arg) log.Debug(ctx, "Resolved attester duty", z.U64("epoch", uint64(slot.Epoch())), @@ -276,7 +276,7 @@ func (s *Scheduler) resolveDuties(ctx context.Context, slot slot) error { duty := core.Duty{Slot: int64(proDuty.Slot), Type: core.DutyProposer} - argSet, ok := s.duties[duty] + argSet, ok := s.getFetchArgSet(duty) if !ok { argSet = make(core.FetchArgSet) } @@ -291,7 +291,7 @@ func (s *Scheduler) resolveDuties(ctx context.Context, slot slot) error { } argSet[pubkey] = arg - s.duties[duty] = argSet + s.setFetchArg(duty, pubkey, arg) log.Debug(ctx, "Resolved proposer duty", z.U64("epoch", uint64(slot.Epoch())), @@ -333,6 +333,13 @@ func (s *Scheduler) setFetchArg(duty core.Duty, pubkey core.PubKey, set core.Fet return true } +func (s *Scheduler) deleteDuty(duty core.Duty) { + s.dutiesMutex.Lock() + defer s.dutiesMutex.Unlock() + + delete(s.duties, duty) +} + func (s *Scheduler) getResolvedEpoch() uint64 { s.dutiesMutex.Lock() defer s.dutiesMutex.Unlock() From 937add074f41a91c4e4ab16fde683c29f8b9a1b6 Mon Sep 17 00:00:00 2001 From: Leo Lara Date: Fri, 8 Apr 2022 14:29:23 +0200 Subject: [PATCH 4/8] Remove comment --- core/scheduler/scheduler.go | 25 ++++++------------------- 1 file changed, 6 insertions(+), 19 deletions(-) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 44f0cb4d8..96b9ec968 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -196,7 +196,6 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) error { } // resolveDuties resolves the duties for the slot's epoch, caching the results. -// Do not call if you do not hold the dutiesMutex. func (s *Scheduler) resolveDuties(ctx context.Context, slot slot) error { vals, err := resolveActiveValidators(ctx, s.eth2Cl, s.pubkeys, slot.Slot) if err != nil { @@ -230,23 +229,17 @@ func (s *Scheduler) resolveDuties(ctx context.Context, slot slot) error { duty := core.Duty{Slot: int64(attDuty.Slot), Type: core.DutyAttester} - argSet, ok := s.getFetchArgSet(duty) - if !ok { - argSet = make(core.FetchArgSet) - } - pubkey, ok := vals.PubKeyFromIndex(attDuty.ValidatorIndex) if !ok { log.Warn(ctx, "ignoring unexpected attester duty", z.U64("vidx", uint64(attDuty.ValidatorIndex))) continue - } else if _, ok := argSet[pubkey]; ok { + } + + if !s.setFetchArg(duty, pubkey, arg) { log.Debug(ctx, "Ignoring previously resolved duty", z.Any("duty", duty)) continue } - argSet[pubkey] = arg - s.setFetchArg(duty, pubkey, arg) - log.Debug(ctx, "Resolved attester duty", z.U64("epoch", uint64(slot.Epoch())), z.U64("vidx", uint64(attDuty.ValidatorIndex)), @@ -276,23 +269,17 @@ func (s *Scheduler) resolveDuties(ctx context.Context, slot slot) error { duty := core.Duty{Slot: int64(proDuty.Slot), Type: core.DutyProposer} - argSet, ok := s.getFetchArgSet(duty) - if !ok { - argSet = make(core.FetchArgSet) - } - pubkey, ok := vals.PubKeyFromIndex(proDuty.ValidatorIndex) if !ok { log.Warn(ctx, "ignoring unexpected proposer duty", z.U64("vidx", uint64(proDuty.ValidatorIndex))) continue - } else if _, ok := argSet[pubkey]; ok { + } + + if !s.setFetchArg(duty, pubkey, arg) { log.Debug(ctx, "Ignoring previously resolved duty", z.Any("duty", duty)) continue } - argSet[pubkey] = arg - s.setFetchArg(duty, pubkey, arg) - log.Debug(ctx, "Resolved proposer duty", z.U64("epoch", uint64(slot.Epoch())), z.U64("vidx", uint64(proDuty.ValidatorIndex)), From 26bbf1a40d3e45fd6c3b291790742f5afb39c44d Mon Sep 17 00:00:00 2001 From: Leo Lara Date: Fri, 8 Apr 2022 14:52:08 +0200 Subject: [PATCH 5/8] Add comment about deleting duties --- core/scheduler/scheduler.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 96b9ec968..65a059e56 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -182,7 +182,10 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) error { } span.End() - s.deleteDuty(duty) + // TODO(leo): This had to be commented out because the scheduler doesn't need the duty anymore, + // but the validatorAPI will need the duty when verifying a randao. Solved when we have the shared + // component to resolve duties. + // s.deleteDuty(duty) } if slot.IsLastInEpoch() { From fb6185f731e5344ae732f3d93c1702e51d598ddd Mon Sep 17 00:00:00 2001 From: Leo Lara Date: Fri, 8 Apr 2022 16:23:58 +0200 Subject: [PATCH 6/8] Consider in isEpochResolved if we have not started resolving yet --- core/scheduler/scheduler.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 65a059e56..fbc81b956 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -345,6 +345,9 @@ func (s *Scheduler) setResolvedEpoch(epoch uint64) { } func (s *Scheduler) isEpochResolved(epoch uint64) bool { + if s.getResolvedEpoch() == math.MaxUint64 { + return false + } return s.getResolvedEpoch() >= epoch } From 333f92241d7ccf02c85a2baa886e9a857c4b708c Mon Sep 17 00:00:00 2001 From: Leo Lara Date: Mon, 11 Apr 2022 14:24:04 +0200 Subject: [PATCH 7/8] Test GetDuty --- core/scheduler/scheduler_test.go | 46 ++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/core/scheduler/scheduler_test.go b/core/scheduler/scheduler_test.go index 482efb8cf..146241b47 100644 --- a/core/scheduler/scheduler_test.go +++ b/core/scheduler/scheduler_test.go @@ -265,6 +265,52 @@ func TestSchedulerDuties(t *testing.T) { } } +func TestScheduler_GetDuty(t *testing.T) { + // Configure beacon mock + var t0 time.Time + valSet := beaconmock.ValidatorSetA + eth2Cl, err := beaconmock.New( + beaconmock.WithValidatorSet(valSet), + beaconmock.WithGenesisTime(t0), + beaconmock.WithDeterministicDuties(0), + ) + require.NoError(t, err) + + // Get pubkeys for validators to schedule + pubkeys, err := valSet.CorePubKeys() + require.NoError(t, err) + + // Construct scheduler + clock := newTestClock(t0) + sched := scheduler.NewForT(t, clock, pubkeys, eth2Cl) + + _, err = sched.GetDuty(context.Background(), core.Duty{Slot: 0, Type: core.DutyAttester}) + // due to current design we will return an error if we request the duty of a slot that has not been resolved + // by the scheduler yet. With DutyResolver, we will have always an answer + require.Error(t, err, "epoch not resolved yet") + + slotDuration, err := eth2Cl.SlotDuration(context.Background()) + require.NoError(t, err) + + clock.CallbackAfter(t0.Add(slotDuration).Add(time.Second), func() { + res, err := sched.GetDuty(context.Background(), core.Duty{Slot: 0, Type: core.DutyAttester}) + + require.NoError(t, err) + + pubKeys, err := valSet.CorePubKeys() + require.NoError(t, err) + + for _, pubKey := range pubKeys { + require.NotNil(t, res[pubKey]) + } + + sched.Stop() + }) + + // Run scheduler + require.NoError(t, sched.Run()) +} + func newTestClock(now time.Time) *testClock { return &testClock{ now: now, From c1af5d0166bcfd56ee4ee6d8886b4946c75c2d6f Mon Sep 17 00:00:00 2001 From: Leo Lara Date: Tue, 12 Apr 2022 12:51:37 +0200 Subject: [PATCH 8/8] linting issues --- core/scheduler/scheduler.go | 1 + 1 file changed, 1 insertion(+) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index fbc81b956..52b63f7cf 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -348,6 +348,7 @@ func (s *Scheduler) isEpochResolved(epoch uint64) bool { if s.getResolvedEpoch() == math.MaxUint64 { return false } + return s.getResolvedEpoch() >= epoch }