Skip to content

Commit

Permalink
ViewChange: add a rotating leader view change
Browse files Browse the repository at this point in the history
  • Loading branch information
Gilthoniel committed May 13, 2020
1 parent 8de5ba3 commit f067472
Show file tree
Hide file tree
Showing 18 changed files with 159 additions and 42 deletions.
5 changes: 2 additions & 3 deletions blockchain/skipchain/handler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package skipchain

import (
"bytes"
"context"

proto "github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -59,8 +58,8 @@ func (h handler) Stream(out mino.Sender, in mino.Receiver) error {
}

var block SkipBlock
for i := int64(0); !bytes.Equal(block.hash[:], req.To); i++ {
block, err = h.db.Read(i)
for i := req.From; i <= req.GetTo(); i++ {
block, err = h.db.Read(int64(i))
if err != nil {
return xerrors.Errorf("couldn't read block at index %d: %v", i, err)
}
Expand Down
2 changes: 1 addition & 1 deletion blockchain/skipchain/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestHandler_Stream(t *testing.T) {
},
}

rcvr := fakeReceiver{msg: &BlockRequest{To: Digest{0x01}.Bytes()}}
rcvr := fakeReceiver{msg: &BlockRequest{To: 1}}
call := &fake.Call{}
sender := fakeSender{call: call}

Expand Down
50 changes: 25 additions & 25 deletions blockchain/skipchain/messages.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions blockchain/skipchain/messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ message PropagateGenesis {
}

message BlockRequest {
bytes from = 1;
bytes to = 2;
uint64 from = 1;
uint64 to = 2;
}

message BlockResponse {
Expand Down
32 changes: 30 additions & 2 deletions blockchain/skipchain/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,16 @@ func (ops *operations) insertBlock(block SkipBlock) error {
}

func (ops *operations) commitBlock(block SkipBlock) error {
already := false

err := ops.db.Atomic(func(tx Queries) error {
err := tx.Write(block)
_, err := tx.Read(int64(block.GetIndex()))
if err == nil {
already = true
return nil
}

err = tx.Write(block)
if err != nil {
return xerrors.Errorf("couldn't write block: %v", err)
}
Expand All @@ -57,6 +65,11 @@ func (ops *operations) commitBlock(block SkipBlock) error {
return nil
})

if already {
// Skip the notification as it has already been done.
return nil
}

if err != nil {
return xerrors.Errorf("tx failed: %v", err)
}
Expand All @@ -78,12 +91,27 @@ func (ops *operations) catchUp(target SkipBlock, addr mino.Address) error {

ops.logger.Info().Msg("one or more blocks are missing: starting catch up")

from := uint64(0)
if ops.db.Contains(0) {
last, err := ops.db.ReadLast()
if err != nil {
return err
}

from = last.GetIndex() + 1
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sender, rcver := ops.rpc.Stream(ctx, mino.NewAddresses(addr))

err := <-sender.Send(&BlockRequest{To: target.GetPreviousHash()}, addr)
req := &BlockRequest{
From: from,
To: target.GetIndex() - 1,
}

err := <-sender.Send(req, addr)
if err != nil {
return xerrors.Errorf("couldn't send block request: %v", err)
}
Expand Down
14 changes: 14 additions & 0 deletions blockchain/skipchain/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import (
type blockValidator struct {
*operations
queue *blockQueue

// When a catch is in progress, validations and commits should be delayed to
// let the system catches up to the latest known state.
catchUpLock sync.Mutex
}

func newBlockValidator(ops *operations) *blockValidator {
Expand All @@ -39,12 +43,19 @@ func (v *blockValidator) Validate(addr mino.Address,
return nil, xerrors.Errorf("couldn't decode block: %v", err)
}

v.catchUpLock.Lock()
once := sync.Once{}
defer once.Do(func() { v.catchUpLock.Unlock() })

// It makes sure that we know the whole chain up to the previous proposal.
err = v.catchUp(block, addr)
if err != nil {
return nil, xerrors.Errorf("couldn't catch up: %v", err)
}

// Override the defer to free the lock as soon as possible.
once.Do(func() { v.catchUpLock.Unlock() })

genesis, err := v.db.Read(0)
if err != nil {
return nil, xerrors.Errorf("couldn't read genesis block: %v", err)
Expand All @@ -68,6 +79,9 @@ func (v *blockValidator) Validate(addr mino.Address,
// Commit implements consensus.Validator. It commits the block that matches the
// identifier if it is present.
func (v *blockValidator) Commit(id []byte) error {
v.catchUpLock.Lock()
defer v.catchUpLock.Unlock()

digest := Digest{}
copy(digest[:], id)

Expand Down
3 changes: 3 additions & 0 deletions blockchain/skipchain/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ func (db *fakeDatabase) Contains(index uint64) bool {
}

func (db *fakeDatabase) Read(index int64) (SkipBlock, error) {
if index >= int64(len(db.blocks)) {
return SkipBlock{}, NewNoBlockError(index)
}
return db.blocks[index], db.err
}

Expand Down
10 changes: 6 additions & 4 deletions consensus/cosipbft/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ type Consensus struct {
hashFactory crypto.HashFactory
chainFactory consensus.ChainFactory
governance viewchange.Governance
viewchange viewchange.ViewChange

// ViewChange can be personalized after instantiation.
ViewChange viewchange.ViewChange
}

// NewCoSiPBFT returns a new instance.
Expand All @@ -48,7 +50,7 @@ func NewCoSiPBFT(mino mino.Mino, cosi cosi.CollectiveSigning, gov viewchange.Gov
hashFactory: crypto.NewSha256Factory(),
chainFactory: newUnsecureChainFactory(cosi, mino),
governance: gov,
viewchange: constant.NewViewChange(mino.GetAddress()),
ViewChange: constant.NewViewChange(mino.GetAddress()),
}

return c
Expand Down Expand Up @@ -161,7 +163,7 @@ func (a pbftActor) Propose(p consensus.Proposal) error {
// If the leader has failed and this node has to take over, we use the
// inherant property of CoSiPBFT to prove that 2f participants want the view
// change.
leader, ok := a.viewchange.Wait(p, authority)
leader, ok := a.ViewChange.Wait(p, authority)
if !ok {
fabric.Logger.Trace().Msg("proposal skipped by view change")
// Not authorized to propose a block as the leader is moving forward so
Expand Down Expand Up @@ -307,7 +309,7 @@ func (h handler) Hash(addr mino.Address, in proto.Message) (Digest, error) {
changeset: changeset,
}

leader := h.viewchange.Verify(proposal, authority)
leader := h.ViewChange.Verify(proposal, authority)

// The identity of the leader must be insured to comply with the
// viewchange property. The Signature should be verified with the leader
Expand Down
10 changes: 5 additions & 5 deletions consensus/cosipbft/mod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,19 +214,19 @@ func TestActor_Propose(t *testing.T) {
encoder: encoding.NewProtoEncoder(),
hashFactory: sha256Factory{},
governance: fakeGovernance{},
viewchange: fakeViewChange{},
ViewChange: fakeViewChange{},
cosi: &fakeCosi{},
},
closing: make(chan struct{}),
rpc: rpc,
cosiActor: cosiActor,
}

actor.viewchange = fakeViewChange{denied: true}
actor.ViewChange = fakeViewChange{denied: true}
err := actor.Propose(fakeProposal{})
require.NoError(t, err)

actor.viewchange = fakeViewChange{denied: false, leader: 2}
actor.ViewChange = fakeViewChange{denied: false, leader: 2}
err = actor.Propose(fakeProposal{hash: []byte{0xaa}})
require.NoError(t, err)
require.Len(t, cosiActor.calls, 2)
Expand Down Expand Up @@ -257,7 +257,7 @@ func TestActor_Failures_Propose(t *testing.T) {
encoder: encoding.NewProtoEncoder(),
hashFactory: sha256Factory{},
governance: fakeGovernance{},
viewchange: fakeViewChange{},
ViewChange: fakeViewChange{},
cosi: &fakeCosi{},
},
}
Expand Down Expand Up @@ -326,7 +326,7 @@ func TestHandler_Prepare_Hash(t *testing.T) {
hashFactory: crypto.NewSha256Factory(),
encoder: encoding.NewProtoEncoder(),
governance: fakeGovernance{authority: fake.NewAuthority(3, fake.NewSigner)},
viewchange: fakeViewChange{leader: 2},
ViewChange: fakeViewChange{leader: 2},
cosi: &fakeCosi{},
}
h := handler{
Expand Down
1 change: 1 addition & 0 deletions consensus/qsc/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ func (e *badMarshalAnyEncoder) MarshalAny(proto.Message) (*any.Any, error) {
}

type fakeIterator struct {
mino.AddressIterator
index int
addrs []mino.Address
}
Expand Down
40 changes: 40 additions & 0 deletions consensus/viewchange/rotating/mod.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package rotating

import (
"go.dedis.ch/fabric/consensus"
"go.dedis.ch/fabric/crypto"
"go.dedis.ch/fabric/mino"
)

// ViewChange is an implementation of the view change interface that will rotate
// the leader based on the index of the proposal.
//
// - implements viewchange.ViewChange
type ViewChange struct {
addr mino.Address
}

// NewViewChange returns a new instance of the view change.
func NewViewChange(addr mino.Address) ViewChange {
return ViewChange{addr: addr}
}

// Wait implements viewchange.ViewChange.
func (vc ViewChange) Wait(prop consensus.Proposal, authority crypto.CollectiveAuthority) (uint32, bool) {
leader := int(prop.GetIndex()) % authority.Len()

iter := authority.AddressIterator()
iter.Seek(leader)
if iter.HasNext() && iter.GetNext().Equal(vc.addr) {
return uint32(leader), true
}

return uint32(leader), false
}

// Verify implements viewchange.ViewChange.
func (vc ViewChange) Verify(prop consensus.Proposal, authority crypto.CollectiveAuthority) uint32 {
leader := int(prop.GetIndex()) % authority.Len()

return uint32(leader)
}
5 changes: 5 additions & 0 deletions internal/testing/fake/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ func NewAddressIterator(addrs []mino.Address) AddressIterator {
}
}

// Seek implements mino.AddressIterator.
func (i *AddressIterator) Seek(index int) {
i.index = index
}

// HasNext implements mino.AddressIterator.
func (i *AddressIterator) HasNext() bool {
return i.index < len(i.addrs)
Expand Down
3 changes: 3 additions & 0 deletions ledger/byzcoin/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.dedis.ch/fabric/blockchain/skipchain"
"go.dedis.ch/fabric/consensus/cosipbft"
"go.dedis.ch/fabric/consensus/viewchange"
"go.dedis.ch/fabric/consensus/viewchange/rotating"
"go.dedis.ch/fabric/cosi/flatcosi"
"go.dedis.ch/fabric/crypto"
"go.dedis.ch/fabric/encoding"
Expand Down Expand Up @@ -64,6 +65,8 @@ func NewLedger(m mino.Mino, signer crypto.AggregateSigner) *Ledger {
}

consensus := cosipbft.NewCoSiPBFT(m, flatcosi.NewFlat(m, signer), gov)
// Set a rotating view change for the collective signing.
consensus.ViewChange = rotating.NewViewChange(m.GetAddress())

return &Ledger{
addr: m.GetAddress(),
Expand Down
5 changes: 5 additions & 0 deletions ledger/byzcoin/roster/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ type addressIterator struct {
*iterator
}

// Seek implements mino.AddressIterator.
func (i *addressIterator) Seek(index int) {
i.index = index
}

// GetNext implements mino.AddressIterator. It returns the next address.
func (i *addressIterator) GetNext() mino.Address {
if i.iterator.HasNext() {
Expand Down
Loading

0 comments on commit f067472

Please sign in to comment.