core/scheduler: add duty query method #384

Merged · 10 commits · Apr 12, 2022
63 changes: 63 additions & 0 deletions core/scheduler/scheduler.go
@@ -18,6 +18,7 @@ package scheduler
import (
"context"
"fmt"
"sync"
"testing"
"time"

@@ -80,6 +81,7 @@ type Scheduler struct {

  resolvedEpoch int64
  duties        map[core.Duty]core.FetchArgSet
  dutiesMutex   sync.Mutex
  subs          []func(context.Context, core.Duty, core.FetchArgSet) error
}

@@ -123,8 +125,42 @@ func (s *Scheduler) Run() error {
}
}

func (s *Scheduler) GetDuty(ctx context.Context, duty core.Duty) (core.FetchArgSet, error) {
Contributor: Please add a godoc.

Contributor Author: Yes, sorry about that.
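A godoc along the requested lines might read as follows (the wording is illustrative, not taken from the PR):

// GetDuty returns the resolved fetch argument set for the duty, resolving
// the duty's epoch on demand if it hasn't been resolved yet.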

  slotsPerEpoch, err := s.eth2Cl.SlotsPerEpoch(ctx)
  if err != nil {
    return nil, err
  }

  epoch := uint64(duty.Slot) / slotsPerEpoch
  if !s.isEpochResolved(epoch) {
    slotInternal, err := newSlot(ctx, s.eth2Cl, uint64(duty.Slot))
    if err != nil {
      return nil, errors.Wrap(err, "creating slot")
    }
    err = s.resolveDuties(ctx, slotInternal)
    if err != nil {
      return nil, errors.Wrap(err, "resolving duties")
    }
  }

  s.dutiesMutex.Lock()
  defer s.dutiesMutex.Unlock()

  return s.duties[duty], nil
}

func (s *Scheduler) isEpochResolved(epoch uint64) bool {
Contributor: I agree with your concern that this will result in skipping a bunch of duties if a VC calls some far-future slot. We should refactor resolvedEpoch into a map or something.

Contributor Author: I propose refactoring with an external component that holds all that logic, perhaps a ResolvedDutiesDB?

Perhaps we should define:

  • Duty
  • ResolvedDuty
  • SolvedDuty

or some similar separated concepts, and include them in the type system for clarity.
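For illustration, the proposed component's shape might be something like this (ResolvedDutiesDB and its methods are hypothetical, not part of this PR):

// ResolvedDutiesDB is a hypothetical external component that would own the
// resolved duties and the epochs they were resolved for.
type ResolvedDutiesDB interface {
  // SetFetchArg stores a fetch argument for the duty and public key,
  // returning false if one was already stored.
  SetFetchArg(duty core.Duty, pubkey core.Pubkey, arg core.FetchArg) bool
  // GetFetchArgSet returns the cached fetch argument set for the duty.
  GetFetchArgSet(duty core.Duty) (core.FetchArgSet, bool)
  // IsEpochResolved reports whether duties for the epoch have been resolved.
  IsEpochResolved(epoch uint64) bool
}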

Contributor: I do not think that is necessary, we are almost there.

  s.dutiesMutex.Lock()
  defer s.dutiesMutex.Unlock()

  return uint64(s.resolvedEpoch) >= epoch
}

// scheduleSlot resolves upcoming duties and triggers resolved duties for the slot.
func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) error {
  s.dutiesMutex.Lock()
  defer s.dutiesMutex.Unlock()
Contributor: I always keep the time a lock is held to a minimum. I would strongly discourage doing network calls while a lock is held, for example. Beacon API calls often take multiple seconds, during which you would be locking up validatorapi in this case (or whatever other components use GetDuty).

Contributor: Suggest adding getter/setter methods for mutex-protected data. Then the rest of the logic can be lock-free:

func (s *Scheduler) getFetchArgSet(duty core.Duty) (core.FetchArgSet, bool) {
  s.dutiesMutex.Lock()
  defer s.dutiesMutex.Unlock()

  argSet, ok := s.duties[duty]

  return argSet, ok
}

func (s *Scheduler) setFetchArg(duty core.Duty, pubkey core.Pubkey, arg core.FetchArg) bool {
  s.dutiesMutex.Lock()
  defer s.dutiesMutex.Unlock()

  argSet, ok := s.duties[duty]
  if !ok {
    argSet = make(core.FetchArgSet)
  }
  if _, ok := argSet[pubkey]; ok {
    return false
  }

  argSet[pubkey] = arg
  s.duties[duty] = argSet

  return true
}

Contributor Author: @corverroos it is not just the duties map that we are sharing, it is also resolvedEpoch; that is why I put the mutex around everything. Otherwise we would need another mutex and extra private methods for resolvedEpoch, though perhaps that is what you want.

I recommend a redesign where these duties live in an external component that stores them, along with the resolved epochs.

Contributor (@corverroos, Apr 7, 2022): You can update the resolved duties map in setFetchArg, and also add a getter for resolvedEpoch following a similar pattern.

Another mutex isn't required, nor is a refactor, I think.
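A minimal sketch of that resolvedEpoch accessor pair under the same mutex (hypothetical helper names, following the pattern above):

func (s *Scheduler) getResolvedEpoch() int64 {
  s.dutiesMutex.Lock()
  defer s.dutiesMutex.Unlock()

  return s.resolvedEpoch
}

func (s *Scheduler) setResolvedEpoch(epoch int64) {
  s.dutiesMutex.Lock()
  defer s.dutiesMutex.Unlock()

  s.resolvedEpoch = epoch
}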

Contributor Author: If we add a map, I am confused why we would keep resolved epochs instead of resolved slots?

Contributor Author: Also, I don't think I can update resolvedEpoch (or a resolved slot) in setFetchArg, since a setFetchArg call does not mean the whole slot is resolved.

If we move to a map from resolved slot to bool then that is ok, but I think it would be easier then to just put an empty value in the duties map when a slot is resolved but has no duties.
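For reference, the map-based alternative under discussion might look like this (resolvedSlots and markSlotResolved are hypothetical names, not part of this diff):

// The field would replace resolvedEpoch on Scheduler:
//
//   resolvedSlots map[int64]bool // guarded by dutiesMutex

func (s *Scheduler) markSlotResolved(slot int64) {
  s.dutiesMutex.Lock()
  defer s.dutiesMutex.Unlock()

  // Mark the slot resolved even when it produced no duties, so a later
  // GetDuty call does not trigger re-resolution just to find an empty set.
  s.resolvedSlots[slot] = true
}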


  if s.resolvedEpoch != int64(slot.Epoch()) {
    err := s.resolveDuties(ctx, slot)
    if err != nil {
@@ -173,6 +209,7 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) error {
}

// resolveDuties resolves the duties for the slot's epoch, caching the results.
// Do not call if you do not hold the dutiesMutex.
Contributor: You do not hold the mutex when calling this?

Contributor Author: The caller holds the mutex.

Contributor: This comment is not accurate anymore, you can remove it.

func (s *Scheduler) resolveDuties(ctx context.Context, slot slot) error {
  vals, err := resolveActiveValidators(ctx, s.eth2Cl, s.pubkeys, slot.Slot)
  if err != nil {
@@ -290,6 +327,32 @@ type slot struct {
  SlotDuration time.Duration
}

func newSlot(ctx context.Context, eth2Cl eth2Provider, height uint64) (slot, error) {
Contributor: resolveDuties doesn't need the full internal slot type; it only uses the slot and epoch int64s. I feel it would be simpler to refactor resolveDuties rather than adding this, which duplicates some code (from newSlotTicker).
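The suggested refactor might reduce to a signature change along these lines (hypothetical, not applied in this PR):

// resolveDuties refactored to take the raw slot and epoch values, so callers
// like GetDuty would not need to construct the internal slot type via newSlot.
func (s *Scheduler) resolveDuties(ctx context.Context, slotNum, epoch int64) error {
  vals, err := resolveActiveValidators(ctx, s.eth2Cl, s.pubkeys, slotNum)
  if err != nil {
    return err
  }
  // ... resolve and cache the duties for vals and the epoch as before ...
}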

  genesis, err := eth2Cl.GenesisTime(ctx)
  if err != nil {
    return slot{}, err
  }

  slotDuration, err := eth2Cl.SlotDuration(ctx)
  if err != nil {
    return slot{}, err
  }

  slotsPerEpoch, err := eth2Cl.SlotsPerEpoch(ctx)
  if err != nil {
    return slot{}, err
  }

  startTime := genesis.Add(time.Duration(height) * slotDuration)

  return slot{
    Slot:          int64(height),
    Time:          startTime,
    SlotsPerEpoch: int64(slotsPerEpoch),
    SlotDuration:  slotDuration,
  }, nil
}

func (s slot) Next() slot {
  return slot{
    Slot: s.Slot + 1,