Skip to content

Commit

Permalink
fix(lib/babe): fix timing for transition between epochs (#1636)
Browse files Browse the repository at this point in the history
  • Loading branch information
noot committed Jun 15, 2021
1 parent 3c18e47 commit 57027db
Show file tree
Hide file tree
Showing 10 changed files with 468 additions and 105 deletions.
1 change: 0 additions & 1 deletion go.mod
Expand Up @@ -38,7 +38,6 @@ require (
github.com/libp2p/go-libp2p-core v0.7.0
github.com/libp2p/go-libp2p-discovery v0.5.0
github.com/libp2p/go-libp2p-kad-dht v0.11.1
github.com/libp2p/go-libp2p-kbucket v0.4.7
github.com/libp2p/go-libp2p-peerstore v0.2.6
github.com/libp2p/go-libp2p-secio v0.2.2
github.com/libp2p/go-sockaddr v0.1.0 // indirect
Expand Down
210 changes: 118 additions & 92 deletions lib/babe/babe.go
Expand Up @@ -38,7 +38,6 @@ var logger log.Logger
type Service struct {
ctx context.Context
cancel context.CancelFunc
paused bool
authority bool
dev bool

Expand Down Expand Up @@ -123,7 +122,7 @@ func NewService(cfg *ServiceConfig) (*Service, error) {
rt: cfg.Runtime,
transactionState: cfg.TransactionState,
slotToProof: make(map[uint64]*VrfOutputAndProof),
blockChan: make(chan types.Block),
blockChan: make(chan types.Block, 16),
pause: make(chan struct{}),
authority: cfg.Authority,
dev: cfg.IsDev,
Expand All @@ -146,7 +145,7 @@ func NewService(cfg *ServiceConfig) (*Service, error) {
"epoch length (slots)", babeService.epochLength,
"authorities", Authorities(babeService.epochData.authorities),
"authority index", babeService.epochData.authorityIndex,
"threshold", babeService.epochData.threshold.ToLEBytes(),
"threshold", babeService.epochData.threshold,
"randomness", babeService.epochData.randomness,
)
return babeService, nil
Expand Down Expand Up @@ -226,39 +225,49 @@ func (b *Service) EpochLength() uint64 {

// Pause pauses the service ie. halts block production
func (b *Service) Pause() error {
if b.paused {
return errors.New("service already paused")
}

b.Lock()
defer b.Unlock()

b.pause <- struct{}{}
b.paused = true
if b.IsPaused() {
return nil
}

close(b.pause)
return nil
}

// Resume resumes the service ie. resumes block production
func (b *Service) Resume() error {
if !b.paused {
b.Lock()
defer b.Unlock()

if !b.IsPaused() {
return nil
}

b.pause = make(chan struct{})

epoch, err := b.epochState.GetCurrentEpoch()
if err != nil {
logger.Error("failed to get current epoch", "error", err)
return err
}

b.Lock()
defer b.Unlock()

b.paused = false
go b.initiate(epoch)
logger.Info("service resumed", "epoch", epoch)
return nil
}

// IsPaused returns if the service is paused or not (ie. producing blocks)
func (b *Service) IsPaused() bool {
select {
case <-b.pause:
return true
default:
return false
}
}

// Stop stops the service. If stop is called, it cannot be resumed.
func (b *Service) Stop() error {
b.Lock()
Expand Down Expand Up @@ -301,13 +310,6 @@ func (b *Service) IsStopped() bool {
return b.ctx.Err() != nil
}

// IsPaused returns if the service is paused or not (ie. producing blocks)
func (b *Service) IsPaused() bool {
b.RLock()
defer b.RUnlock()
return b.paused
}

func (b *Service) safeSend(msg types.Block) error {
b.Lock()
defer b.Unlock()
Expand Down Expand Up @@ -351,93 +353,115 @@ func (b *Service) initiate(epoch uint64) {
return
}

b.invokeBlockAuthoring(epoch)
err := b.invokeBlockAuthoring(epoch)
if err != nil {
logger.Crit("block authoring error", "error", err)
}
}

func (b *Service) invokeBlockAuthoring(epoch uint64) {
// calculate current slot
startSlot := getCurrentSlot(b.slotDuration)
func (b *Service) invokeBlockAuthoring(epoch uint64) error {
for {
// get start slot for current epoch
epochStart, err := b.epochState.GetStartSlotForEpoch(epoch)
if err != nil {
logger.Error("failed to get start slot for current epoch", "epoch", epoch, "error", err)
return err
}

head, err := b.blockState.BestBlockHeader()
if err != nil {
logger.Error("failed to get best block header", "error", err)
return
}
head, err := b.blockState.BestBlockHeader()
if err != nil {
logger.Error("failed to get best block header", "error", err)
return err
}

// if we're at genesis, set the first slot number for the network
if head.Number.Cmp(big.NewInt(0)) == 0 {
err = b.epochState.SetFirstSlot(startSlot)
// if we're at genesis, set the first slot number for the network
if head.Number.Cmp(big.NewInt(0)) == 0 {
epochStart = getCurrentSlot(b.slotDuration)
err = b.epochState.SetFirstSlot(epochStart)
if err != nil {
logger.Error("failed to set first slot number", "error", err)
return err
}
}

logger.Info("initiating epoch", "number", epoch, "first slot of epoch", epochStart)
err = b.initiateEpoch(epoch)
if err != nil {
logger.Error("failed to set first slot number", "error", err)
return
logger.Error("failed to initiate epoch", "epoch", epoch, "error", err)
return err
}
}

logger.Info("initiating epoch", "number", epoch, "start slot", startSlot+b.epochLength)
err = b.initiateEpoch(epoch)
if err != nil {
logger.Error("failed to initiate epoch", "epoch", epoch, "error", err)
return
}
epochStartTime := getSlotStartTime(epochStart, b.slotDuration)
logger.Debug("checking if epoch started", "epoch start", epochStartTime, "now", time.Now())

// check if it's time to start the epoch yet. if not, wait until it is
if time.Since(epochStartTime) < 0 {
logger.Debug("waiting for epoch to start")
select {
case <-time.After(time.Until(epochStartTime)):
case <-b.ctx.Done():
return nil
case <-b.pause:
return nil
}
}

// get start slot for current epoch
epochStart, err := b.epochState.GetStartSlotForEpoch(0)
if err != nil {
logger.Error("failed to get start slot for current epoch", "epoch", epoch, "error", err)
return
}
// calculate current slot
startSlot := getCurrentSlot(b.slotDuration)
intoEpoch := startSlot - epochStart

intoEpoch := startSlot - epochStart
logger.Info("current epoch", "epoch", epoch, "slots into epoch", intoEpoch)
// if the calculated amount of slots "into the epoch" is greater than the epoch length,
// we've been offline for more than an epoch, and need to sync. pause BABE for now, syncer will
// resume it when ready
if b.epochLength <= intoEpoch && !b.dev {
logger.Debug("pausing BABE, need to sync", "slots into epoch", intoEpoch, "startSlot", startSlot, "epochStart", epochStart)
return b.Pause()
}

// if the calculated amount of slots "into the epoch" is greater than the epoch length,
// we've been offline for more than an epoch, and need to sync. pause BABE for now, syncer will
// resume it when ready
if b.epochLength <= intoEpoch && !b.dev {
b.paused = true
return
}
if b.dev {
intoEpoch = intoEpoch % b.epochLength
}

if b.dev {
intoEpoch = intoEpoch % b.epochLength
}
logger.Info("current epoch", "epoch", epoch, "slots into epoch", intoEpoch)

slotDone := make([]<-chan time.Time, b.epochLength-intoEpoch)
for i := 0; i < int(b.epochLength-intoEpoch); i++ {
slotDone[i] = time.After(b.getSlotDuration() * time.Duration(i))
}
slotDone := make([]<-chan time.Time, b.epochLength-intoEpoch)
for i := 0; i < int(b.epochLength-intoEpoch); i++ {
slotDone[i] = time.After(b.getSlotDuration() * time.Duration(i))
}

for i := 0; i < int(b.epochLength-intoEpoch); i++ {
select {
case <-b.ctx.Done():
return
case <-b.pause:
return
case <-slotDone[i]:
if !b.authority {
continue
for i := 0; i < int(b.epochLength-intoEpoch); i++ {
select {
case <-b.ctx.Done():
return nil
case <-b.pause:
return nil
case <-slotDone[i]:
if !b.authority {
continue
}

slotNum := startSlot + uint64(i)
err = b.handleSlot(slotNum)
if err == ErrNotAuthorized {
logger.Debug("not authorized to produce a block in this slot", "slot", slotNum, "slots into epoch", i)
continue
} else if err != nil {
logger.Warn("failed to handle slot", "slot", slotNum, "error", err)
continue
}
}
}

slotNum := startSlot + uint64(i)
err = b.handleSlot(slotNum)
if err == ErrNotAuthorized {
logger.Debug("not authorized to produce a block in this slot", "slot", slotNum)
continue
} else if err != nil {
logger.Warn("failed to handle slot", "slot", slotNum, "error", err)
continue
}
// setup next epoch, re-invoke block authoring
next, err := b.incrementEpoch()
if err != nil {
logger.Error("failed to increment epoch", "error", err)
return err
}
}

// setup next epoch, re-invoke block authoring
next, err := b.incrementEpoch()
if err != nil {
logger.Error("failed to increment epoch", "error", err)
return
logger.Info("epoch complete!", "completed epoch", epoch, "upcoming epoch", next)
epoch = next
}

b.invokeBlockAuthoring(next)
}

func (b *Service) handleSlot(slotNum uint64) error {
Expand Down Expand Up @@ -466,8 +490,6 @@ func (b *Service) handleSlot(slotNum uint64) error {
number: slotNum,
}

logger.Debug("going to build block", "parent", parent)

// set runtime trie before building block
// if block building is successful, store the resulting trie in the storage state
ts, err := b.storageState.TrieState(&parent.StateRoot)
Expand Down Expand Up @@ -509,3 +531,7 @@ func (b *Service) handleSlot(slotNum uint64) error {
func getCurrentSlot(slotDuration time.Duration) uint64 {
return uint64(time.Now().UnixNano()) / uint64(slotDuration.Nanoseconds())
}

func getSlotStartTime(slot uint64, slotDuration time.Duration) time.Time {
return time.Unix(0, int64(slot)*slotDuration.Nanoseconds())
}
31 changes: 31 additions & 0 deletions lib/babe/babe_test.go
Expand Up @@ -136,6 +136,7 @@ func createTestService(t *testing.T, cfg *ServiceConfig) *Service {
cfg.Runtime = rt
}

cfg.LogLvl = defaultTestLogLvl
babeService, err := NewService(cfg)
require.NoError(t, err)
return babeService
Expand Down Expand Up @@ -252,3 +253,33 @@ func TestStartAndStop(t *testing.T) {
err = bs.Stop()
require.NoError(t, err)
}

func TestService_PauseAndResume(t *testing.T) {
bs := createTestService(t, &ServiceConfig{
LogLvl: log.LvlCrit,
})
err := bs.Start()
require.NoError(t, err)
time.Sleep(time.Second)

go func() {
_ = bs.Pause()
}()

go func() {
_ = bs.Pause()
}()

go func() {
err := bs.Resume() //nolint
require.NoError(t, err)
}()

go func() {
err := bs.Resume() //nolint
require.NoError(t, err)
}()

err = bs.Stop()
require.NoError(t, err)
}
2 changes: 1 addition & 1 deletion lib/babe/build.go
Expand Up @@ -322,7 +322,7 @@ func (b *BlockBuilder) addToQueue(txs []*transaction.ValidTransaction) {
}

func hasSlotEnded(slot Slot) bool {
slotEnd := slot.start.Add(slot.duration)
slotEnd := slot.start.Add(slot.duration * 2 / 3) // reserve last 1/3 of slot for block finalisation
return time.Since(slotEnd) >= 0
}

Expand Down
7 changes: 6 additions & 1 deletion lib/babe/epoch.go
Expand Up @@ -117,10 +117,15 @@ func (b *Service) initiateEpoch(epoch uint64) error {
delete(b.slotToProof, i-b.epochLength) // clear data from previous epoch
}

b.slotToProof[i], err = b.runLottery(i, epoch)
proof, err := b.runLottery(i, epoch)
if err != nil {
return fmt.Errorf("error running slot lottery at slot %d: error %s", i, err)
}

if proof != nil {
b.slotToProof[i] = proof
logger.Trace("claimed slot!", "slot", startSlot, "slots into epoch", i-startSlot)
}
}

return nil
Expand Down

0 comments on commit 57027db

Please sign in to comment.