Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Gilthoniel committed May 14, 2020
1 parent 4da8ec4 commit 6fef1d1
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 12 deletions.
3 changes: 3 additions & 0 deletions blockchain/skipchain/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ func (db *InMemoryDatabase) Write(block SkipBlock) error {
// Contains implements skipchain.Database. It returns true if the block is
// stored in the database, otherwise false.
func (db *InMemoryDatabase) Contains(index uint64) bool {
db.Lock()
defer db.Unlock()

return index < uint64(len(db.blocks))
}

Expand Down
22 changes: 13 additions & 9 deletions blockchain/skipchain/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"golang.org/x/xerrors"
)

const catchUpLeeway = 200 * time.Millisecond
const catchUpLeeway = 100 * time.Millisecond

// operations implements helper functions that can be used by the handlers for
// common operations.
Expand Down Expand Up @@ -95,7 +95,7 @@ func (ops *operations) catchUp(target SkipBlock, addr mino.Address) error {
from = last.GetIndex() + 1
}

if target.GetIndex()-from <= 2 {
if target.GetIndex()-from <= 1 {
// When only one block is missing, that probably means the propagation
// is not yet over, so it gives a chance to wait for it before starting
// the actual catch up.
Expand Down Expand Up @@ -191,12 +191,16 @@ func (ops *operations) waitBlock(index uint64) {
ops.watcher.Add(observer)
defer ops.watcher.Remove(observer)

select {
case <-observer.ch:
return
case <-time.After(catchUpLeeway):
// Even if the commit message could arrive later, the catch up procedure
// starts anyway.
return
for {
select {
case block := <-observer.ch:
if block.GetIndex() == index {
return
}
case <-time.After(catchUpLeeway):
// Even if the commit message could arrive later, the catch up procedure
// starts anyway.
return
}
}
}
6 changes: 3 additions & 3 deletions blockchain/skipchain/ops_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,19 @@ func TestOperations_CatchUp(t *testing.T) {
ops.db = &fakeDatabase{blocks: []SkipBlock{{}, {}, {}}}
ops.catchUpLock.Unlock()
}()
err = ops.catchUp(SkipBlock{Index: 3, BackLink: hash}, fake.NewAddress(0))
err = ops.catchUp(SkipBlock{Index: 2, BackLink: hash}, fake.NewAddress(0))
require.NoError(t, err)
require.Equal(t, 2, call.Len())

// Catch up with only one block missing but it arrives during the catch up.
ops.db = &fakeDatabase{blocks: []SkipBlock{{}}}
ops.watcher = &fakeWatcher{call: call, block: SkipBlock{}}
err = ops.catchUp(SkipBlock{Index: 3, BackLink: hash}, fake.NewAddress(0))
err = ops.catchUp(SkipBlock{Index: 2, BackLink: hash}, fake.NewAddress(0))
require.NoError(t, err)
require.Equal(t, 4, call.Len())

ops.db = &fakeDatabase{blocks: []SkipBlock{{}}, err: xerrors.New("oops")}
err = ops.catchUp(SkipBlock{Index: 3}, nil)
err = ops.catchUp(SkipBlock{Index: 2}, nil)
require.EqualError(t, err, "couldn't read last block: oops")

ops.db = &fakeDatabase{blocks: []SkipBlock{{}}}
Expand Down
2 changes: 2 additions & 0 deletions consensus/cosipbft/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ func (c *Consensus) Store(in consensus.Chain) error {
if err != nil {
return xerrors.Errorf("couldn't store link: %v", err)
}

c.queue.Clear()
}
}

Expand Down
5 changes: 5 additions & 0 deletions consensus/cosipbft/mod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func TestConsensus_Store(t *testing.T) {
cons := &Consensus{
encoder: encoding.NewProtoEncoder(),
storage: newInMemoryStorage(),
queue: fakeQueue{},
}

links := []forwardLink{
Expand Down Expand Up @@ -585,6 +586,10 @@ type fakeQueue struct {
err error
}

func (q fakeQueue) Clear() {

}

func (q fakeQueue) Finalize(to Digest, commit crypto.Signature) (*ForwardLinkProto, error) {
return &ForwardLinkProto{}, q.err
}
Expand Down
8 changes: 8 additions & 0 deletions consensus/cosipbft/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Queue interface {
New(fl forwardLink, authority crypto.CollectiveAuthority) error
LockProposal(to Digest, sig crypto.Signature) error
Finalize(to Digest, sig crypto.Signature) (*ForwardLinkProto, error)
Clear()
}

type item struct {
Expand Down Expand Up @@ -148,3 +149,10 @@ func (q *queue) Finalize(to Digest, sig crypto.Signature) (*ForwardLinkProto, er

return packed.(*ForwardLinkProto), nil
}

func (q *queue) Clear() {
q.Lock()
q.locked = false
q.items = nil
q.Unlock()
}
5 changes: 5 additions & 0 deletions ledger/inventory/mem/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mem

import (
"bytes"
"encoding/binary"
"fmt"
"sort"
"sync"
Expand Down Expand Up @@ -134,6 +135,10 @@ func (inv *InMemoryInventory) Stage(f func(inventory.WritablePage) error) (inven
func (inv *InMemoryInventory) computeHash(page *inMemoryPage) error {
h := inv.hashFactory.New()

buffer := make([]byte, 8)
binary.LittleEndian.PutUint64(buffer, page.index)
h.Write(buffer)

keys := make(DigestSlice, 0, len(page.entries))
for key := range page.entries {
keys = append(keys, key)
Expand Down

0 comments on commit 6fef1d1

Please sign in to comment.