Skip to content

Commit

Permalink
Fix catch up synchronization
Browse files Browse the repository at this point in the history
  • Loading branch information
Gilthoniel committed May 14, 2020
1 parent 929b0bb commit ec31980
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 15 deletions.
19 changes: 13 additions & 6 deletions blockchain/skipchain/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"golang.org/x/xerrors"
)

const catchUpLeeway = 100 * time.Millisecond
const catchUpLeeway = 200 * time.Millisecond

// operations implements helper functions that can be used by the handlers for
// common operations.
Expand Down Expand Up @@ -95,10 +95,17 @@ func (ops *operations) catchUp(target SkipBlock, addr mino.Address) error {
from = last.GetIndex() + 1
}

if target.GetIndex()-from <= 2 && ops.waitBlock(target.GetIndex()-1) {
if target.GetIndex()-from <= 2 {
// When only one block is missing, that probably means the propagation
// is not yet over, so it gives a chance to wait for it before starting
// the actual catch up.
ops.waitBlock(target.GetIndex() - 1)

// Check again after the lock is acquired again.
if ops.db.Contains(target.GetIndex() - 1) {
return nil
}

return nil
}

Expand Down Expand Up @@ -175,7 +182,7 @@ func (ops *operations) catchUp(target SkipBlock, addr mino.Address) error {
// waitBlock releases the catch up lock and waits for new blocks to be committed.
// It will return true if the expected block index exists before the timeout.
// Note: it expects the lock to be acquired and released later.
func (ops *operations) waitBlock(index uint64) bool {
func (ops *operations) waitBlock(index uint64) {
ops.catchUpLock.Unlock()
defer ops.catchUpLock.Lock()

Expand All @@ -187,11 +194,11 @@ func (ops *operations) waitBlock(index uint64) bool {
defer ops.watcher.Remove(observer)

select {
case block := <-observer.ch:
return block.GetIndex() == index
case <-observer.ch:
return
case <-time.After(catchUpLeeway):
// Even if the commit message could arrive later, the catch up procedure
// starts anyway.
return false
return
}
}
7 changes: 7 additions & 0 deletions ledger/byzcoin/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package byzcoin

import (
"context"
"sync"
"time"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -51,6 +52,7 @@ type Ledger struct {
encoder encoding.ProtoMarshaler
txFactory transactions.TransactionFactory
closing chan struct{}
closed sync.WaitGroup
initiated chan error
}

Expand Down Expand Up @@ -126,6 +128,7 @@ func (ldgr *Ledger) Listen() (ledger.Actor, error) {
return
}

ldgr.closed.Add(2)
close(ldgr.initiated)

go ldgr.gossipTxs()
Expand All @@ -136,6 +139,8 @@ func (ldgr *Ledger) Listen() (ledger.Actor, error) {
}

func (ldgr *Ledger) gossipTxs() {
defer ldgr.closed.Done()

for {
select {
case <-ldgr.closing:
Expand All @@ -155,6 +160,8 @@ func (ldgr *Ledger) gossipTxs() {
}

func (ldgr *Ledger) proposeBlocks(actor blockchain.Actor, players mino.Players) {
defer ldgr.closed.Done()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down
22 changes: 13 additions & 9 deletions ledger/byzcoin/mod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -113,38 +112,39 @@ func TestLedger_Listen(t *testing.T) {
}

actor, err := ledger.Listen()
<-ledger.initiated
require.NoError(t, err)
require.NotNil(t, actor)
require.NoError(t, actor.Close())
waitClose(ledger)

ledger.bc = fakeBlockchain{errListen: xerrors.New("oops")}
_, err = ledger.Listen()
require.EqualError(t, err, "couldn't start the blockchain: oops")
waitClose(ledger)

blocks := make(chan blockchain.Block, 1)
blocks <- fakeBlock{index: 1}
ledger.bc = fakeBlockchain{blocks: blocks, errBlock: xerrors.New("oops")}
ledger.initiated = make(chan error, 1)
_, err = ledger.Listen()
require.NoError(t, err)
err = <-ledger.initiated
err = waitClose(ledger)
require.EqualError(t, err, "expect genesis but got block 1")

ledger.bc = fakeBlockchain{}
ledger.governance = fakeGovernance{err: xerrors.New("oops")}
ledger.initiated = make(chan error, 1)
_, err = ledger.Listen()
require.NoError(t, err)
err = <-ledger.initiated
err = waitClose(ledger)
require.EqualError(t, err, "couldn't read chain roster: oops")

ledger.governance = fakeGovernance{}
ledger.gossiper = fakeGossiper{err: xerrors.New("oops")}
ledger.initiated = make(chan error, 1)
_, err = ledger.Listen()
require.NoError(t, err)
err = <-ledger.initiated
err = waitClose(ledger)
require.EqualError(t, err, "couldn't start the gossiper: oops")
}

Expand All @@ -157,11 +157,9 @@ func TestLedger_GossipTxs(t *testing.T) {
gossiper: fakeGossiper{rumors: rumors, err: xerrors.New("oops")},
}

wg := sync.WaitGroup{}
wg.Add(1)
ledger.closed.Add(1)
go func() {
ledger.gossipTxs()
wg.Done()
}()

rumors <- fakeTx{id: []byte{0x01}}
Expand All @@ -172,7 +170,7 @@ func TestLedger_GossipTxs(t *testing.T) {
require.Len(t, ledger.bag.GetAll(), 2)

close(ledger.closing)
wg.Wait()
ledger.closed.Wait()
}

func TestActor_Setup(t *testing.T) {
Expand Down Expand Up @@ -258,6 +256,12 @@ func makeDarc(t *testing.T, signer crypto.Signer) darc.Access {
return access
}

// waitClose is a test helper that blocks until a value is received from the
// ledger's initiated channel, then waits for the ledger's closed wait group to
// complete. It returns the value that was received from the initiated channel.
func waitClose(ledger *Ledger) error {
	initErr := <-ledger.initiated

	// Only return once everything tracked by the wait group is done.
	ledger.closed.Wait()

	return initErr
}

func sendTx(t *testing.T, ledger ledger.Ledger, actor ledger.Actor, tx transactions.ClientTransaction) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down

0 comments on commit ec31980

Please sign in to comment.