Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/scheduler: subtract half rtt from sync offset #779

Merged
merged 1 commit into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions core/scheduler/clocksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ const (
// newClockSyncer returns a function that returns the current median beacon node clock sync offset.
// The clock sync offset is the duration we need to add to our clock to sync with the beacon node's clock.
// Accuracy is improved by subtracting half the ping rtt (as returned by pingFunc) from each offset.
func newClockSyncer(ctx context.Context, eventsProvider eth2client.EventsProvider, clock clockwork.Clock,
genesis time.Time, slotDuration time.Duration,
func newClockSyncer(ctx context.Context, eventsProvider eth2client.EventsProvider, pingFunc func() time.Duration,
clock clockwork.Clock, genesis time.Time, slotDuration time.Duration,
) (func() time.Duration, error) {
var (
mu sync.Mutex
Expand All @@ -61,6 +61,10 @@ func newClockSyncer(ctx context.Context, eventsProvider eth2client.EventsProvide
startTime := genesis.Add(time.Duration(head.Slot) * slotDuration)
newOffset := clock.Since(startTime)

// Subtract half rtt for improved accuracy.
rtt := pingFunc()
newOffset -= rtt / 2

offsets = append(offsets, newOffset)

if len(offsets) > offsetCount {
Expand Down Expand Up @@ -97,3 +101,36 @@ func newClockSyncer(ctx context.Context, eventsProvider eth2client.EventsProvide
return medianOffset
}, nil
}

// newBeaconPinger returns a closure that pings the beacon node every time it is
// called and reports the median round-trip time of the 10 most recent successful pings.
// "/eth/v1/node/syncing" serves as the ping endpoint proxy since it returns a simple response.
//
// Failed pings are not recorded, and zero is returned while no ping has succeeded yet.
// With an even number of samples the upper of the two middle values is returned.
// The closure mutates its sample buffer without locking, so it is not safe for concurrent use.
func newBeaconPinger(ctx context.Context, eth2Cl eth2client.NodeSyncingProvider) func() time.Duration {
	const maxSamples = 10

	var samples []time.Duration

	return func() time.Duration {
		start := time.Now()
		if _, err := eth2Cl.NodeSyncing(ctx); err == nil {
			elapsed := time.Since(start)
			samples = append(samples, elapsed)
			syncRTTGauge.Set(elapsed.Seconds())
		}

		// Drop the oldest measurements so at most maxSamples remain.
		if excess := len(samples) - maxSamples; excess > 0 {
			samples = samples[excess:]
		}

		if len(samples) == 0 {
			return 0
		}

		// Sort a copy so the chronological order of the buffer is left intact.
		sorted := make([]time.Duration, len(samples))
		copy(sorted, samples)
		sort.Slice(sorted, func(a, b int) bool {
			return sorted[a] < sorted[b]
		})

		return sorted[len(sorted)/2]
	}
}
3 changes: 2 additions & 1 deletion core/scheduler/clocksync_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ func TestClockSync(t *testing.T) {
clock := clockwork.NewFakeClock()
slotDuration := time.Second
provider := &testEventsProvider{t: t}
syncOffset, err := newClockSyncer(context.Background(), provider, clock, clock.Now(), slotDuration)
pinger := func() time.Duration { return 0 }
syncOffset, err := newClockSyncer(context.Background(), provider, pinger, clock, clock.Now(), slotDuration)
require.NoError(t, err)

require.Zero(t, syncOffset())
Expand Down
7 changes: 7 additions & 0 deletions core/scheduler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ var (
Name: "beacon_node_offset_seconds",
Help: "The beacon node clock sync median offset in seconds",
})

syncRTTGauge = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "core",
Subsystem: "scheduler",
Name: "beacon_node_rtt_seconds",
Help: "The beacon node clock sync ping rtt in seconds",
})
)

// instrumentSlot sets the current slot and epoch metrics.
Expand Down
17 changes: 10 additions & 7 deletions core/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@ import (

// eth2Provider defines the eth2 provider subset used by this package.
type eth2Provider interface {
eth2client.NodeSyncingProvider
eth2client.GenesisTimeProvider
eth2client.ValidatorsProvider
eth2client.SlotsPerEpochProvider
eth2client.SlotDurationProvider
eth2client.AttesterDutiesProvider
eth2client.ProposerDutiesProvider
eth2client.EventsProvider
eth2client.GenesisTimeProvider
eth2client.NodeSyncingProvider
eth2client.ProposerDutiesProvider
eth2client.SlotDurationProvider
eth2client.SlotsPerEpochProvider
eth2client.ValidatorsProvider
// Above sorted alphabetically.
}

// delayFunc abstracts slot offset delaying/sleeping for deterministic tests.
Expand Down Expand Up @@ -439,7 +440,9 @@ func newSlotTicker(ctx context.Context, eth2Cl eth2Provider, clock clockwork.Clo
return nil, err
}

syncOffset, err := newClockSyncer(ctx, eth2Cl, clock, genesis, slotDuration)
pingFunc := newBeaconPinger(ctx, eth2Cl)

syncOffset, err := newClockSyncer(ctx, eth2Cl, pingFunc, clock, genesis, slotDuration)
if err != nil {
return nil, err
}
Expand Down