Skip to content

Commit

Permalink
fix(lib/babe): use current system time to yield a new slot (#3133)
Browse files Browse the repository at this point in the history
  • Loading branch information
EclesioMeloJunior committed Mar 21, 2023
1 parent 0142b1f commit 9cd6f25
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 131 deletions.
4 changes: 3 additions & 1 deletion lib/babe/babe.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,9 @@ func (b *Service) handleEpoch(epoch uint64) (next uint64, err error) {
case err := <-errCh:
// TODO: errEpochPast is sent on this channel, but it doesnot get logged here
epochTimer.Stop()
logger.Errorf("error from epochHandler: %s", err)
if err != nil {
logger.Errorf("error from epochHandler: %s", err)
}
}

// setup next epoch, re-invoke block authoring
Expand Down
93 changes: 18 additions & 75 deletions lib/babe/epoch_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,9 @@ import (
"context"
"errors"
"fmt"
"sort"
"time"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/crypto/sr25519"
"golang.org/x/exp/maps"
)

type handleSlotFunc = func(epoch uint64, slot Slot, authorityIndex uint32,
Expand All @@ -23,6 +20,7 @@ var (
)

type epochHandler struct {
slotHandler slotHandler
epochNumber uint64
firstSlot uint64

Expand Down Expand Up @@ -53,6 +51,7 @@ func newEpochHandler(epochNumber, firstSlot uint64, epochData *epochData, consta
}

return &epochHandler{
slotHandler: newSlotHandler(constants.slotDuration),
epochNumber: epochNumber,
firstSlot: firstSlot,
constants: constants,
Expand All @@ -62,7 +61,10 @@ func newEpochHandler(epochNumber, firstSlot uint64, epochData *epochData, consta
}, nil
}

// run executes the block production for each available successfully claimed slot
// it is important to note that any error will be transmitted through errCh
func (h *epochHandler) run(ctx context.Context, errCh chan<- error) {
defer close(errCh)
currSlot := getCurrentSlot(h.constants.slotDuration)

// if currSlot < h.firstSlot, it means we're at genesis and waiting for the first slot to arrive.
Expand All @@ -75,83 +77,24 @@ func (h *epochHandler) run(ctx context.Context, errCh chan<- error) {
return
}

// for each slot we're handling, create a timer that will fire when it starts
// we create timers only for slots where we're authoring
authoringSlots := getAuthoringSlots(h.slotToPreRuntimeDigest)
logger.Debugf("authoring in %d slots in epoch %d", len(h.slotToPreRuntimeDigest), h.epochNumber)

type slotWithTimer struct {
startTime time.Time
timer *time.Timer
slotNum uint64
}
for {
currentSlot, err := h.slotHandler.waitForNextSlot(ctx)
if err != nil {
errCh <- err
return
}

slotTimeTimers := make([]*slotWithTimer, 0, len(authoringSlots))
for _, authoringSlot := range authoringSlots {
if authoringSlot < currSlot {
// ignore slots already passed
// check if the slot is an authoring slot otherwise wait for the next slot
preRuntimeDigest, has := h.slotToPreRuntimeDigest[currentSlot.number]
if !has {
continue
}

startTime := getSlotStartTime(authoringSlot, h.constants.slotDuration)
waitTime := time.Until(startTime)
timer := time.NewTimer(waitTime)

slotTimeTimers = append(slotTimeTimers, &slotWithTimer{
timer: timer,
slotNum: authoringSlot,
startTime: startTime,
})

logger.Debugf("start time of slot %d: %v", authoringSlot, startTime)
}

logger.Debugf("authoring in %d slots in epoch %d", len(slotTimeTimers), h.epochNumber)

for _, swt := range slotTimeTimers {
logger.Debugf("waiting for next authoring slot %d", swt.slotNum)

select {
case <-ctx.Done():
for _, swt := range slotTimeTimers {
swt.timer.Stop()
}
return
case <-swt.timer.C:
// we must do a time correction as the slot timer sometimes is triggered
// before the time defined in the constructor due to an inconsistency
// of the language -> https://github.com/golang/go/issues/17696

diff := time.Since(swt.startTime)
if diff < 0 {
time.Sleep(-diff)
}

if _, has := h.slotToPreRuntimeDigest[swt.slotNum]; !has {
// this should never happen
panic(fmt.Sprintf("no VRF proof for authoring slot! slot=%d", swt.slotNum))
}

currentSlot := Slot{
start: swt.startTime,
duration: h.constants.slotDuration,
number: swt.slotNum,
}
err := h.handleSlot(h.epochNumber, currentSlot, h.epochData.authorityIndex, h.slotToPreRuntimeDigest[swt.slotNum])
if err != nil {
logger.Warnf("failed to handle slot %d: %s", swt.slotNum, err)
continue
}
err = h.handleSlot(h.epochNumber, currentSlot, h.epochData.authorityIndex, preRuntimeDigest)
if err != nil {
logger.Warnf("failed to handle slot %d: %s", currentSlot.number, err)
}
}
}

// getAuthoringSlots returns an ordered slice of slot numbers where we can author blocks,
// based on the given VRF output and proof map.
func getAuthoringSlots(slotToPreRuntimeDigest map[uint64]*types.PreRuntimeDigest) []uint64 {
authoringSlots := maps.Keys(slotToPreRuntimeDigest)
sort.Slice(authoringSlots, func(i, j int) bool {
return authoringSlots[i] < authoringSlots[j]
})

return authoringSlots
}
118 changes: 118 additions & 0 deletions lib/babe/epoch_handler_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2023 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

//go:build integration

package babe

import (
"context"
"testing"
"time"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/crypto/sr25519"
"github.com/ChainSafe/gossamer/pkg/scale"
"github.com/stretchr/testify/require"
)

func TestEpochHandler_run_shouldReturnAfterContextCancel(t *testing.T) {
t.Parallel()

const authorityIndex uint32 = 0
aliceKeyPair := keyring.Alice().(*sr25519.Keypair)
epochData := &epochData{
threshold: scale.MaxUint128,
authorityIndex: authorityIndex,
authorities: []types.Authority{
*types.NewAuthority(aliceKeyPair.Public(), 1),
},
}

const slotDuration = 6 * time.Second
const epochLength uint64 = 100

testConstants := constants{
slotDuration: slotDuration,
epochLength: epochLength,
}

const expectedEpoch = 1
startSlot := getCurrentSlot(slotDuration)
handler := testHandleSlotFunc(t, authorityIndex, expectedEpoch, startSlot)

epochHandler, err := newEpochHandler(1, startSlot, epochData, testConstants, handler, aliceKeyPair)
require.NoError(t, err)
require.Equal(t, epochLength, uint64(len(epochHandler.slotToPreRuntimeDigest)))

timeoutCtx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(7 * time.Second)
cancel()
}()

errCh := make(chan error)
go epochHandler.run(timeoutCtx, errCh)

err = <-errCh
require.ErrorIs(t, err, context.Canceled)
}

func TestEpochHandler_run(t *testing.T) {
t.Parallel()

const authorityIndex uint32 = 0
aliceKeyPair := keyring.Alice().(*sr25519.Keypair)
epochData := &epochData{
threshold: scale.MaxUint128,
authorityIndex: authorityIndex,
authorities: []types.Authority{
*types.NewAuthority(aliceKeyPair.Public(), 1),
},
}

const slotDuration = 6 * time.Second
const epochLength uint64 = 100

testConstants := constants{
slotDuration: slotDuration,
epochLength: epochLength,
}

const expectedEpoch = 1
startSlot := getCurrentSlot(slotDuration)
handler := testHandleSlotFunc(t, authorityIndex, expectedEpoch, startSlot)

epochHandler, err := newEpochHandler(1, startSlot, epochData, testConstants, handler, aliceKeyPair)
require.NoError(t, err)
require.Equal(t, epochLength, uint64(len(epochHandler.slotToPreRuntimeDigest)))

timeoutCtx, cancel := context.WithTimeout(context.Background(), 10*slotDuration)
defer cancel()

errCh := make(chan error)
go epochHandler.run(timeoutCtx, errCh)

err = <-errCh
require.ErrorIs(t, err, context.DeadlineExceeded)

}

func testHandleSlotFunc(t *testing.T, expectedAuthorityIndex uint32,
expectedEpoch, startSlot uint64) handleSlotFunc {
currentSlot := startSlot

return func(epoch uint64, slot Slot, authorityIndex uint32,
preRuntimeDigest *types.PreRuntimeDigest) error {
require.NotNil(t, preRuntimeDigest)
require.Equal(t, expectedEpoch, epoch)
require.Equal(t, expectedAuthorityIndex, authorityIndex)

require.GreaterOrEqual(t, slot.number, currentSlot)

// increase the slot by one so we expect the next call
// to be exactly 1 slot greater than the previous call
currentSlot++
return nil
}
}
58 changes: 3 additions & 55 deletions lib/babe/epoch_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package babe

import (
"context"
"testing"
"time"

Expand All @@ -29,70 +28,19 @@ func TestNewEpochHandler(t *testing.T) {
sd, err := time.ParseDuration("6s")
require.NoError(t, err)

constants := constants{ //nolint:govet
testConstants := constants{
slotDuration: sd,
epochLength: 200,
}

keypair := keyring.Alice().(*sr25519.Keypair)

epochHandler, err := newEpochHandler(1, 9999, epochData, constants, testHandleSlotFunc, keypair)
epochHandler, err := newEpochHandler(1, 9999, epochData, testConstants, testHandleSlotFunc, keypair)
require.NoError(t, err)
require.Equal(t, 200, len(epochHandler.slotToPreRuntimeDigest))
require.Equal(t, uint64(1), epochHandler.epochNumber)
require.Equal(t, uint64(9999), epochHandler.firstSlot)
require.Equal(t, constants, epochHandler.constants)
require.Equal(t, testConstants, epochHandler.constants)
require.Equal(t, epochData, epochHandler.epochData)
require.NotNil(t, epochHandler.handleSlot)
}

func TestEpochHandler_run(t *testing.T) {
sd, err := time.ParseDuration("10ms")
require.NoError(t, err)
startSlot := getCurrentSlot(sd)

var callsToHandleSlot, firstExecutedSlot uint64
testHandleSlotFunc := func(epoch uint64, slot Slot, authorityIndex uint32,
preRuntimeDigest *types.PreRuntimeDigest,
) error {
require.Equal(t, uint64(1), epoch)
if callsToHandleSlot == 0 {
firstExecutedSlot = slot.number
} else {
require.Equal(t, firstExecutedSlot+callsToHandleSlot, slot.number)
}
require.Equal(t, uint32(0), authorityIndex)
require.NotNil(t, preRuntimeDigest)
callsToHandleSlot++
return nil
}

epochData := &epochData{
threshold: scale.MaxUint128,
}

const epochLength uint64 = 100
constants := constants{ //nolint:govet
slotDuration: sd,
epochLength: epochLength,
}

keypair := keyring.Alice().(*sr25519.Keypair)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
epochHandler, err := newEpochHandler(1, startSlot, epochData, constants, testHandleSlotFunc, keypair)
require.NoError(t, err)
require.Equal(t, epochLength, uint64(len(epochHandler.slotToPreRuntimeDigest)))

errCh := make(chan error)
go epochHandler.run(ctx, errCh)
timer := time.NewTimer(sd * time.Duration(epochLength))
select {
case <-timer.C:
require.Equal(t, epochLength-(firstExecutedSlot-startSlot), callsToHandleSlot)
case err := <-errCh:
timer.Stop()
require.NoError(t, err)
}
}
Loading

0 comments on commit 9cd6f25

Please sign in to comment.