Skip to content

Commit

Permalink
Fix inventory page duplication
Browse files Browse the repository at this point in the history
  • Loading branch information
Gilthoniel committed May 14, 2020
1 parent 4da8ec4 commit 0376f0e
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 22 deletions.
3 changes: 3 additions & 0 deletions blockchain/skipchain/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ func (db *InMemoryDatabase) Write(block SkipBlock) error {
// Contains implements skipchain.Database. It returns true if the block is
// stored in the database, otherwise false.
func (db *InMemoryDatabase) Contains(index uint64) bool {
db.Lock()
defer db.Unlock()

return index < uint64(len(db.blocks))
}

Expand Down
22 changes: 13 additions & 9 deletions blockchain/skipchain/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"golang.org/x/xerrors"
)

const catchUpLeeway = 200 * time.Millisecond
const catchUpLeeway = 100 * time.Millisecond

// operations implements helper functions that can be used by the handlers for
// common operations.
Expand Down Expand Up @@ -95,7 +95,7 @@ func (ops *operations) catchUp(target SkipBlock, addr mino.Address) error {
from = last.GetIndex() + 1
}

if target.GetIndex()-from <= 2 {
if target.GetIndex()-from <= 1 {
// When only one block is missing, that probably means the propagation
// is not yet over, so it gives a chance to wait for it before starting
// the actual catch up.
Expand Down Expand Up @@ -191,12 +191,16 @@ func (ops *operations) waitBlock(index uint64) {
ops.watcher.Add(observer)
defer ops.watcher.Remove(observer)

select {
case <-observer.ch:
return
case <-time.After(catchUpLeeway):
// Even if the commit message could arrive later, the catch up procedure
// starts anyway.
return
for {
select {
case block := <-observer.ch:
if block.GetIndex() == index {
return
}
case <-time.After(catchUpLeeway):
// Even if the commit message could arrive later, the catch up procedure
// starts anyway.
return
}
}
}
8 changes: 4 additions & 4 deletions blockchain/skipchain/ops_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,19 @@ func TestOperations_CatchUp(t *testing.T) {
ops.db = &fakeDatabase{blocks: []SkipBlock{{}, {}, {}}}
ops.catchUpLock.Unlock()
}()
err = ops.catchUp(SkipBlock{Index: 3, BackLink: hash}, fake.NewAddress(0))
err = ops.catchUp(SkipBlock{Index: 2, BackLink: hash}, fake.NewAddress(0))
require.NoError(t, err)
require.Equal(t, 2, call.Len())

// Catch up with only one block missing but it arrives during the catch up.
ops.db = &fakeDatabase{blocks: []SkipBlock{{}}}
ops.watcher = &fakeWatcher{call: call, block: SkipBlock{}}
err = ops.catchUp(SkipBlock{Index: 3, BackLink: hash}, fake.NewAddress(0))
ops.watcher = &fakeWatcher{call: call, block: SkipBlock{Index: 1}}
err = ops.catchUp(SkipBlock{Index: 2, BackLink: hash}, fake.NewAddress(0))
require.NoError(t, err)
require.Equal(t, 4, call.Len())

ops.db = &fakeDatabase{blocks: []SkipBlock{{}}, err: xerrors.New("oops")}
err = ops.catchUp(SkipBlock{Index: 3}, nil)
err = ops.catchUp(SkipBlock{Index: 2}, nil)
require.EqualError(t, err, "couldn't read last block: oops")

ops.db = &fakeDatabase{blocks: []SkipBlock{{}}}
Expand Down
2 changes: 2 additions & 0 deletions consensus/cosipbft/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ func (c *Consensus) Store(in consensus.Chain) error {
if err != nil {
return xerrors.Errorf("couldn't store link: %v", err)
}

c.queue.Clear()
}
}

Expand Down
10 changes: 9 additions & 1 deletion consensus/cosipbft/mod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,11 @@ func TestConsensus_Listen(t *testing.T) {
}

func TestConsensus_Store(t *testing.T) {
call := &fake.Call{}
cons := &Consensus{
encoder: encoding.NewProtoEncoder(),
storage: newInMemoryStorage(),
queue: fakeQueue{call: call},
}

links := []forwardLink{
Expand All @@ -188,6 +190,7 @@ func TestConsensus_Store(t *testing.T) {

err := cons.Store(forwardLinkChain{links: links})
require.NoError(t, err)
require.Equal(t, 2, call.Len())

err = cons.Store(fakeChain{})
require.EqualError(t, err,
Expand Down Expand Up @@ -582,7 +585,12 @@ func (gov fakeGovernance) GetChangeSet(consensus.Proposal) (viewchange.ChangeSet

type fakeQueue struct {
Queue
err error
err error
call *fake.Call
}

func (q fakeQueue) Clear() {
q.call.Add("clear")
}

func (q fakeQueue) Finalize(to Digest, commit crypto.Signature) (*ForwardLinkProto, error) {
Expand Down
8 changes: 8 additions & 0 deletions consensus/cosipbft/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Queue interface {
New(fl forwardLink, authority crypto.CollectiveAuthority) error
LockProposal(to Digest, sig crypto.Signature) error
Finalize(to Digest, sig crypto.Signature) (*ForwardLinkProto, error)
Clear()
}

type item struct {
Expand Down Expand Up @@ -148,3 +149,10 @@ func (q *queue) Finalize(to Digest, sig crypto.Signature) (*ForwardLinkProto, er

return packed.(*ForwardLinkProto), nil
}

func (q *queue) Clear() {
q.Lock()
q.locked = false
q.items = nil
q.Unlock()
}
11 changes: 11 additions & 0 deletions consensus/cosipbft/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,17 @@ func TestQueue_Finalize(t *testing.T) {
require.EqualError(t, err, "couldn't pack forward link: fake error")
}

func TestQueue_Clear(t *testing.T) {
queue := &queue{
locked: true,
items: []item{{}, {}},
}

queue.Clear()
require.False(t, queue.locked)
require.Empty(t, queue.items)
}

// -----------------------------------------------------------------------------
// Utility functions

Expand Down
8 changes: 8 additions & 0 deletions ledger/inventory/mem/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mem

import (
"bytes"
"encoding/binary"
"fmt"
"sort"
"sync"
Expand Down Expand Up @@ -134,6 +135,13 @@ func (inv *InMemoryInventory) Stage(f func(inventory.WritablePage) error) (inven
func (inv *InMemoryInventory) computeHash(page *inMemoryPage) error {
h := inv.hashFactory.New()

buffer := make([]byte, 8)
binary.LittleEndian.PutUint64(buffer, page.index)
_, err := h.Write(buffer)
if err != nil {
return xerrors.Errorf("couldn't write index: %v", err)
}

keys := make(DigestSlice, 0, len(page.entries))
for key := range page.entries {
keys = append(keys, key)
Expand Down
18 changes: 10 additions & 8 deletions ledger/inventory/mem/mod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func TestInMemoryInventory_Stage(t *testing.T) {
inv.pages = append(inv.pages, inv.stagingPages[page.(*inMemoryPage).fingerprint])
inv.stagingPages = make(map[Digest]*inMemoryPage)
page, err = inv.Stage(func(page inventory.WritablePage) error {
page.Defer(func([]byte) {})
value, err := page.Read([]byte{1})
require.NoError(t, err)
require.True(t, value.(*wrappers.BoolValue).Value)
Expand All @@ -76,17 +77,18 @@ func TestInMemoryInventory_Stage(t *testing.T) {
require.EqualError(t, err, "couldn't fill new page: oops")

inv.hashFactory = fake.NewHashFactory(fake.NewBadHash())
_, err = inv.Stage(func(inventory.WritablePage) error {
return nil
})
require.Error(t, err)
require.Contains(t, err.Error(), "couldn't compute page hash: ")
_, err = inv.Stage(func(inventory.WritablePage) error { return nil })
require.EqualError(t, err,
"couldn't compute page hash: couldn't write index: fake error")

inv.hashFactory = fake.NewHashFactory(fake.NewBadHashWithDelay(1))
_, err = inv.Stage(func(inventory.WritablePage) error { return nil })
require.EqualError(t, err,
"couldn't compute page hash: couldn't write key: fake error")

inv.hashFactory = fake.NewHashFactory(&fake.Hash{})
inv.encoder = fake.BadMarshalStableEncoder{}
_, err = inv.Stage(func(inventory.WritablePage) error {
return nil
})
_, err = inv.Stage(func(inventory.WritablePage) error { return nil })
require.EqualError(t, err,
"couldn't compute page hash: couldn't marshal entry: fake error")
}
Expand Down
9 changes: 9 additions & 0 deletions mino/addr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ import (
"github.com/stretchr/testify/require"
)

func TestAddressIterator_Seek(t *testing.T) {
addrs := []Address{fakeAddr{}, fakeAddr{}}

iter := addressIterator{addrs: addrs}
iter.Seek(1)
require.NotNil(t, iter.GetNext())
require.Nil(t, iter.GetNext())
}

func TestAddressIterator_HasNext(t *testing.T) {
addrs := []Address{fakeAddr{}, fakeAddr{}}

Expand Down

0 comments on commit 0376f0e

Please sign in to comment.