core/scheduler: trigger duty at slot offset (#516)
Trigger duties at an offset within each slot.

category: feature
ticket: #515
corverroos committed May 13, 2022
1 parent 6aeb6eb commit 876402b
Showing 6 changed files with 198 additions and 60 deletions.
37 changes: 37 additions & 0 deletions core/scheduler/offset.go
@@ -0,0 +1,37 @@
// Copyright © 2022 Obol Labs Inc.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation, either version 3 of the License, or (at your option)
// any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License along with
// this program. If not, see <http://www.gnu.org/licenses/>.

package scheduler

import (
"time"

"github.com/obolnetwork/charon/core"
)

// slotOffsets defines the offset into each slot
// at which each duty type is triggered.
var slotOffsets = map[core.DutyType]func(time.Duration) time.Duration{
core.DutyAttester: fraction(1, 3), // 1/3 slot duration
// TODO(corver): Add more duties
}

// fraction returns a function that calculates the slot offset
// as the fraction x/y of the total slot duration.
func fraction(x, y int64) func(time.Duration) time.Duration {
return func(total time.Duration) time.Duration {
return (total * time.Duration(x)) / time.Duration(y)
}
}
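
For reference, a minimal standalone sketch of the offset arithmetic (the 12-second mainnet slot duration is an assumption, not part of this commit): fraction(1, 3) of a 12s slot triggers attester duties 4 seconds into the slot, matching when attestations are due.

package main

import (
	"fmt"
	"time"
)

// fraction mirrors the helper above: offset = total * x / y.
func fraction(x, y int64) func(time.Duration) time.Duration {
	return func(total time.Duration) time.Duration {
		return (total * time.Duration(x)) / time.Duration(y)
	}
}

func main() {
	slotDuration := 12 * time.Second // assumed mainnet slot duration
	fmt.Println(fraction(1, 3)(slotDuration)) // prints 4s
}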
94 changes: 60 additions & 34 deletions core/scheduler/scheduler.go
@@ -45,14 +45,18 @@ type eth2Provider interface {
eth2client.ProposerDutiesProvider
}

// delayFunc abstracts slot offset delaying/sleeping for deterministic tests.
type delayFunc func(duty core.Duty, deadline time.Time) <-chan time.Time

// NewForT returns a new scheduler for testing supporting a fake clock.
func NewForT(t *testing.T, clock clockwork.Clock, pubkeys []core.PubKey, eth2Svc eth2client.Service) *Scheduler {
func NewForT(t *testing.T, clock clockwork.Clock, delayFunc delayFunc, pubkeys []core.PubKey, eth2Svc eth2client.Service) *Scheduler {
t.Helper()

s, err := New(pubkeys, eth2Svc)
require.NoError(t, err)

s.clock = clock
s.delayFunc = delayFunc

return s
}
@@ -65,21 +69,24 @@ func New(pubkeys []core.PubKey, eth2Svc eth2client.Service) (*Scheduler, error)
}

return &Scheduler{
eth2Cl: eth2Cl,
pubkeys: pubkeys,
quit: make(chan struct{}),
duties: make(map[core.Duty]core.FetchArgSet),
clock: clockwork.NewRealClock(),
eth2Cl: eth2Cl,
pubkeys: pubkeys,
quit: make(chan struct{}),
duties: make(map[core.Duty]core.FetchArgSet),
clock: clockwork.NewRealClock(),
delayFunc: func(_ core.Duty, deadline time.Time) <-chan time.Time {
return time.After(time.Until(deadline))
},
resolvedEpoch: math.MaxUint64,
}, nil
}

type Scheduler struct {
eth2Cl eth2Provider
pubkeys []core.PubKey
quit chan struct{}
clock clockwork.Clock

eth2Cl eth2Provider
pubkeys []core.PubKey
quit chan struct{}
clock clockwork.Clock
delayFunc delayFunc
resolvedEpoch uint64
duties map[core.Duty]core.FetchArgSet
dutiesMutex sync.Mutex
@@ -99,6 +106,8 @@ func (s *Scheduler) Stop() {
// Run blocks and runs the scheduler until Stop is called.
func (s *Scheduler) Run() error {
ctx := log.WithTopic(context.Background(), "sched")
ctx, cancel := context.WithCancel(ctx)
defer cancel()

waitChainStart(ctx, s.eth2Cl, s.clock)
waitBeaconSync(ctx, s.eth2Cl, s.clock)
@@ -118,10 +127,7 @@

instrumentSlot(slot)

err := s.scheduleSlot(slotCtx, slot)
if err != nil {
log.Error(ctx, "Scheduling slot error", err)
}
s.scheduleSlot(slotCtx, slot)
}
}
}
@@ -147,7 +153,7 @@ func (s *Scheduler) GetDuty(ctx context.Context, duty core.Duty) (core.FetchArgS
}

// scheduleSlot resolves upcoming duties and triggers resolved duties for the slot.
func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) error {
func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) {
if s.getResolvedEpoch() != uint64(slot.Epoch()) {
err := s.resolveDuties(ctx, slot)
if err != nil {
@@ -167,25 +173,21 @@
continue
}

instrumentDuty(duty, argSet)
// Trigger duty async
go func() {
if !delaySlotOffset(ctx, slot, duty, s.delayFunc) {
return // context cancelled
}

ctx, span := core.StartDutyTrace(ctx, duty, "core/scheduler.scheduleSlot")
ctx, span := core.StartDutyTrace(ctx, duty, "core/scheduler.scheduleSlot")
defer span.End()

for _, sub := range s.subs {
err := sub(ctx, duty, argSet)
if err != nil {
// TODO(corver): Improve error handling; possibly call subscription async
// with backoff until duty expires.
span.End()
return err
for _, sub := range s.subs {
if err := sub(ctx, duty, argSet); err != nil {
log.Error(ctx, "Trigger duty subscriber error", err)
}
}
}

span.End()
// TODO(leo): This had to be commented out because the scheduler doesn't need the duty anymore,
// but the validatorAPI will need the duty when verifying a randao. Solved when we have the shared
// component to resolve duties.
// s.deleteDuty(duty)
}()
}

if slot.IsLastInEpoch() {
@@ -194,8 +196,26 @@
log.Warn(ctx, "Resolving duties error (retrying next slot)", err)
}
}
}

return nil
// delaySlotOffset blocks until the slot offset for the duty has been reached and returns true.
// It returns false if the context is cancelled.
func delaySlotOffset(ctx context.Context, slot slot, duty core.Duty, delayFunc delayFunc) bool {
fn, ok := slotOffsets[duty.Type]
if !ok {
return true
}

// Calculate delay until slot offset
offset := fn(slot.SlotDuration)
deadline := slot.Time.Add(offset)

select {
case <-ctx.Done():
return false
case <-delayFunc(duty, deadline):
return true
}
}
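
The delayFunc indirection separates when a duty fires from how the wait happens. A minimal sketch of two interchangeable implementations (the first mirrors the default installed in New; the second mirrors the test delayer in scheduler_test.go; the duty stand-in type is illustrative):

package main

import (
	"fmt"
	"time"
)

type duty struct{} // illustrative stand-in for core.Duty

// realDelay waits on the wall clock until the deadline.
func realDelay(_ duty, deadline time.Time) <-chan time.Time {
	return time.After(time.Until(deadline))
}

// instantDelay fires immediately with the deadline, for deterministic tests.
func instantDelay(_ duty, deadline time.Time) <-chan time.Time {
	ch := make(chan time.Time, 1)
	ch <- deadline

	return ch
}

func main() {
	deadline := time.Now().Add(10 * time.Millisecond)
	fmt.Println(<-instantDelay(duty{}, deadline)) // returns without sleeping
	fmt.Println(<-realDelay(duty{}, deadline))    // blocks until the deadline
}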

// resolveDuties resolves the duties for the slot's epoch, caching the results.
@@ -321,6 +341,8 @@ func (s *Scheduler) setFetchArg(duty core.Duty, pubkey core.PubKey, set core.Fet
return true
}

// deleteDuty deletes the duty from the cache.
// TODO(corver): Call this on duty deadline to trim duties.
func (s *Scheduler) deleteDuty(duty core.Duty) {
s.dutiesMutex.Lock()
defer s.dutiesMutex.Unlock()
@@ -410,7 +432,11 @@ func newSlotTicker(ctx context.Context, eth2Cl eth2Provider, clock clockwork.Clo
height++
startTime = startTime.Add(slotDuration)

clock.Sleep(startTime.Sub(clock.Now()))
select {
case <-ctx.Done():
return
case <-clock.After(startTime.Sub(clock.Now())):
}
}
}()

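The ticker change above swaps an uninterruptible clock.Sleep for a select, so the goroutine exits promptly on shutdown instead of sleeping through a cancelled context. The same pattern with the standard library clock (clockwork elided for this sketch):

package main

import (
	"context"
	"fmt"
	"time"
)

// sleepCtx waits until the deadline, or returns false early if ctx is cancelled.
func sleepCtx(ctx context.Context, deadline time.Time) bool {
	select {
	case <-ctx.Done():
		return false
	case <-time.After(time.Until(deadline)):
		return true
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate shutdown: sleepCtx returns immediately
	fmt.Println(sleepCtx(ctx, time.Now().Add(time.Hour))) // false
}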
88 changes: 75 additions & 13 deletions core/scheduler/scheduler_test.go
@@ -20,6 +20,8 @@ import (
"encoding/json"
"flag"
"os"
"sort"
"sync"
"testing"
"time"

@@ -159,7 +161,7 @@ func TestSchedulerWait(t *testing.T) {
}, err
}

sched := scheduler.NewForT(t, clock, nil, eth2Cl)
sched := scheduler.NewForT(t, clock, new(delayer).delay, nil, eth2Cl)
sched.Stop() // Just run wait functions, then quit.
require.NoError(t, sched.Run())
require.EqualValues(t, test.WaitSecs, clock.Since(t0).Seconds())
@@ -175,29 +177,34 @@ func TestSchedulerDuties(t *testing.T) {
Name string
Factor int // Determines how duties are spread per epoch
PropErrs int
Results int
}{
{
// All duties grouped in first slot of epoch
Name: "grouped",
Factor: 0,
Name: "grouped",
Factor: 0,
Results: 2,
},
{
// All duties spread in first N slots of epoch (N is number of validators)
Name: "spread",
Factor: 1,
Name: "spread",
Factor: 1,
Results: 6,
},
{
// All duties spread in first N slots of epoch (except first proposer errors)
Name: "spread_errors",
Factor: 1,
PropErrs: 1,
Results: 5,
},
}

for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
// Configure beacon mock
var t0 time.Time

valSet := beaconmock.ValidatorSetA
eth2Cl, err := beaconmock.New(
beaconmock.WithValidatorSet(valSet),
@@ -218,29 +225,34 @@
return origFunc(ctx, epoch, indices)
}

// Get pubkeys for validators to schedule
// get pubkeys for validators to schedule
pubkeys, err := valSet.CorePubKeys()
require.NoError(t, err)

// Construct scheduler
clock := newTestClock(t0)
sched := scheduler.NewForT(t, clock, pubkeys, eth2Cl)
delayer := new(delayer)
sched := scheduler.NewForT(t, clock, delayer.delay, pubkeys, eth2Cl)

// Only test scheduler output for first N slots, so Stop scheduler (and slotTicker) after that.
const stopAfter = 3
slotDuration, err := eth2Cl.SlotDuration(context.Background())
require.NoError(t, err)
clock.CallbackAfter(t0.Add(time.Duration(stopAfter)*slotDuration), func() {
sched.Stop()
time.Sleep(time.Hour) // Do not let the slot ticker tick anymore.
})

// Collect results
type result struct {
Duty string
Time string
DutyStr string `json:"duty"`
Duty core.Duty `json:"-"`
DutyArgSet map[core.PubKey]string
}
var results []result
var (
results []result
mu sync.Mutex
)
sched.Subscribe(func(ctx context.Context, duty core.Duty, set core.FetchArgSet) error {
// Make result human-readable
resultSet := make(map[core.PubKey]string)
@@ -249,17 +261,39 @@
}

// Add result
mu.Lock()
defer mu.Unlock()

results = append(results, result{
Duty: duty.String(),
Duty: duty,
DutyStr: duty.String(),
DutyArgSet: resultSet,
})

if len(results) == test.Results {
sched.Stop()
}

return nil
})

// Run scheduler
require.NoError(t, sched.Run())

// Add deadlines to results
deadlines := delayer.get()
for i := 0; i < len(results); i++ {
results[i].Time = deadlines[results[i].Duty].UTC().Format("04:05.000")
}
// Make result order deterministic
sort.Slice(results, func(i, j int) bool {
if results[i].Duty.Slot == results[j].Duty.Slot {
return results[i].Duty.Type < results[j].Duty.Type
}

return results[i].Duty.Slot < results[j].Duty.Slot
})

// Assert results
testutil.RequireGoldenJSON(t, results)
})
Expand All @@ -277,13 +311,13 @@ func TestScheduler_GetDuty(t *testing.T) {
)
require.NoError(t, err)

// Get pubkeys for validators to schedule
// get pubkeys for validators to schedule
pubkeys, err := valSet.CorePubKeys()
require.NoError(t, err)

// Construct scheduler
clock := newTestClock(t0)
sched := scheduler.NewForT(t, clock, pubkeys, eth2Cl)
sched := scheduler.NewForT(t, clock, new(delayer).delay, pubkeys, eth2Cl)

_, err = sched.GetDuty(context.Background(), core.Duty{Slot: 0, Type: core.DutyAttester})
// Due to the current design, requesting the duty of a slot that has not been resolved yet returns an error.
@@ -312,6 +346,34 @@
require.NoError(t, sched.Run())
}

// delayer provides a scheduler.delayFunc for deterministic tests; it records each duty's deadline and fires immediately.
type delayer struct {
mu sync.Mutex
deadlines map[core.Duty]time.Time
}

func (d *delayer) get() map[core.Duty]time.Time {
d.mu.Lock()
defer d.mu.Unlock()

return d.deadlines
}

// delay implements scheduler.delayFunc: it records the deadline and returns it on the channel immediately.
func (d *delayer) delay(duty core.Duty, deadline time.Time) <-chan time.Time {
d.mu.Lock()
defer d.mu.Unlock()
if d.deadlines == nil {
d.deadlines = make(map[core.Duty]time.Time)
}
d.deadlines[duty] = deadline

resp := make(chan time.Time, 1)
resp <- deadline

return resp
}

func newTestClock(now time.Time) *testClock {
return &testClock{
now: now,