From 0376f0ebab762363d0ca7579f8605661a5a18731 Mon Sep 17 00:00:00 2001 From: Gaylor Bosson Date: Thu, 14 May 2020 14:52:16 +0200 Subject: [PATCH] Fix inventory page duplication --- blockchain/skipchain/db.go | 3 +++ blockchain/skipchain/ops.go | 22 +++++++++++++--------- blockchain/skipchain/ops_test.go | 8 ++++---- consensus/cosipbft/mod.go | 2 ++ consensus/cosipbft/mod_test.go | 10 +++++++++- consensus/cosipbft/queue.go | 8 ++++++++ consensus/cosipbft/queue_test.go | 11 +++++++++++ ledger/inventory/mem/mod.go | 8 ++++++++ ledger/inventory/mem/mod_test.go | 18 ++++++++++-------- mino/addr_test.go | 9 +++++++++ 10 files changed, 77 insertions(+), 22 deletions(-) diff --git a/blockchain/skipchain/db.go b/blockchain/skipchain/db.go index fda62a0c9..faf65b5b5 100644 --- a/blockchain/skipchain/db.go +++ b/blockchain/skipchain/db.go @@ -82,6 +82,9 @@ func (db *InMemoryDatabase) Write(block SkipBlock) error { // Contains implements skipchain.Database. It returns true if the block is // stored in the database, otherwise false. func (db *InMemoryDatabase) Contains(index uint64) bool { + db.Lock() + defer db.Unlock() + return index < uint64(len(db.blocks)) } diff --git a/blockchain/skipchain/ops.go b/blockchain/skipchain/ops.go index e07c9d5a1..102054f4d 100644 --- a/blockchain/skipchain/ops.go +++ b/blockchain/skipchain/ops.go @@ -15,7 +15,7 @@ import ( "golang.org/x/xerrors" ) -const catchUpLeeway = 200 * time.Millisecond +const catchUpLeeway = 100 * time.Millisecond // operations implements helper functions that can be used by the handlers for // common operations. @@ -95,7 +95,7 @@ func (ops *operations) catchUp(target SkipBlock, addr mino.Address) error { from = last.GetIndex() + 1 } - if target.GetIndex()-from <= 2 { + if target.GetIndex()-from <= 1 { // When only one block is missing, that probably means the propagation // is not yet over, so it gives a chance to wait for it before starting // the actual catch up. @@ -191,12 +191,16 @@ func (ops *operations) waitBlock(index uint64) { ops.watcher.Add(observer) defer ops.watcher.Remove(observer) - select { - case <-observer.ch: - return - case <-time.After(catchUpLeeway): - // Even if the commit message could arrive later, the catch up procedure - // starts anyway. - return + for { + select { + case block := <-observer.ch: + if block.GetIndex() == index { + return + } + case <-time.After(catchUpLeeway): + // Even if the commit message could arrive later, the catch up procedure + // starts anyway. + return + } } } diff --git a/blockchain/skipchain/ops_test.go b/blockchain/skipchain/ops_test.go index 789982b41..1be83d12d 100644 --- a/blockchain/skipchain/ops_test.go +++ b/blockchain/skipchain/ops_test.go @@ -81,19 +81,19 @@ func TestOperations_CatchUp(t *testing.T) { ops.db = &fakeDatabase{blocks: []SkipBlock{{}, {}, {}}} ops.catchUpLock.Unlock() }() - err = ops.catchUp(SkipBlock{Index: 3, BackLink: hash}, fake.NewAddress(0)) + err = ops.catchUp(SkipBlock{Index: 2, BackLink: hash}, fake.NewAddress(0)) require.NoError(t, err) require.Equal(t, 2, call.Len()) // Catch up with only one block missing but it arrives during the catch up. ops.db = &fakeDatabase{blocks: []SkipBlock{{}}} - ops.watcher = &fakeWatcher{call: call, block: SkipBlock{}} - err = ops.catchUp(SkipBlock{Index: 3, BackLink: hash}, fake.NewAddress(0)) + ops.watcher = &fakeWatcher{call: call, block: SkipBlock{Index: 1}} + err = ops.catchUp(SkipBlock{Index: 2, BackLink: hash}, fake.NewAddress(0)) require.NoError(t, err) require.Equal(t, 4, call.Len()) ops.db = &fakeDatabase{blocks: []SkipBlock{{}}, err: xerrors.New("oops")} - err = ops.catchUp(SkipBlock{Index: 3}, nil) + err = ops.catchUp(SkipBlock{Index: 2}, nil) require.EqualError(t, err, "couldn't read last block: oops") ops.db = &fakeDatabase{blocks: []SkipBlock{{}}} diff --git a/consensus/cosipbft/mod.go b/consensus/cosipbft/mod.go index aa4aae235..b61a79907 100644 --- a/consensus/cosipbft/mod.go +++ b/consensus/cosipbft/mod.go @@ -139,6 +139,8 @@ func (c *Consensus) Store(in consensus.Chain) error { if err != nil { return xerrors.Errorf("couldn't store link: %v", err) } + + c.queue.Clear() } } diff --git a/consensus/cosipbft/mod_test.go b/consensus/cosipbft/mod_test.go index ccd2b6797..515545099 100644 --- a/consensus/cosipbft/mod_test.go +++ b/consensus/cosipbft/mod_test.go @@ -176,9 +176,11 @@ func TestConsensus_Listen(t *testing.T) { } func TestConsensus_Store(t *testing.T) { + call := &fake.Call{} cons := &Consensus{ encoder: encoding.NewProtoEncoder(), storage: newInMemoryStorage(), + queue: fakeQueue{call: call}, } links := []forwardLink{ @@ -188,6 +190,7 @@ func TestConsensus_Store(t *testing.T) { err := cons.Store(forwardLinkChain{links: links}) require.NoError(t, err) + require.Equal(t, 2, call.Len()) err = cons.Store(fakeChain{}) require.EqualError(t, err, @@ -582,7 +585,12 @@ func (gov fakeGovernance) GetChangeSet(consensus.Proposal) (viewchange.ChangeSet type fakeQueue struct { Queue - err error + err error + call *fake.Call +} + +func (q fakeQueue) Clear() { + q.call.Add("clear") } func (q fakeQueue) Finalize(to Digest, commit crypto.Signature) (*ForwardLinkProto, error) { diff --git a/consensus/cosipbft/queue.go b/consensus/cosipbft/queue.go index 8d20820f3..2a8d9b9f9 100644 --- a/consensus/cosipbft/queue.go +++ b/consensus/cosipbft/queue.go @@ -16,6 +16,7 @@ type Queue interface { New(fl forwardLink, authority crypto.CollectiveAuthority) error LockProposal(to Digest, sig crypto.Signature) error Finalize(to Digest, sig crypto.Signature) (*ForwardLinkProto, error) + Clear() } type item struct { @@ -148,3 +149,10 @@ func (q *queue) Finalize(to Digest, sig crypto.Signature) (*ForwardLinkProto, er return packed.(*ForwardLinkProto), nil } + +func (q *queue) Clear() { + q.Lock() + q.locked = false + q.items = nil + q.Unlock() +} diff --git a/consensus/cosipbft/queue_test.go b/consensus/cosipbft/queue_test.go index 6e91e3448..1bab107cf 100644 --- a/consensus/cosipbft/queue_test.go +++ b/consensus/cosipbft/queue_test.go @@ -144,6 +144,17 @@ func TestQueue_Finalize(t *testing.T) { require.EqualError(t, err, "couldn't pack forward link: fake error") } +func TestQueue_Clear(t *testing.T) { + queue := &queue{ + locked: true, + items: []item{{}, {}}, + } + + queue.Clear() + require.False(t, queue.locked) + require.Empty(t, queue.items) +} + // ----------------------------------------------------------------------------- // Utility functions diff --git a/ledger/inventory/mem/mod.go b/ledger/inventory/mem/mod.go index 6ac8b32cc..c03daca13 100644 --- a/ledger/inventory/mem/mod.go +++ b/ledger/inventory/mem/mod.go @@ -2,6 +2,7 @@ package mem import ( "bytes" + "encoding/binary" "fmt" "sort" "sync" @@ -134,6 +135,13 @@ func (inv *InMemoryInventory) Stage(f func(inventory.WritablePage) error) (inven func (inv *InMemoryInventory) computeHash(page *inMemoryPage) error { h := inv.hashFactory.New() + buffer := make([]byte, 8) + binary.LittleEndian.PutUint64(buffer, page.index) + _, err := h.Write(buffer) + if err != nil { + return xerrors.Errorf("couldn't write index: %v", err) + } + keys := make(DigestSlice, 0, len(page.entries)) for key := range page.entries { keys = append(keys, key) diff --git a/ledger/inventory/mem/mod_test.go b/ledger/inventory/mem/mod_test.go index 0c850384e..4e71afa1b 100644 --- a/ledger/inventory/mem/mod_test.go +++ b/ledger/inventory/mem/mod_test.go @@ -52,6 +52,7 @@ func TestInMemoryInventory_Stage(t *testing.T) { inv.pages = append(inv.pages, inv.stagingPages[page.(*inMemoryPage).fingerprint]) inv.stagingPages = make(map[Digest]*inMemoryPage) page, err = inv.Stage(func(page inventory.WritablePage) error { + page.Defer(func([]byte) {}) value, err := page.Read([]byte{1}) require.NoError(t, err) require.True(t, value.(*wrappers.BoolValue).Value) @@ -76,17 +77,18 @@ func TestInMemoryInventory_Stage(t *testing.T) { require.EqualError(t, err, "couldn't fill new page: oops") inv.hashFactory = fake.NewHashFactory(fake.NewBadHash()) - _, err = inv.Stage(func(inventory.WritablePage) error { - return nil - }) - require.Error(t, err) - require.Contains(t, err.Error(), "couldn't compute page hash: ") + _, err = inv.Stage(func(inventory.WritablePage) error { return nil }) + require.EqualError(t, err, + "couldn't compute page hash: couldn't write index: fake error") + + inv.hashFactory = fake.NewHashFactory(fake.NewBadHashWithDelay(1)) + _, err = inv.Stage(func(inventory.WritablePage) error { return nil }) + require.EqualError(t, err, + "couldn't compute page hash: couldn't write key: fake error") inv.hashFactory = fake.NewHashFactory(&fake.Hash{}) inv.encoder = fake.BadMarshalStableEncoder{} - _, err = inv.Stage(func(inventory.WritablePage) error { - return nil - }) + _, err = inv.Stage(func(inventory.WritablePage) error { return nil }) require.EqualError(t, err, "couldn't compute page hash: couldn't marshal entry: fake error") } diff --git a/mino/addr_test.go b/mino/addr_test.go index 6ed65b8aa..0dc6807ff 100644 --- a/mino/addr_test.go +++ b/mino/addr_test.go @@ -6,6 +6,15 @@ import ( "github.com/stretchr/testify/require" ) +func TestAddressIterator_Seek(t *testing.T) { + addrs := []Address{fakeAddr{}, fakeAddr{}} + + iter := addressIterator{addrs: addrs} + iter.Seek(1) + require.NotNil(t, iter.GetNext()) + require.Nil(t, iter.GetNext()) +} + func TestAddressIterator_HasNext(t *testing.T) { addrs := []Address{fakeAddr{}, fakeAddr{}}