diff --git a/blockchain/skipchain/db.go b/blockchain/skipchain/db.go index fda62a0c9..faf65b5b5 100644 --- a/blockchain/skipchain/db.go +++ b/blockchain/skipchain/db.go @@ -82,6 +82,9 @@ func (db *InMemoryDatabase) Write(block SkipBlock) error { // Contains implements skipchain.Database. It returns true if the block is // stored in the database, otherwise false. func (db *InMemoryDatabase) Contains(index uint64) bool { + db.Lock() + defer db.Unlock() + return index < uint64(len(db.blocks)) } diff --git a/blockchain/skipchain/ops.go b/blockchain/skipchain/ops.go index e07c9d5a1..102054f4d 100644 --- a/blockchain/skipchain/ops.go +++ b/blockchain/skipchain/ops.go @@ -15,7 +15,7 @@ import ( "golang.org/x/xerrors" ) -const catchUpLeeway = 200 * time.Millisecond +const catchUpLeeway = 100 * time.Millisecond // operations implements helper functions that can be used by the handlers for // common operations. @@ -95,7 +95,7 @@ func (ops *operations) catchUp(target SkipBlock, addr mino.Address) error { from = last.GetIndex() + 1 } - if target.GetIndex()-from <= 2 { + if target.GetIndex()-from <= 1 { // When only one block is missing, that probably means the propagation // is not yet over, so it gives a chance to wait for it before starting // the actual catch up. @@ -191,12 +191,16 @@ func (ops *operations) waitBlock(index uint64) { ops.watcher.Add(observer) defer ops.watcher.Remove(observer) - select { - case <-observer.ch: - return - case <-time.After(catchUpLeeway): - // Even if the commit message could arrive later, the catch up procedure - // starts anyway. - return + for { + select { + case block := <-observer.ch: + if block.GetIndex() == index { + return + } + case <-time.After(catchUpLeeway): + // Even if the commit message could arrive later, the catch up procedure + // starts anyway. + return + } } } diff --git a/blockchain/skipchain/ops_test.go b/blockchain/skipchain/ops_test.go index 789982b41..b81102da4 100644 --- a/blockchain/skipchain/ops_test.go +++ b/blockchain/skipchain/ops_test.go @@ -81,19 +81,19 @@ func TestOperations_CatchUp(t *testing.T) { ops.db = &fakeDatabase{blocks: []SkipBlock{{}, {}, {}}} ops.catchUpLock.Unlock() }() - err = ops.catchUp(SkipBlock{Index: 3, BackLink: hash}, fake.NewAddress(0)) + err = ops.catchUp(SkipBlock{Index: 2, BackLink: hash}, fake.NewAddress(0)) require.NoError(t, err) require.Equal(t, 2, call.Len()) // Catch up with only one block missing but it arrives during the catch up. ops.db = &fakeDatabase{blocks: []SkipBlock{{}}} ops.watcher = &fakeWatcher{call: call, block: SkipBlock{}} - err = ops.catchUp(SkipBlock{Index: 3, BackLink: hash}, fake.NewAddress(0)) + err = ops.catchUp(SkipBlock{Index: 2, BackLink: hash}, fake.NewAddress(0)) require.NoError(t, err) require.Equal(t, 4, call.Len()) ops.db = &fakeDatabase{blocks: []SkipBlock{{}}, err: xerrors.New("oops")} - err = ops.catchUp(SkipBlock{Index: 3}, nil) + err = ops.catchUp(SkipBlock{Index: 2}, nil) require.EqualError(t, err, "couldn't read last block: oops") ops.db = &fakeDatabase{blocks: []SkipBlock{{}}} diff --git a/consensus/cosipbft/mod.go b/consensus/cosipbft/mod.go index aa4aae235..b61a79907 100644 --- a/consensus/cosipbft/mod.go +++ b/consensus/cosipbft/mod.go @@ -139,6 +139,8 @@ func (c *Consensus) Store(in consensus.Chain) error { if err != nil { return xerrors.Errorf("couldn't store link: %v", err) } + + c.queue.Clear() } } diff --git a/consensus/cosipbft/mod_test.go b/consensus/cosipbft/mod_test.go index ccd2b6797..f3ba09079 100644 --- a/consensus/cosipbft/mod_test.go +++ b/consensus/cosipbft/mod_test.go @@ -179,6 +179,7 @@ func TestConsensus_Store(t *testing.T) { cons := &Consensus{ encoder: encoding.NewProtoEncoder(), storage: newInMemoryStorage(), + queue: fakeQueue{}, } links := []forwardLink{ @@ -585,6 +586,10 @@ type fakeQueue struct { err error } +func (q fakeQueue) Clear() { + +} + func (q fakeQueue) Finalize(to Digest, commit crypto.Signature) (*ForwardLinkProto, error) { return &ForwardLinkProto{}, q.err } diff --git a/consensus/cosipbft/queue.go b/consensus/cosipbft/queue.go index 8d20820f3..2a8d9b9f9 100644 --- a/consensus/cosipbft/queue.go +++ b/consensus/cosipbft/queue.go @@ -16,6 +16,7 @@ type Queue interface { New(fl forwardLink, authority crypto.CollectiveAuthority) error LockProposal(to Digest, sig crypto.Signature) error Finalize(to Digest, sig crypto.Signature) (*ForwardLinkProto, error) + Clear() } type item struct { @@ -148,3 +149,10 @@ func (q *queue) Finalize(to Digest, sig crypto.Signature) (*ForwardLinkProto, er return packed.(*ForwardLinkProto), nil } + +func (q *queue) Clear() { + q.Lock() + q.locked = false + q.items = nil + q.Unlock() +} diff --git a/ledger/inventory/mem/mod.go b/ledger/inventory/mem/mod.go index 6ac8b32cc..5ea7c53bf 100644 --- a/ledger/inventory/mem/mod.go +++ b/ledger/inventory/mem/mod.go @@ -2,6 +2,7 @@ package mem import ( "bytes" + "encoding/binary" "fmt" "sort" "sync" @@ -134,6 +135,10 @@ func (inv *InMemoryInventory) Stage(f func(inventory.WritablePage) error) (inven func (inv *InMemoryInventory) computeHash(page *inMemoryPage) error { h := inv.hashFactory.New() + buffer := make([]byte, 8) + binary.LittleEndian.PutUint64(buffer, page.index) + h.Write(buffer) + keys := make(DigestSlice, 0, len(page.entries)) for key := range page.entries { keys = append(keys, key)