Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/deadline: ignore deadlined duties #819

Merged
merged 4 commits into from
Jul 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 92 additions & 38 deletions core/deadline.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ package core

import (
"context"
"testing"
"time"

eth2client "github.com/attestantio/go-eth2-client"
"github.com/jonboulle/clockwork"

"github.com/obolnetwork/charon/app/errors"
)
Expand All @@ -38,77 +40,129 @@ type slotTimeProvider interface {
// may only be used by a single goroutine. So, multiple instances are required
// for different components and use cases.
type Deadliner interface {
// Add adds a duty to be notified of the Deadline via C.
// Note that duties will be deduplicated and only a single duty will be provided via C.
Add(duty Duty)
// Add returns true if the duty was added for future deadline scheduling. It is idempotent
// and returns true if the duty was previously added and still awaits deadline scheduling. It
// returns false if the duty has already expired and cannot therefore be added for scheduling.
Add(duty Duty) bool

// C returns the same read channel every time and contains deadlined duties.
// It should only be called by a single goroutine.
C() <-chan Duty
}

// deadlinerInput represents the input to inputChan.
type deadlineInput struct {
duty Duty
success chan<- bool
}

// Deadline implements the Deadliner interface.
type Deadline struct {
dutyChan chan Duty
inputChan chan deadlineInput
deadlineChan chan Duty
clock clockwork.Clock
quit chan struct{}
}

// NewForT returns a Deadline for use in tests.
func NewForT(ctx context.Context, t *testing.T, deadlineFunc func(Duty) time.Time, clock clockwork.Clock) *Deadline {
t.Helper()

d := &Deadline{
inputChan: make(chan deadlineInput),
deadlineChan: make(chan Duty),
clock: clock,
quit: make(chan struct{}),
}

go d.run(ctx, deadlineFunc)

return d
}

// NewDeadliner returns a new instance of Deadline.
// It runs a goroutine which is responsible for reading and storing duties,
// and sending the deadlined duty to receiver's deadlineChan.
func NewDeadliner(ctx context.Context, deadlineFunc func(Duty) time.Time) *Deadline {
d := &Deadline{
dutyChan: make(chan Duty),
inputChan: make(chan deadlineInput),
deadlineChan: make(chan Duty),
clock: clockwork.NewRealClock(),
quit: make(chan struct{}),
}

go func() {
duties := make(map[Duty]bool)
currDuty, currDeadline := getCurrDuty(duties, deadlineFunc)
currTimer := time.NewTimer(time.Until(currDeadline))
go d.run(ctx, deadlineFunc)

defer func() {
close(d.quit)
currTimer.Stop()
}()
return d
}

setCurrState := func() {
currTimer.Stop()
currDuty, currDeadline = getCurrDuty(duties, deadlineFunc)
currTimer = time.NewTimer(time.Until(currDeadline))
}
func (d *Deadline) run(ctx context.Context, deadlineFunc func(Duty) time.Time) {
duties := make(map[Duty]bool)
currDuty, currDeadline := getCurrDuty(duties, deadlineFunc)
currTimer := d.clock.NewTimer(currDeadline.Sub(d.clock.Now()))

defer func() {
close(d.quit)
currTimer.Stop()
}()

setCurrState := func() {
currTimer.Stop()
currDuty, currDeadline = getCurrDuty(duties, deadlineFunc)
currTimer = d.clock.NewTimer(currDeadline.Sub(d.clock.Now()))
}

// TODO(dhruv): optimise getCurrDuty and updating current state if earlier deadline detected,
// using min heap or ordered map
for {
select {
case <-ctx.Done():
return
case input := <-d.inputChan:
deadline := deadlineFunc(input.duty)
expired := deadline.Before(d.clock.Now())

input.success <- !expired

// Ignore expired duties.
if expired {
continue
}

// TODO(dhruv): optimise getCurrDuty and updating current state if earlier deadline detected,
// using a min heap or an ordered map.
for {
duties[input.duty] = true

if deadline.Before(currDeadline) {
setCurrState()
}
case <-currTimer.Chan():
// Send deadlined duty to receiver.
select {
case <-ctx.Done():
return
case duty := <-d.dutyChan:
duties[duty] = true
setCurrState()
case <-currTimer.C:
// Send deadlined duty to receiver
d.deadlineChan <- currDuty
delete(duties, currDuty)
setCurrState()
case d.deadlineChan <- currDuty:
}
}
}()

return d
delete(duties, currDuty)
setCurrState()
}
}
}

// Add adds a duty to be notified of the deadline.
// TODO(xenowits): Ignore expired duties.
func (d *Deadline) Add(duty Duty) {
// Add adds a duty to be notified of the deadline. It returns true if the duty was added successfully.
func (d *Deadline) Add(duty Duty) bool {
res := make(chan bool)

select {
case <-d.quit:
return false
case d.inputChan <- deadlineInput{duty: duty, success: res}:
}

select {
case <-d.quit:
return
default:
d.dutyChan <- duty
return false
case ok := <-res:
return ok
}
}

Expand Down
95 changes: 74 additions & 21 deletions core/deadline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ package core_test

import (
"context"
"sort"
"sync"
"testing"
"time"

"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/require"

"github.com/obolnetwork/charon/core"
Expand All @@ -29,43 +32,93 @@ func TestDeadliner(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

expiredDuties, nonExpiredDuties, voluntaryExits, dutyExpired := setupData(t)
clock := clockwork.NewFakeClock()

deadlineFuncProvider := func() func(duty core.Duty) time.Time {
startTime := clock.Now()
return func(duty core.Duty) time.Time {
if duty.Type == core.DutyExit {
// Do not timeout exit duties.
return time.Date(9999, 1, 1, 0, 0, 0, 0, time.UTC)
return startTime.Add(time.Hour)
}

if dutyExpired(duty) {
return startTime.Add(-1 * time.Hour)
}

return time.Now().Add(time.Millisecond * time.Duration(duty.Slot))
return startTime.Add(time.Duration(duty.Slot) * time.Second)
}
}

deadliner := core.NewDeadliner(ctx, deadlineFuncProvider())
deadliner := core.NewForT(ctx, t, deadlineFuncProvider(), clock)

expectedDuties := []core.Duty{
core.NewVoluntaryExit(2),
core.NewAttesterDuty(2),
wg := &sync.WaitGroup{}

// Add our duties to the deadliner.
xenowits marked this conversation as resolved.
Show resolved Hide resolved
addDuties(t, wg, expiredDuties, false, deadliner)
addDuties(t, wg, nonExpiredDuties, true, deadliner)
addDuties(t, wg, voluntaryExits, true, deadliner)

// Wait till all the duties are added to the deadliner.
wg.Wait()

var actualDuties []core.Duty
for i := 0; i < len(nonExpiredDuties); i++ {
// Advance clock by 1 second to trigger deadline of duties.
clock.Advance(time.Second)
actualDuties = append(actualDuties, <-deadliner.C())
}

sort.Slice(actualDuties, func(i, j int) bool {
return actualDuties[i].Slot < actualDuties[j].Slot
})

require.Equal(t, nonExpiredDuties, actualDuties)
}

// sendDuties runs a goroutine which adds the duties to the deadliner channel.
func addDuties(t *testing.T, wg *sync.WaitGroup, duties []core.Duty, expected bool, deadliner *core.Deadline) {
t.Helper()

wg.Add(1)
go func(duties []core.Duty, expected bool) {
xenowits marked this conversation as resolved.
Show resolved Hide resolved
defer wg.Done()
for _, duty := range duties {
require.Equal(t, deadliner.Add(duty), expected)
}
}(duties, expected)
}

// setupData sets up the duties to send to deadliner.
func setupData(t *testing.T) ([]core.Duty, []core.Duty, []core.Duty, func(core.Duty) bool) {
t.Helper()

expiredDuties := []core.Duty{
core.NewAttesterDuty(1),
core.NewAttesterDuty(3),
core.NewProposerDuty(2),
core.NewRandaoDuty(3),
}

for _, duty := range expectedDuties {
deadliner.Add(duty)
nonExpiredDuties := []core.Duty{
core.NewProposerDuty(1),
core.NewAttesterDuty(2),
core.NewBuilderProposerDuty(3),
}

var actualDuties []core.Duty
for i := 0; i < len(expectedDuties)-1; i++ {
actualDuty := <-deadliner.C()
actualDuties = append(actualDuties, actualDuty)
voluntaryExits := []core.Duty{
core.NewVoluntaryExit(2),
core.NewVoluntaryExit(4),
}

require.Equal(t, len(expectedDuties), len(actualDuties)+1)
dutyExpired := func(duty core.Duty) bool {
for _, d := range expiredDuties {
if d == duty {
return true
}
}

// Since DutyExit doesn't timeout, we won't receive it from the deadliner.
require.NotEqual(t, expectedDuties[0], actualDuties[0])
return false
}

// AttesterDuty for Slot 1 times out before AttesterDuty for Slot 2
require.Equal(t, expectedDuties[2], actualDuties[0])
require.Equal(t, expectedDuties[1], actualDuties[1])
require.Equal(t, expectedDuties[3], actualDuties[2])
return expiredDuties, nonExpiredDuties, voluntaryExits, dutyExpired
}
7 changes: 5 additions & 2 deletions core/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,12 @@ func (t *Tracker) Run(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case e := <-t.input:
// TODO(xenowits): Do not store events for expired duties.
if !t.deadliner.Add(e.duty) {
// Ignore expired duties
continue
}

t.events[e.duty] = append(t.events[e.duty], e)
t.deadliner.Add(e.duty)
case duty := <-t.deadliner.C():
failed, failedComponent, failedMsg := analyseDutyFailed(duty, t.events[duty])

Expand Down
18 changes: 10 additions & 8 deletions core/tracker/tracker_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ import (
"github.com/obolnetwork/charon/testutil"
)

func TestTracker1(t *testing.T) {
duty, defSet, pubkey, unsignedDataSet, parSignedDataSet := setupData1(t)
func TestTracker(t *testing.T) {
duty, defSet, pubkey, unsignedDataSet, parSignedDataSet := setupData(t)

t.Run("FailAtConsensus", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
deadliner := testDeadliner1{
deadliner := testDeadliner{
deadlineChan: make(chan core.Duty),
}

Expand All @@ -59,7 +59,7 @@ func TestTracker1(t *testing.T) {

t.Run("Success", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
deadliner := testDeadliner1{
deadliner := testDeadliner{
deadlineChan: make(chan core.Duty),
}

Expand Down Expand Up @@ -92,18 +92,20 @@ func TestTracker1(t *testing.T) {
}

// testDeadliner is a mock deadliner implementation.
type testDeadliner1 struct {
type testDeadliner struct {
deadlineChan chan core.Duty
}

func (testDeadliner1) Add(core.Duty) {}
func (testDeadliner) Add(core.Duty) bool {
return true
}

func (t testDeadliner1) C() <-chan core.Duty {
func (t testDeadliner) C() <-chan core.Duty {
return t.deadlineChan
}

// setupData returns the data required to test tracker.
func setupData1(t *testing.T) (core.Duty, core.DutyDefinitionSet, core.PubKey, core.UnsignedDataSet, core.ParSignedDataSet) {
func setupData(t *testing.T) (core.Duty, core.DutyDefinitionSet, core.PubKey, core.UnsignedDataSet, core.ParSignedDataSet) {
t.Helper()

const (
Expand Down