Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
cleanup
  • Loading branch information
xenowits committed Jul 21, 2022
1 parent 76ed685 commit b545420
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 70 deletions.
125 changes: 86 additions & 39 deletions core/deadline.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package core

import (
"context"
"github.com/jonboulle/clockwork"
"time"

eth2client "github.com/attestantio/go-eth2-client"
Expand All @@ -38,76 +39,122 @@ type slotTimeProvider interface {
// may only be used by a single goroutine. So, multiple instances are required
// for different components and use cases.
type Deadliner interface {
// Add adds a duty to be notified of the Deadline via C.
// Note that duties will be deduplicated and only a single duty will be provided via C.
Add(duty Duty)
// Add returns true if the duty was added for future deadline scheduling. It is idempotent
// and returns true if the duty was previously added and still awaits deadline scheduling. It
// returns false if the duty has already expired and cannot therefore be added for scheduling.
Add(duty Duty) bool

// C returns the same read channel every time and contains deadlined duties.
// It should only be called by a single goroutine.
C() <-chan Duty
}

// deadlinerInput represents the input to inputChan.
type deadlineInput struct {
duty Duty
success chan<- bool
}

// Deadline implements the Deadliner interface.
type Deadline struct {
dutyChan chan Duty
inputChan chan deadlineInput
deadlineChan chan Duty
clock clockwork.Clock
quit chan struct{}
}

// NewForT returns a Deadline for use in tests.
func NewForT(ctx context.Context, deadlineFunc func(Duty) time.Time, clock clockwork.Clock) *Deadline {
d := &Deadline{
inputChan: make(chan deadlineInput),
deadlineChan: make(chan Duty),
clock: clock,
quit: make(chan struct{}),
}

go d.run(ctx, deadlineFunc)

return d
}

// NewDeadliner returns a new instance of Deadline.
// It runs a goroutine which is responsible for reading and storing duties,
// and sending the deadlined duty to receiver's deadlineChan.
func NewDeadliner(ctx context.Context, deadlineFunc func(Duty) time.Time) *Deadline {
d := &Deadline{
dutyChan: make(chan Duty),
inputChan: make(chan deadlineInput),
deadlineChan: make(chan Duty),
clock: clockwork.NewRealClock(),
quit: make(chan struct{}),
}

go func() {
duties := make(map[Duty]bool)
currDuty, currDeadline := getCurrDuty(duties, deadlineFunc)
currTimer := time.NewTimer(time.Until(currDeadline))
go d.run(ctx, deadlineFunc)

defer func() {
close(d.quit)
currTimer.Stop()
}()
return d
}

setCurrState := func() {
currTimer.Stop()
currDuty, currDeadline = getCurrDuty(duties, deadlineFunc)
currTimer = time.NewTimer(time.Until(currDeadline))
}
func (d *Deadline) run(ctx context.Context, deadlineFunc func(Duty) time.Time) {
duties := make(map[Duty]bool)
currDuty, currDeadline := getCurrDuty(duties, deadlineFunc)
currTimer := d.clock.NewTimer(currDeadline.Sub(d.clock.Now()))

// TODO(dhruv): optimise getCurrDuty and updating current state if earlier deadline detected,
// using min heap or ordered map
for {
select {
case <-ctx.Done():
return
case duty := <-d.dutyChan:
duties[duty] = true
setCurrState()
case <-currTimer.C:
// Send deadlined duty to receiver
d.deadlineChan <- currDuty
delete(duties, currDuty)
defer func() {
close(d.quit)
currTimer.Stop()
}()

setCurrState := func() {
currTimer.Stop()
currDuty, currDeadline = getCurrDuty(duties, deadlineFunc)
currTimer = d.clock.NewTimer(currDeadline.Sub(d.clock.Now()))
}

// TODO(dhruv): optimise getCurrDuty and updating current state if earlier deadline detected,
// using min heap or ordered map
for {
select {
case <-ctx.Done():
return
case input := <-d.inputChan:
deadline := deadlineFunc(input.duty)
expired := deadline.Before(d.clock.Now())

input.success <- !expired

// Ignore expired duties.
if expired {
continue
}

duties[input.duty] = true

if deadline.Before(currDeadline) {
setCurrState()
}
case <-currTimer.Chan():
// Send deadlined duty to receiver.
d.deadlineChan <- currDuty
delete(duties, currDuty)
setCurrState()
}
}()

return d
}
}

// Add adds a duty to be notified of the deadline.
func (d *Deadline) Add(duty Duty) {
// Add adds a duty to be notified of the deadline. It returns true if the duty was added successfully.
func (d *Deadline) Add(duty Duty) bool {
res := make(chan bool)

select {
case <-d.quit:
return false
case d.inputChan <- deadlineInput{duty: duty, success: res}:
}

select {
case <-d.quit:
return
default:
d.dutyChan <- duty
return false
case ok := <-res:
return ok
}
}

Expand Down
97 changes: 69 additions & 28 deletions core/deadline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package core_test

import (
"context"
"github.com/jonboulle/clockwork"
"sort"
"testing"
"time"

Expand All @@ -29,48 +31,87 @@ func TestDeadliner(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const lateFactor = 1
startTime := time.Now()
expiredDuties, nonExpiredDuties, voluntaryExits, dutyExpired := setupData(t)
clock := clockwork.NewFakeClock()

deadlineFunc := func(duty core.Duty) time.Time {
duration := time.Second
if duty.Type == core.DutyExit {
// Do not timeout exit duties.
return time.Date(9999, 1, 1, 0, 0, 0, 0, time.UTC)
deadlineFuncProvider := func() func(duty core.Duty) time.Time {
startTime := clock.Now()
return func(duty core.Duty) time.Time {
if duty.Type == core.DutyExit {
return startTime.Add(time.Hour)
}

if dutyExpired(duty) {
return startTime.Add(-1 * time.Hour)
}

return startTime.Add(time.Duration(duty.Slot) * time.Second)
}
}

deadliner := core.NewForT(ctx, deadlineFuncProvider(), clock)

// Add our duties to the deadliner.
addDuties(t, expiredDuties, false, deadliner)
addDuties(t, nonExpiredDuties, true, deadliner)
addDuties(t, voluntaryExits, true, deadliner)

start := startTime.Add(duration * time.Duration(duty.Slot))
end := start.Add(duration * time.Duration(lateFactor))
clock.Advance(1 * time.Second)

return end
var actualDuties []core.Duty
for i := 0; i < len(nonExpiredDuties); i++ {
actualDuties = append(actualDuties, <-deadliner.C())
clock.Advance(time.Second)
}

deadliner := core.NewDeadliner(ctx, deadlineFunc)
sort.Slice(actualDuties, func(i, j int) bool {
return actualDuties[i].Slot < actualDuties[j].Slot
})

expectedDuties := []core.Duty{
core.NewVoluntaryExit(2),
core.NewAttesterDuty(2),
require.Equal(t, nonExpiredDuties, actualDuties)
}

// sendDuties runs a goroutine which adds the duties to the deadliner channel.
func addDuties(t *testing.T, duties []core.Duty, expected bool, deadliner *core.Deadline) {
t.Helper()

go func(duties []core.Duty, expected bool) {
for _, duty := range duties {
require.Equal(t, deadliner.Add(duty), expected)
}
}(duties, expected)
}

// setupData sets up the duties to send to deadliner.
func setupData(t *testing.T) ([]core.Duty, []core.Duty, []core.Duty, func(core.Duty) bool) {
t.Helper()

expiredDuties := []core.Duty{
core.NewAttesterDuty(1),
core.NewAttesterDuty(3),
core.NewProposerDuty(2),
core.NewRandaoDuty(3),
}

for _, duty := range expectedDuties {
deadliner.Add(duty)
nonExpiredDuties := []core.Duty{
core.NewProposerDuty(1),
core.NewAttesterDuty(2),
core.NewBuilderProposerDuty(3),
}

var actualDuties []core.Duty
for i := 0; i < len(expectedDuties)-1; i++ {
actualDuty := <-deadliner.C()
actualDuties = append(actualDuties, actualDuty)
voluntaryExits := []core.Duty{
core.NewVoluntaryExit(2),
core.NewVoluntaryExit(4),
}

require.Equal(t, len(expectedDuties), len(actualDuties)+1)
dutyExpired := func(duty core.Duty) bool {
for _, d := range expiredDuties {
if d == duty {
return true
}
}

// Since DutyExit doesn't timeout, we won't receive it from the deadliner.
require.NotEqual(t, expectedDuties[0], actualDuties[0])
return false
}

// AttesterDuty for Slot 1 times out before AttesterDuty for Slot 2
require.Equal(t, expectedDuties[2], actualDuties[0])
require.Equal(t, expectedDuties[1], actualDuties[1])
require.Equal(t, expectedDuties[3], actualDuties[2])
return expiredDuties, nonExpiredDuties, voluntaryExits, dutyExpired
}
7 changes: 4 additions & 3 deletions core/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ func (t *Tracker) Run(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case e := <-t.input:
// TODO(dhruv): Do not store events for expired duties.
t.events[e.duty] = append(t.events[e.duty], e)
t.deadliner.Add(e.duty)
if t.deadliner.Add(e.duty) {
// Call to Add() succeeded which means that the duty has not timed out, so we can store it.
t.events[e.duty] = append(t.events[e.duty], e)
}
case duty := <-t.deadliner.C():
failed, failedComponent, failedMsg := analyseFailedDuty(duty, t.events[duty])

Expand Down

0 comments on commit b545420

Please sign in to comment.