Skip to content

Commit

Permalink
core/deadline: ignore deadlined duties (#819)
Browse files Browse the repository at this point in the history
- Modify `Deadliner` to ignore deadlined duties.
- Modify `Add(duty)` to return true if a duty is added successfully.
- Fix deadline tests.

category: refactor
ticket: #814
  • Loading branch information
xenowits committed Jul 21, 2022
1 parent d9eed07 commit f9faed9
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 69 deletions.
130 changes: 92 additions & 38 deletions core/deadline.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ package core

import (
"context"
"testing"
"time"

eth2client "github.com/attestantio/go-eth2-client"
"github.com/jonboulle/clockwork"

"github.com/obolnetwork/charon/app/errors"
)
Expand All @@ -38,77 +40,129 @@ type slotTimeProvider interface {
// may only be used by a single goroutine. So, multiple instances are required
// for different components and use cases.
type Deadliner interface {
// Add adds a duty to be notified of the Deadline via C.
// Note that duties will be deduplicated and only a single duty will be provided via C.
Add(duty Duty)
// Add returns true if the duty was added for future deadline scheduling. It is idempotent
// and returns true if the duty was previously added and still awaits deadline scheduling. It
// returns false if the duty has already expired and cannot therefore be added for scheduling.
Add(duty Duty) bool

// C returns the same read channel every time and contains deadlined duties.
// It should only be called by a single goroutine.
C() <-chan Duty
}

// deadlinerInput represents the input to inputChan.
type deadlineInput struct {
duty Duty
success chan<- bool
}

// Deadline implements the Deadliner interface.
type Deadline struct {
dutyChan chan Duty
inputChan chan deadlineInput
deadlineChan chan Duty
clock clockwork.Clock
quit chan struct{}
}

// NewForT returns a Deadline for use in tests.
func NewForT(ctx context.Context, t *testing.T, deadlineFunc func(Duty) time.Time, clock clockwork.Clock) *Deadline {
t.Helper()

d := &Deadline{
inputChan: make(chan deadlineInput),
deadlineChan: make(chan Duty),
clock: clock,
quit: make(chan struct{}),
}

go d.run(ctx, deadlineFunc)

return d
}

// NewDeadliner returns a new instance of Deadline.
// It runs a goroutine which is responsible for reading and storing duties,
// and sending the deadlined duty to receiver's deadlineChan.
func NewDeadliner(ctx context.Context, deadlineFunc func(Duty) time.Time) *Deadline {
d := &Deadline{
dutyChan: make(chan Duty),
inputChan: make(chan deadlineInput),
deadlineChan: make(chan Duty),
clock: clockwork.NewRealClock(),
quit: make(chan struct{}),
}

go func() {
duties := make(map[Duty]bool)
currDuty, currDeadline := getCurrDuty(duties, deadlineFunc)
currTimer := time.NewTimer(time.Until(currDeadline))
go d.run(ctx, deadlineFunc)

defer func() {
close(d.quit)
currTimer.Stop()
}()
return d
}

setCurrState := func() {
currTimer.Stop()
currDuty, currDeadline = getCurrDuty(duties, deadlineFunc)
currTimer = time.NewTimer(time.Until(currDeadline))
}
func (d *Deadline) run(ctx context.Context, deadlineFunc func(Duty) time.Time) {
duties := make(map[Duty]bool)
currDuty, currDeadline := getCurrDuty(duties, deadlineFunc)
currTimer := d.clock.NewTimer(currDeadline.Sub(d.clock.Now()))

defer func() {
close(d.quit)
currTimer.Stop()
}()

setCurrState := func() {
currTimer.Stop()
currDuty, currDeadline = getCurrDuty(duties, deadlineFunc)
currTimer = d.clock.NewTimer(currDeadline.Sub(d.clock.Now()))
}

// TODO(dhruv): optimise getCurrDuty and updating current state if earlier deadline detected,
// using min heap or ordered map
for {
select {
case <-ctx.Done():
return
case input := <-d.inputChan:
deadline := deadlineFunc(input.duty)
expired := deadline.Before(d.clock.Now())

input.success <- !expired

// Ignore expired duties.
if expired {
continue
}

// TODO(dhruv): optimise getCurrDuty and updating current state if earlier deadline detected,
// using a min heap or an ordered map.
for {
duties[input.duty] = true

if deadline.Before(currDeadline) {
setCurrState()
}
case <-currTimer.Chan():
// Send deadlined duty to receiver.
select {
case <-ctx.Done():
return
case duty := <-d.dutyChan:
duties[duty] = true
setCurrState()
case <-currTimer.C:
// Send deadlined duty to receiver
d.deadlineChan <- currDuty
delete(duties, currDuty)
setCurrState()
case d.deadlineChan <- currDuty:
}
}
}()

return d
delete(duties, currDuty)
setCurrState()
}
}
}

// Add adds a duty to be notified of the deadline.
// TODO(xenowits): Ignore expired duties.
func (d *Deadline) Add(duty Duty) {
// Add adds a duty to be notified of the deadline. It returns true if the duty was added successfully.
func (d *Deadline) Add(duty Duty) bool {
res := make(chan bool)

select {
case <-d.quit:
return false
case d.inputChan <- deadlineInput{duty: duty, success: res}:
}

select {
case <-d.quit:
return
default:
d.dutyChan <- duty
return false
case ok := <-res:
return ok
}
}

Expand Down
95 changes: 74 additions & 21 deletions core/deadline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ package core_test

import (
"context"
"sort"
"sync"
"testing"
"time"

"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/require"

"github.com/obolnetwork/charon/core"
Expand All @@ -29,43 +32,93 @@ func TestDeadliner(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

expiredDuties, nonExpiredDuties, voluntaryExits, dutyExpired := setupData(t)
clock := clockwork.NewFakeClock()

deadlineFuncProvider := func() func(duty core.Duty) time.Time {
startTime := clock.Now()
return func(duty core.Duty) time.Time {
if duty.Type == core.DutyExit {
// Do not timeout exit duties.
return time.Date(9999, 1, 1, 0, 0, 0, 0, time.UTC)
return startTime.Add(time.Hour)
}

if dutyExpired(duty) {
return startTime.Add(-1 * time.Hour)
}

return time.Now().Add(time.Millisecond * time.Duration(duty.Slot))
return startTime.Add(time.Duration(duty.Slot) * time.Second)
}
}

deadliner := core.NewDeadliner(ctx, deadlineFuncProvider())
deadliner := core.NewForT(ctx, t, deadlineFuncProvider(), clock)

expectedDuties := []core.Duty{
core.NewVoluntaryExit(2),
core.NewAttesterDuty(2),
wg := &sync.WaitGroup{}

// Add our duties to the deadliner.
addDuties(t, wg, expiredDuties, false, deadliner)
addDuties(t, wg, nonExpiredDuties, true, deadliner)
addDuties(t, wg, voluntaryExits, true, deadliner)

// Wait till all the duties are added to the deadliner.
wg.Wait()

var actualDuties []core.Duty
for i := 0; i < len(nonExpiredDuties); i++ {
// Advance clock by 1 second to trigger deadline of duties.
clock.Advance(time.Second)
actualDuties = append(actualDuties, <-deadliner.C())
}

sort.Slice(actualDuties, func(i, j int) bool {
return actualDuties[i].Slot < actualDuties[j].Slot
})

require.Equal(t, nonExpiredDuties, actualDuties)
}

// sendDuties runs a goroutine which adds the duties to the deadliner channel.
func addDuties(t *testing.T, wg *sync.WaitGroup, duties []core.Duty, expected bool, deadliner *core.Deadline) {
t.Helper()

wg.Add(1)
go func(duties []core.Duty, expected bool) {
defer wg.Done()
for _, duty := range duties {
require.Equal(t, deadliner.Add(duty), expected)
}
}(duties, expected)
}

// setupData sets up the duties to send to deadliner.
func setupData(t *testing.T) ([]core.Duty, []core.Duty, []core.Duty, func(core.Duty) bool) {
t.Helper()

expiredDuties := []core.Duty{
core.NewAttesterDuty(1),
core.NewAttesterDuty(3),
core.NewProposerDuty(2),
core.NewRandaoDuty(3),
}

for _, duty := range expectedDuties {
deadliner.Add(duty)
nonExpiredDuties := []core.Duty{
core.NewProposerDuty(1),
core.NewAttesterDuty(2),
core.NewBuilderProposerDuty(3),
}

var actualDuties []core.Duty
for i := 0; i < len(expectedDuties)-1; i++ {
actualDuty := <-deadliner.C()
actualDuties = append(actualDuties, actualDuty)
voluntaryExits := []core.Duty{
core.NewVoluntaryExit(2),
core.NewVoluntaryExit(4),
}

require.Equal(t, len(expectedDuties), len(actualDuties)+1)
dutyExpired := func(duty core.Duty) bool {
for _, d := range expiredDuties {
if d == duty {
return true
}
}

// Since DutyExit doesn't timeout, we won't receive it from the deadliner.
require.NotEqual(t, expectedDuties[0], actualDuties[0])
return false
}

// AttesterDuty for Slot 1 times out before AttesterDuty for Slot 2
require.Equal(t, expectedDuties[2], actualDuties[0])
require.Equal(t, expectedDuties[1], actualDuties[1])
require.Equal(t, expectedDuties[3], actualDuties[2])
return expiredDuties, nonExpiredDuties, voluntaryExits, dutyExpired
}
7 changes: 5 additions & 2 deletions core/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,12 @@ func (t *Tracker) Run(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case e := <-t.input:
// TODO(xenowits): Do not store events for expired duties.
if !t.deadliner.Add(e.duty) {
// Ignore expired duties
continue
}

t.events[e.duty] = append(t.events[e.duty], e)
t.deadliner.Add(e.duty)
case duty := <-t.deadliner.C():
failed, failedComponent, failedMsg := analyseDutyFailed(duty, t.events[duty])

Expand Down
18 changes: 10 additions & 8 deletions core/tracker/tracker_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ import (
"github.com/obolnetwork/charon/testutil"
)

func TestTracker1(t *testing.T) {
duty, defSet, pubkey, unsignedDataSet, parSignedDataSet := setupData1(t)
func TestTracker(t *testing.T) {
duty, defSet, pubkey, unsignedDataSet, parSignedDataSet := setupData(t)

t.Run("FailAtConsensus", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
deadliner := testDeadliner1{
deadliner := testDeadliner{
deadlineChan: make(chan core.Duty),
}

Expand All @@ -59,7 +59,7 @@ func TestTracker1(t *testing.T) {

t.Run("Success", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
deadliner := testDeadliner1{
deadliner := testDeadliner{
deadlineChan: make(chan core.Duty),
}

Expand Down Expand Up @@ -92,18 +92,20 @@ func TestTracker1(t *testing.T) {
}

// testDeadliner is a mock deadliner implementation.
type testDeadliner1 struct {
type testDeadliner struct {
deadlineChan chan core.Duty
}

func (testDeadliner1) Add(core.Duty) {}
func (testDeadliner) Add(core.Duty) bool {
return true
}

func (t testDeadliner1) C() <-chan core.Duty {
func (t testDeadliner) C() <-chan core.Duty {
return t.deadlineChan
}

// setupData returns the data required to test tracker.
func setupData1(t *testing.T) (core.Duty, core.DutyDefinitionSet, core.PubKey, core.UnsignedDataSet, core.ParSignedDataSet) {
func setupData(t *testing.T) (core.Duty, core.DutyDefinitionSet, core.PubKey, core.UnsignedDataSet, core.ParSignedDataSet) {
t.Helper()

const (
Expand Down

0 comments on commit f9faed9

Please sign in to comment.