Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(lib/babe): fix timing for transition between epochs #1636

Merged
merged 20 commits into from
Jun 15, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 107 additions & 80 deletions lib/babe/babe.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func NewService(cfg *ServiceConfig) (*Service, error) {
"epoch length (slots)", babeService.epochLength,
"authorities", Authorities(babeService.epochData.authorities),
"authority index", babeService.epochData.authorityIndex,
"threshold", babeService.epochData.threshold.ToLEBytes(),
"threshold", babeService.epochData.threshold,
"randomness", babeService.epochData.randomness,
)
return babeService, nil
Expand Down Expand Up @@ -226,20 +226,23 @@ func (b *Service) EpochLength() uint64 {

// Pause pauses the service ie. halts block production
func (b *Service) Pause() error {
b.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lock is not required.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there might be a chance that Pause() is called when the b.pause channel is already close, thus causing a close of closed channel panic

defer b.Unlock()

if b.paused {
return errors.New("service already paused")
}

b.Lock()
defer b.Unlock()

b.pause <- struct{}{}
b.paused = true
b.pause <- struct{}{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: seems odd that there are multiple two places in the code where paused is set to true. Also having both pause and paused seems unnecessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll change the other place to just call Pause() instead. it needs the paused bool in the Resume function to get if the service is paused already or not, is there a better way to do that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can probably close the pause channel and reinitialize it on resume.

func (b *Service) IsPaused() {
	select {
	case: <-b.pause
		return true
	default:
		return false
	}
}
func (b *Service) Pause() {
	close(b.pause)
}
func otherFunc() {
	select {
	case <-time.After(time.Until(epochStartTime)):
	case <-b.ctx.Done():
		return nil
	case <-b.pause:
		return nil
	}
}

return nil
}

// Resume resumes the service ie. resumes block production
func (b *Service) Resume() error {
b.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acquire lock after checking b.IsPaused

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this function should be atomic in the case that Pause or Resume are called concurrently by multiple processes, process A might see it's not paused, acquire the lock, then do b.initiate() but in between seeing it's not paused an acquiring the lock another process might also see the same thing, thus causing b.initiate() to be called twice which would be bad

defer b.Unlock()

if !b.paused {
return nil
}
Expand All @@ -250,9 +253,6 @@ func (b *Service) Resume() error {
return err
}

b.Lock()
defer b.Unlock()

b.paused = false
go b.initiate(epoch)
logger.Info("service resumed", "epoch", epoch)
Expand Down Expand Up @@ -351,93 +351,118 @@ func (b *Service) initiate(epoch uint64) {
return
}

b.invokeBlockAuthoring(epoch)
}

func (b *Service) invokeBlockAuthoring(epoch uint64) {
// calculate current slot
startSlot := getCurrentSlot(b.slotDuration)

head, err := b.blockState.BestBlockHeader()
err := b.invokeBlockAuthoring(epoch)
if err != nil {
logger.Error("failed to get best block header", "error", err)
return
logger.Crit("block authoring error", "error", err)
}
}

// if we're at genesis, set the first slot number for the network
if head.Number.Cmp(big.NewInt(0)) == 0 {
err = b.epochState.SetFirstSlot(startSlot)
func (b *Service) invokeBlockAuthoring(epoch uint64) error {
for {
// get start slot for current epoch
epochStart, err := b.epochState.GetStartSlotForEpoch(epoch)
if err != nil {
logger.Error("failed to set first slot number", "error", err)
return
logger.Error("failed to get start slot for current epoch", "epoch", epoch, "error", err)
return err
}
}

logger.Info("initiating epoch", "number", epoch, "start slot", startSlot+b.epochLength)
err = b.initiateEpoch(epoch)
if err != nil {
logger.Error("failed to initiate epoch", "epoch", epoch, "error", err)
return
}
head, err := b.blockState.BestBlockHeader()
if err != nil {
logger.Error("failed to get best block header", "error", err)
return err
}

// get start slot for current epoch
epochStart, err := b.epochState.GetStartSlotForEpoch(0)
if err != nil {
logger.Error("failed to get start slot for current epoch", "epoch", epoch, "error", err)
return
}
// if we're at genesis, set the first slot number for the network
if head.Number.Cmp(big.NewInt(0)) == 0 {
epochStart = getCurrentSlot(b.slotDuration)
err = b.epochState.SetFirstSlot(epochStart)
if err != nil {
logger.Error("failed to set first slot number", "error", err)
return err
}
}

intoEpoch := startSlot - epochStart
logger.Info("current epoch", "epoch", epoch, "slots into epoch", intoEpoch)
logger.Info("initiating epoch", "number", epoch, "first slot of epoch", epochStart)
err = b.initiateEpoch(epoch)
if err != nil {
logger.Error("failed to initiate epoch", "epoch", epoch, "error", err)
return err
}

// if the calculated amount of slots "into the epoch" is greater than the epoch length,
// we've been offline for more than an epoch, and need to sync. pause BABE for now, syncer will
// resume it when ready
if b.epochLength <= intoEpoch && !b.dev {
b.paused = true
return
}
epochStartTime := getSlotStartTime(epochStart, b.slotDuration)
logger.Debug("checking if epoch started", "epoch start", epochStartTime, "now", time.Now())

// check if it's time to start the epoch yet. if not, wait until it is
if time.Since(epochStartTime) < 0 {
logger.Debug("waiting for epoch to start")
select {
case <-time.After(time.Until(epochStartTime)):
case <-b.ctx.Done():
return nil
case <-b.pause:
return nil
}
}

if b.dev {
intoEpoch = intoEpoch % b.epochLength
}
// calculate current slot
startSlot := getCurrentSlot(b.slotDuration)
intoEpoch := startSlot - epochStart

// if the calculated amount of slots "into the epoch" is greater than the epoch length,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible to move this into another function? This seems like something that happens infrequently and can be tested independently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would happen if the node is down for over an epoch or if it's starting up for the first time on a network that's past epoch 0, which part would you put in a separate function? just this if check?

// we've been offline for more than an epoch, and need to sync. pause BABE for now, syncer will
// resume it when ready
if b.epochLength <= intoEpoch && !b.dev {
logger.Debug("pausing BABE, need to sync", "slots into epoch", intoEpoch, "startSlot", startSlot, "epochStart", epochStart)
go func() {
<-b.pause
}()
return b.Pause()
}

slotDone := make([]<-chan time.Time, b.epochLength-intoEpoch)
for i := 0; i < int(b.epochLength-intoEpoch); i++ {
slotDone[i] = time.After(b.getSlotDuration() * time.Duration(i))
}
if b.dev {
intoEpoch = intoEpoch % b.epochLength
}

for i := 0; i < int(b.epochLength-intoEpoch); i++ {
select {
case <-b.ctx.Done():
return
case <-b.pause:
return
case <-slotDone[i]:
if !b.authority {
continue
}
logger.Info("current epoch", "epoch", epoch, "slots into epoch", intoEpoch)

slotDone := make([]<-chan time.Time, b.epochLength-intoEpoch)
for i := 0; i < int(b.epochLength-intoEpoch); i++ {
slotDone[i] = time.After(b.getSlotDuration() * time.Duration(i))
}

slotNum := startSlot + uint64(i)
err = b.handleSlot(slotNum)
if err == ErrNotAuthorized {
logger.Debug("not authorized to produce a block in this slot", "slot", slotNum)
continue
} else if err != nil {
logger.Warn("failed to handle slot", "slot", slotNum, "error", err)
continue
for i := 0; i < int(b.epochLength-intoEpoch); i++ {
select {
case <-b.ctx.Done():
return nil
case <-b.pause:
return nil
case <-slotDone[i]:
if !b.authority {
continue
}

slotNum := startSlot + uint64(i)
err = b.handleSlot(slotNum)
if err == ErrNotAuthorized {
logger.Debug("not authorized to produce a block in this slot", "slot", slotNum, "slots into epoch", i)
continue
} else if err != nil {
logger.Warn("failed to handle slot", "slot", slotNum, "error", err)
continue
}
}
}
}

// setup next epoch, re-invoke block authoring
next, err := b.incrementEpoch()
if err != nil {
logger.Error("failed to increment epoch", "error", err)
return
}
// setup next epoch, re-invoke block authoring
next, err := b.incrementEpoch()
if err != nil {
logger.Error("failed to increment epoch", "error", err)
return err
}

b.invokeBlockAuthoring(next)
logger.Info("epoch complete!", "completed epoch", epoch, "upcoming epoch", next)
epoch = next
}
}

func (b *Service) handleSlot(slotNum uint64) error {
Expand Down Expand Up @@ -466,8 +491,6 @@ func (b *Service) handleSlot(slotNum uint64) error {
number: slotNum,
}

logger.Debug("going to build block", "parent", parent)

// set runtime trie before building block
// if block building is successful, store the resulting trie in the storage state
ts, err := b.storageState.TrieState(&parent.StateRoot)
Expand Down Expand Up @@ -509,3 +532,7 @@ func (b *Service) handleSlot(slotNum uint64) error {
func getCurrentSlot(slotDuration time.Duration) uint64 {
return uint64(time.Now().UnixNano()) / uint64(slotDuration.Nanoseconds())
}

func getSlotStartTime(slot uint64, slotDuration time.Duration) time.Time {
return time.Unix(0, int64(slot)*slotDuration.Nanoseconds())
}
1 change: 1 addition & 0 deletions lib/babe/babe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func createTestService(t *testing.T, cfg *ServiceConfig) *Service {
cfg.Runtime = rt
}

cfg.LogLvl = defaultTestLogLvl
babeService, err := NewService(cfg)
require.NoError(t, err)
return babeService
Expand Down
2 changes: 1 addition & 1 deletion lib/babe/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (b *BlockBuilder) addToQueue(txs []*transaction.ValidTransaction) {
}

func hasSlotEnded(slot Slot) bool {
slotEnd := slot.start.Add(slot.duration)
slotEnd := slot.start.Add(slot.duration * 2 / 3) // reserve last 1/3 of slot for block finalisation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of 1/3, Can we allocate a fixed time for block finalization?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you mean exactly? 1/3 of the slot is a fixed slot duration, this is what substrate does afaik

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant 500ms or so. Instead of 1/3 of slot.duration

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no that would not be ideal, substrate uses 1/3 of the slot duration, you can see their implementation on the issue

return time.Since(slotEnd) >= 0
}

Expand Down
7 changes: 6 additions & 1 deletion lib/babe/epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,15 @@ func (b *Service) initiateEpoch(epoch uint64) error {
delete(b.slotToProof, i-b.epochLength) // clear data from previous epoch
}

b.slotToProof[i], err = b.runLottery(i, epoch)
proof, err := b.runLottery(i, epoch)
if err != nil {
return fmt.Errorf("error running slot lottery at slot %d: error %s", i, err)
}

if proof != nil {
b.slotToProof[i] = proof
logger.Trace("claimed slot!", "slot", startSlot, "slots into epoch", i-startSlot)
}
}

return nil
Expand Down
25 changes: 15 additions & 10 deletions lib/babe/epoch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,35 @@ import (

func TestInitiateEpoch_Epoch0(t *testing.T) {
bs := createTestService(t, nil)
bs.epochLength = 5
bs.epochLength = 10
startSlot := uint64(1000)

err := bs.epochState.SetFirstSlot(startSlot)
require.NoError(t, err)
err = bs.initiateEpoch(0)
require.NoError(t, err)

count := 0
for i := startSlot; i < startSlot+bs.epochLength; i++ {
_, has := bs.slotToProof[i]
require.True(t, has)
if has {
count++
}
}
require.GreaterOrEqual(t, count, 1)
}

func TestInitiateEpoch(t *testing.T) {
func TestInitiateEpoch_Epoch1(t *testing.T) {
bs := createTestService(t, nil)
bs.epochLength = 5
bs.epochLength = 10

err := bs.initiateEpoch(0)
require.NoError(t, err)

state.AddBlocksToState(t, bs.blockState.(*state.BlockState), 1)

// epoch 1, check that genesis EpochData and ConfigData was properly set
threshold, err := CalculateThreshold(genesisBABEConfig.C1, genesisBABEConfig.C2, 1)
require.NoError(t, err)
threshold := bs.epochData.threshold

auth := &types.Authority{
Key: bs.keypair.Public().(*sr25519.PublicKey),
Expand All @@ -67,7 +73,7 @@ func TestInitiateEpoch(t *testing.T) {
threshold: threshold,
}
require.Equal(t, expected, bs.epochData)
require.Equal(t, int(bs.epochLength), len(bs.slotToProof))
require.GreaterOrEqual(t, len(bs.slotToProof), 1)

// for epoch 2, set EpochData but not ConfigData
edata := &types.EpochData{
Expand All @@ -89,7 +95,7 @@ func TestInitiateEpoch(t *testing.T) {
require.Equal(t, expected.randomness, bs.epochData.randomness)
require.Equal(t, expected.authorityIndex, bs.epochData.authorityIndex)
require.Equal(t, expected.threshold, bs.epochData.threshold)
require.Equal(t, int(bs.epochLength*2), len(bs.slotToProof))
require.GreaterOrEqual(t, len(bs.slotToProof), 1)

for i, auth := range bs.epochData.authorities {
expAuth, err := expected.authorities[i].Encode() //nolint
Expand Down Expand Up @@ -130,8 +136,7 @@ func TestInitiateEpoch(t *testing.T) {
require.Equal(t, expected, bs.epochData)

time.Sleep(time.Second)
// assert slot lottery was run for epochs 0, 1 and 2, 3
require.Equal(t, int(bs.epochLength*3), len(bs.slotToProof))
require.GreaterOrEqual(t, len(bs.slotToProof), 1)
}

func TestIncrementEpoch(t *testing.T) {
Expand Down
9 changes: 9 additions & 0 deletions lib/babe/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,12 @@ type epochData struct {
authorities []*types.Authority
threshold *common.Uint128
}

func (ed *epochData) String() string {
return fmt.Sprintf("randomness=%x authorityIndex=%d authorities=%v threshold=%s",
ed.randomness,
ed.authorityIndex,
ed.authorities,
ed.threshold,
)
}
5 changes: 5 additions & 0 deletions lib/common/uint128.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package common

import (
"encoding/binary"
"fmt"
"math/big"
)

Expand Down Expand Up @@ -66,6 +67,10 @@ func Uint128FromLEBytes(in []byte) *Uint128 {
}
}

func (u *Uint128) String() string {
return fmt.Sprintf("%d", big.NewInt(0).SetBytes(u.ToBEBytes()))
}

// ToLEBytes returns the Uint128 as a little endian byte slice
func (u *Uint128) ToLEBytes() []byte {
buf := make([]byte, 16)
Expand Down