Skip to content

Commit

Permalink
Merge pull request #43 from dedis/roster_change_full
Browse files Browse the repository at this point in the history
Finalization of the roster change
  • Loading branch information
nkcr committed May 13, 2020
2 parents d1864da + c58010d commit 380d814
Show file tree
Hide file tree
Showing 36 changed files with 1,688 additions and 526 deletions.
2 changes: 1 addition & 1 deletion blockchain/skipchain/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (b SkipBlock) Pack(encoder encoding.ProtoMarshaler) (proto.Message, error)
// String implements fmt.Stringer. It returns a string representation of the
// block.
func (b SkipBlock) String() string {
return fmt.Sprintf("Block[%v]", b.hash)
return fmt.Sprintf("Block[%d:%v]", b.Index, b.hash)
}

func (b SkipBlock) computeHash(factory crypto.HashFactory,
Expand Down
9 changes: 7 additions & 2 deletions blockchain/skipchain/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ func TestSkipBlock_HashUniqueness(t *testing.T) {
}

func TestSkipBlock_String(t *testing.T) {
block := SkipBlock{hash: Digest{1}}
require.Equal(t, block.String(), "Block[0100000000000000]")
block := SkipBlock{Index: 5, hash: Digest{1}}
require.Equal(t, block.String(), "Block[5:0100000000000000]")
}

func TestVerifiableBlock_Pack(t *testing.T) {
Expand Down Expand Up @@ -345,6 +345,7 @@ type fakeConsensus struct {
err error
errChain error
errFactory error
errStore error
}

func (c fakeConsensus) GetChainFactory() (consensus.ChainFactory, error) {
Expand All @@ -362,3 +363,7 @@ func (c fakeConsensus) GetChain(id []byte) (consensus.Chain, error) {
func (c fakeConsensus) Listen(consensus.Validator) (consensus.Actor, error) {
return nil, c.err
}

func (c fakeConsensus) Store(consensus.Chain) error {
return c.errStore
}
7 changes: 7 additions & 0 deletions blockchain/skipchain/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
// blocks.
type Queries interface {
Write(block SkipBlock) error
Contains(index uint64) bool
Read(index int64) (SkipBlock, error)
ReadLast() (SkipBlock, error)
}
Expand Down Expand Up @@ -78,6 +79,12 @@ func (db *InMemoryDatabase) Write(block SkipBlock) error {
return nil
}

// Contains implements skipchain.Database. It returns true if the block is
// stored in the database, otherwise false.
func (db *InMemoryDatabase) Contains(index uint64) bool {
return index < uint64(len(db.blocks))
}

// Read implements skipchain.Database. It returns the block at the given index
// if it exists, otherwise an error.
func (db *InMemoryDatabase) Read(index int64) (SkipBlock, error) {
Expand Down
86 changes: 60 additions & 26 deletions blockchain/skipchain/handler.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package skipchain

import (
"bytes"
"context"

proto "github.com/golang/protobuf/proto"
"go.dedis.ch/fabric"
"go.dedis.ch/fabric/blockchain"
"go.dedis.ch/fabric/mino"
"golang.org/x/xerrors"
)
Expand All @@ -14,15 +15,12 @@ import (
// - implements mino.Handler
type handler struct {
mino.UnsupportedHandler
*Skipchain

proc blockchain.PayloadProcessor
*operations
}

func newHandler(sc *Skipchain, proc blockchain.PayloadProcessor) handler {
func newHandler(ops *operations) handler {
return handler{
Skipchain: sc,
proc: proc,
operations: ops,
}
}

Expand All @@ -33,36 +31,72 @@ func (h handler) Process(req mino.Request) (proto.Message, error) {
case *PropagateGenesis:
genesis, err := h.blockFactory.decodeBlock(in.GetGenesis())
if err != nil {
return nil, xerrors.Errorf("couldn't decode the block: %v", err)
return nil, xerrors.Errorf("couldn't decode block: %v", err)
}

err = h.insertBlock(genesis)
if err != nil {
return nil, xerrors.Errorf("couldn't store genesis: %v", err)
}

err = h.proc.Validate(0, genesis.GetPayload())
return nil, nil
default:
return nil, xerrors.Errorf("unknown message type '%T'", in)
}
}

// Stream implements mino.Handler. It handles block requests to help another
// participant to catch up the latest chain.
func (h handler) Stream(out mino.Sender, in mino.Receiver) error {
addr, msg, err := in.Recv(context.Background())
if err != nil {
return xerrors.Errorf("couldn't receive message: %v", err)
}

req, ok := msg.(*BlockRequest)
if !ok {
return xerrors.Errorf("invalid message type '%T' != '%T'", msg, req)
}

var block SkipBlock
for i := int64(0); !bytes.Equal(block.hash[:], req.To); i++ {
block, err = h.db.Read(i)
if err != nil {
return nil, xerrors.Errorf("couldn't validate genesis payload: %v", err)
return xerrors.Errorf("couldn't read block at index %d: %v", i, err)
}

err = h.db.Atomic(func(ops Queries) error {
err = ops.Write(genesis)
blockpb, err := h.encoder.Pack(block)
if err != nil {
return xerrors.Errorf("couldn't pack block: %v", err)
}

resp := &BlockResponse{
Block: blockpb.(*BlockProto),
}

if block.GetIndex() > 0 {
// In the case the genesis block needs to be sent, there is no chain
// to send alongside.

chain, err := h.consensus.GetChain(block.GetHash())
if err != nil {
return xerrors.Errorf("couldn't write the block: %v", err)
return xerrors.Errorf("couldn't get chain to block %d: %v",
block.GetIndex(), err)
}

err = h.proc.Commit(genesis.GetPayload())
chainpb, err := h.encoder.PackAny(chain)
if err != nil {
return xerrors.Errorf("couldn't commit genesis payload: %v", err)
return xerrors.Errorf("couldn't pack chain: %v", err)
}

return nil
})
if err != nil {
return nil, xerrors.Errorf("tx aborted: %v", err)
resp.Chain = chainpb
}

fabric.Logger.Trace().Msgf("new genesis block written: %v", genesis.hash)
h.watcher.Notify(genesis)

return nil, nil
default:
return nil, xerrors.Errorf("unknown message type '%T'", in)
err = <-out.Send(resp, addr)
if err != nil {
return xerrors.Errorf("couldn't send block: %v", err)
}
}

return nil
}
179 changes: 124 additions & 55 deletions blockchain/skipchain/handler_test.go
Original file line number Diff line number Diff line change
@@ -1,75 +1,144 @@
package skipchain

import (
"context"
"testing"
"testing/quick"

proto "github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/empty"
"github.com/golang/protobuf/ptypes/wrappers"
"github.com/stretchr/testify/require"
"go.dedis.ch/fabric/crypto"
"go.dedis.ch/fabric/encoding"
"go.dedis.ch/fabric/internal/testing/fake"
"go.dedis.ch/fabric/mino"
"golang.org/x/xerrors"
)

func TestHandler_Process(t *testing.T) {
f := func(block SkipBlock) bool {
proc := &fakePayloadProc{}
watcher := &fakeWatcher{}
h := newHandler(&Skipchain{
blockFactory: blockFactory{
encoder: encoding.NewProtoEncoder(),
hashFactory: crypto.NewSha256Factory(),
},
db: &fakeDatabase{},
watcher: watcher,
}, proc)

block.Payload = &wrappers.BoolValue{Value: true}

packed, err := block.Pack(encoding.NewProtoEncoder())
require.NoError(t, err)

req := mino.Request{
Message: &PropagateGenesis{Genesis: packed.(*BlockProto)},
}
resp, err := h.Process(req)
require.NoError(t, err)
require.Nil(t, resp)
require.Len(t, proc.calls, 2)
require.Equal(t, uint64(0), proc.calls[0][0])
require.True(t, proto.Equal(block.Payload, proc.calls[0][1].(proto.Message)))
require.True(t, proto.Equal(block.Payload, proc.calls[1][0].(proto.Message)))
require.Equal(t, 1, watcher.notified)

req.Message = &empty.Empty{}
_, err = h.Process(req)
require.EqualError(t, err, "unknown message type '*empty.Empty'")

req.Message = &PropagateGenesis{}
_, err = h.Process(req)
require.Error(t, err)
require.Contains(t, err.Error(), "couldn't decode the block: ")

req.Message = &PropagateGenesis{Genesis: packed.(*BlockProto)}
h.proc = &fakePayloadProc{errValidate: xerrors.New("oops")}
_, err = h.Process(req)
require.EqualError(t, err, "couldn't validate genesis payload: oops")

h.proc = &fakePayloadProc{errCommit: xerrors.New("oops")}
_, err = h.Process(req)
require.EqualError(t, err, "tx aborted: couldn't commit genesis payload: oops")

h.proc = &fakePayloadProc{}
h.Skipchain.db = &fakeDatabase{err: xerrors.New("oops")}
_, err = h.Process(req)
require.EqualError(t, err, "tx aborted: couldn't write the block: oops")

return true
proc := &fakePayloadProc{}
watcher := &fakeWatcher{}
h := newHandler(&operations{
processor: proc,
blockFactory: blockFactory{
encoder: encoding.NewProtoEncoder(),
hashFactory: crypto.NewSha256Factory(),
},
db: &fakeDatabase{},
watcher: watcher,
})

genesis := SkipBlock{
Index: 0,
Payload: &wrappers.BoolValue{Value: true},
}

err := quick.Check(f, nil)
packed, err := genesis.Pack(encoding.NewProtoEncoder())
require.NoError(t, err)

req := mino.Request{
Message: &PropagateGenesis{Genesis: packed.(*BlockProto)},
}
resp, err := h.Process(req)
require.NoError(t, err)
require.Nil(t, resp)
require.Len(t, proc.calls, 2)
require.Equal(t, uint64(0), proc.calls[0][0])
require.True(t, proto.Equal(genesis.Payload, proc.calls[0][1].(proto.Message)))
require.True(t, proto.Equal(genesis.Payload, proc.calls[1][0].(proto.Message)))
require.Equal(t, 1, watcher.notified)

req.Message = &empty.Empty{}
_, err = h.Process(req)
require.EqualError(t, err, "unknown message type '*empty.Empty'")

req.Message = &PropagateGenesis{}
_, err = h.Process(req)
require.EqualError(t, err,
"couldn't decode block: couldn't unmarshal payload: message is nil")

proc.errValidate = xerrors.New("oops")
req.Message = &PropagateGenesis{Genesis: packed.(*BlockProto)}
_, err = h.Process(req)
require.EqualError(t, err,
"couldn't store genesis: couldn't validate block: oops")
}

func TestHandler_Stream(t *testing.T) {
db := &fakeDatabase{blocks: []SkipBlock{
{Payload: &empty.Empty{}},
{hash: Digest{0x01}, Index: 1, Payload: &empty.Empty{}}},
}
h := handler{
operations: &operations{
encoder: encoding.NewProtoEncoder(),
db: db,
consensus: fakeConsensus{},
},
}

rcvr := fakeReceiver{msg: &BlockRequest{To: Digest{0x01}.Bytes()}}
call := &fake.Call{}
sender := fakeSender{call: call}

err := h.Stream(sender, rcvr)
require.NoError(t, err)
require.Equal(t, 2, call.Len())

err = h.Stream(sender, fakeReceiver{err: xerrors.New("oops")})
require.EqualError(t, err, "couldn't receive message: oops")

err = h.Stream(sender, fakeReceiver{msg: nil})
require.EqualError(t, err,
"invalid message type '<nil>' != '*skipchain.BlockRequest'")

db.err = xerrors.New("oops")
err = h.Stream(sender, rcvr)
require.EqualError(t, err, "couldn't read block at index 0: oops")

db.err = nil
h.encoder = fake.BadPackEncoder{}
err = h.Stream(sender, rcvr)
require.EqualError(t, err, "couldn't pack block: fake error")

h.encoder = encoding.NewProtoEncoder()
h.consensus = fakeConsensus{err: xerrors.New("oops")}
err = h.Stream(sender, rcvr)
require.EqualError(t, err, "couldn't get chain to block 1: oops")

h.consensus = fakeConsensus{}
h.encoder = fake.BadPackAnyEncoder{}
err = h.Stream(sender, rcvr)
require.EqualError(t, err, "couldn't pack chain: fake error")

h.encoder = encoding.NewProtoEncoder()
err = h.Stream(fakeSender{err: xerrors.New("oops")}, rcvr)
require.EqualError(t, err, "couldn't send block: oops")
}

// -----------------------------------------------------------------------------
// Utility functions

type fakeReceiver struct {
mino.Receiver
msg proto.Message
err error
}

func (rcvr fakeReceiver) Recv(context.Context) (mino.Address, proto.Message, error) {
return nil, rcvr.msg, rcvr.err
}

type fakeSender struct {
mino.Sender
call *fake.Call
err error
}

func (s fakeSender) Send(msg proto.Message, addrs ...mino.Address) <-chan error {
s.call.Add(msg, addrs)
errs := make(chan error, 1)
errs <- s.err
close(errs)
return errs
}
Loading

0 comments on commit 380d814

Please sign in to comment.