Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finalization of the roster change #43

Merged
merged 9 commits into from
May 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion blockchain/skipchain/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (b SkipBlock) Pack(encoder encoding.ProtoMarshaler) (proto.Message, error)
// String implements fmt.Stringer. It returns a string representation of the
// block.
func (b SkipBlock) String() string {
return fmt.Sprintf("Block[%v]", b.hash)
return fmt.Sprintf("Block[%d:%v]", b.Index, b.hash)
}

func (b SkipBlock) computeHash(factory crypto.HashFactory,
Expand Down
9 changes: 7 additions & 2 deletions blockchain/skipchain/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ func TestSkipBlock_HashUniqueness(t *testing.T) {
}

func TestSkipBlock_String(t *testing.T) {
block := SkipBlock{hash: Digest{1}}
require.Equal(t, block.String(), "Block[0100000000000000]")
block := SkipBlock{Index: 5, hash: Digest{1}}
require.Equal(t, block.String(), "Block[5:0100000000000000]")
}

func TestVerifiableBlock_Pack(t *testing.T) {
Expand Down Expand Up @@ -345,6 +345,7 @@ type fakeConsensus struct {
err error
errChain error
errFactory error
errStore error
}

func (c fakeConsensus) GetChainFactory() (consensus.ChainFactory, error) {
Expand All @@ -362,3 +363,7 @@ func (c fakeConsensus) GetChain(id []byte) (consensus.Chain, error) {
func (c fakeConsensus) Listen(consensus.Validator) (consensus.Actor, error) {
return nil, c.err
}

func (c fakeConsensus) Store(consensus.Chain) error {
return c.errStore
}
7 changes: 7 additions & 0 deletions blockchain/skipchain/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
// blocks.
type Queries interface {
Write(block SkipBlock) error
Contains(index uint64) bool
Read(index int64) (SkipBlock, error)
ReadLast() (SkipBlock, error)
}
Expand Down Expand Up @@ -78,6 +79,12 @@ func (db *InMemoryDatabase) Write(block SkipBlock) error {
return nil
}

// Contains implements skipchain.Database. It returns true if the block is
// stored in the database, otherwise false.
func (db *InMemoryDatabase) Contains(index uint64) bool {
return index < uint64(len(db.blocks))
}

// Read implements skipchain.Database. It returns the block at the given index
// if it exists, otherwise an error.
func (db *InMemoryDatabase) Read(index int64) (SkipBlock, error) {
Expand Down
86 changes: 60 additions & 26 deletions blockchain/skipchain/handler.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package skipchain

import (
"bytes"
"context"

proto "github.com/golang/protobuf/proto"
"go.dedis.ch/fabric"
"go.dedis.ch/fabric/blockchain"
"go.dedis.ch/fabric/mino"
"golang.org/x/xerrors"
)
Expand All @@ -14,15 +15,12 @@ import (
// - implements mino.Handler
type handler struct {
mino.UnsupportedHandler
*Skipchain

proc blockchain.PayloadProcessor
*operations
}

func newHandler(sc *Skipchain, proc blockchain.PayloadProcessor) handler {
func newHandler(ops *operations) handler {
return handler{
Skipchain: sc,
proc: proc,
operations: ops,
}
}

Expand All @@ -33,36 +31,72 @@ func (h handler) Process(req mino.Request) (proto.Message, error) {
case *PropagateGenesis:
genesis, err := h.blockFactory.decodeBlock(in.GetGenesis())
if err != nil {
return nil, xerrors.Errorf("couldn't decode the block: %v", err)
return nil, xerrors.Errorf("couldn't decode block: %v", err)
}

err = h.insertBlock(genesis)
if err != nil {
return nil, xerrors.Errorf("couldn't store genesis: %v", err)
}

err = h.proc.Validate(0, genesis.GetPayload())
return nil, nil
default:
return nil, xerrors.Errorf("unknown message type '%T'", in)
}
}

// Stream implements mino.Handler. It handles block requests to help another
// participant to catch up the latest chain.
func (h handler) Stream(out mino.Sender, in mino.Receiver) error {
addr, msg, err := in.Recv(context.Background())
if err != nil {
return xerrors.Errorf("couldn't receive message: %v", err)
}

req, ok := msg.(*BlockRequest)
if !ok {
return xerrors.Errorf("invalid message type '%T' != '%T'", msg, req)
}

var block SkipBlock
for i := int64(0); !bytes.Equal(block.hash[:], req.To); i++ {
block, err = h.db.Read(i)
if err != nil {
return nil, xerrors.Errorf("couldn't validate genesis payload: %v", err)
return xerrors.Errorf("couldn't read block at index %d: %v", i, err)
}

err = h.db.Atomic(func(ops Queries) error {
err = ops.Write(genesis)
blockpb, err := h.encoder.Pack(block)
if err != nil {
return xerrors.Errorf("couldn't pack block: %v", err)
}

resp := &BlockResponse{
Block: blockpb.(*BlockProto),
}

if block.GetIndex() > 0 {
// In the case the genesis block needs to be sent, there is no chain
// to send alongside.

chain, err := h.consensus.GetChain(block.GetHash())
if err != nil {
return xerrors.Errorf("couldn't write the block: %v", err)
return xerrors.Errorf("couldn't get chain to block %d: %v",
block.GetIndex(), err)
}

err = h.proc.Commit(genesis.GetPayload())
chainpb, err := h.encoder.PackAny(chain)
if err != nil {
return xerrors.Errorf("couldn't commit genesis payload: %v", err)
return xerrors.Errorf("couldn't pack chain: %v", err)
}

return nil
})
if err != nil {
return nil, xerrors.Errorf("tx aborted: %v", err)
resp.Chain = chainpb
}

fabric.Logger.Trace().Msgf("new genesis block written: %v", genesis.hash)
h.watcher.Notify(genesis)

return nil, nil
default:
return nil, xerrors.Errorf("unknown message type '%T'", in)
err = <-out.Send(resp, addr)
if err != nil {
return xerrors.Errorf("couldn't send block: %v", err)
}
}

return nil
}
179 changes: 124 additions & 55 deletions blockchain/skipchain/handler_test.go
Original file line number Diff line number Diff line change
@@ -1,75 +1,144 @@
package skipchain

import (
"context"
"testing"
"testing/quick"

proto "github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/empty"
"github.com/golang/protobuf/ptypes/wrappers"
"github.com/stretchr/testify/require"
"go.dedis.ch/fabric/crypto"
"go.dedis.ch/fabric/encoding"
"go.dedis.ch/fabric/internal/testing/fake"
"go.dedis.ch/fabric/mino"
"golang.org/x/xerrors"
)

func TestHandler_Process(t *testing.T) {
f := func(block SkipBlock) bool {
proc := &fakePayloadProc{}
watcher := &fakeWatcher{}
h := newHandler(&Skipchain{
blockFactory: blockFactory{
encoder: encoding.NewProtoEncoder(),
hashFactory: crypto.NewSha256Factory(),
},
db: &fakeDatabase{},
watcher: watcher,
}, proc)

block.Payload = &wrappers.BoolValue{Value: true}

packed, err := block.Pack(encoding.NewProtoEncoder())
require.NoError(t, err)

req := mino.Request{
Message: &PropagateGenesis{Genesis: packed.(*BlockProto)},
}
resp, err := h.Process(req)
require.NoError(t, err)
require.Nil(t, resp)
require.Len(t, proc.calls, 2)
require.Equal(t, uint64(0), proc.calls[0][0])
require.True(t, proto.Equal(block.Payload, proc.calls[0][1].(proto.Message)))
require.True(t, proto.Equal(block.Payload, proc.calls[1][0].(proto.Message)))
require.Equal(t, 1, watcher.notified)

req.Message = &empty.Empty{}
_, err = h.Process(req)
require.EqualError(t, err, "unknown message type '*empty.Empty'")

req.Message = &PropagateGenesis{}
_, err = h.Process(req)
require.Error(t, err)
require.Contains(t, err.Error(), "couldn't decode the block: ")

req.Message = &PropagateGenesis{Genesis: packed.(*BlockProto)}
h.proc = &fakePayloadProc{errValidate: xerrors.New("oops")}
_, err = h.Process(req)
require.EqualError(t, err, "couldn't validate genesis payload: oops")

h.proc = &fakePayloadProc{errCommit: xerrors.New("oops")}
_, err = h.Process(req)
require.EqualError(t, err, "tx aborted: couldn't commit genesis payload: oops")

h.proc = &fakePayloadProc{}
h.Skipchain.db = &fakeDatabase{err: xerrors.New("oops")}
_, err = h.Process(req)
require.EqualError(t, err, "tx aborted: couldn't write the block: oops")

return true
proc := &fakePayloadProc{}
watcher := &fakeWatcher{}
h := newHandler(&operations{
processor: proc,
blockFactory: blockFactory{
encoder: encoding.NewProtoEncoder(),
hashFactory: crypto.NewSha256Factory(),
},
db: &fakeDatabase{},
watcher: watcher,
})

genesis := SkipBlock{
Index: 0,
Payload: &wrappers.BoolValue{Value: true},
}

err := quick.Check(f, nil)
packed, err := genesis.Pack(encoding.NewProtoEncoder())
require.NoError(t, err)

req := mino.Request{
Message: &PropagateGenesis{Genesis: packed.(*BlockProto)},
}
resp, err := h.Process(req)
require.NoError(t, err)
require.Nil(t, resp)
require.Len(t, proc.calls, 2)
require.Equal(t, uint64(0), proc.calls[0][0])
require.True(t, proto.Equal(genesis.Payload, proc.calls[0][1].(proto.Message)))
require.True(t, proto.Equal(genesis.Payload, proc.calls[1][0].(proto.Message)))
require.Equal(t, 1, watcher.notified)

req.Message = &empty.Empty{}
_, err = h.Process(req)
require.EqualError(t, err, "unknown message type '*empty.Empty'")

req.Message = &PropagateGenesis{}
_, err = h.Process(req)
require.EqualError(t, err,
"couldn't decode block: couldn't unmarshal payload: message is nil")

proc.errValidate = xerrors.New("oops")
req.Message = &PropagateGenesis{Genesis: packed.(*BlockProto)}
_, err = h.Process(req)
require.EqualError(t, err,
"couldn't store genesis: couldn't validate block: oops")
}

func TestHandler_Stream(t *testing.T) {
db := &fakeDatabase{blocks: []SkipBlock{
{Payload: &empty.Empty{}},
{hash: Digest{0x01}, Index: 1, Payload: &empty.Empty{}}},
}
h := handler{
operations: &operations{
encoder: encoding.NewProtoEncoder(),
db: db,
consensus: fakeConsensus{},
},
}

rcvr := fakeReceiver{msg: &BlockRequest{To: Digest{0x01}.Bytes()}}
call := &fake.Call{}
sender := fakeSender{call: call}

err := h.Stream(sender, rcvr)
require.NoError(t, err)
require.Equal(t, 2, call.Len())

err = h.Stream(sender, fakeReceiver{err: xerrors.New("oops")})
require.EqualError(t, err, "couldn't receive message: oops")

err = h.Stream(sender, fakeReceiver{msg: nil})
require.EqualError(t, err,
"invalid message type '<nil>' != '*skipchain.BlockRequest'")

db.err = xerrors.New("oops")
err = h.Stream(sender, rcvr)
require.EqualError(t, err, "couldn't read block at index 0: oops")

db.err = nil
h.encoder = fake.BadPackEncoder{}
err = h.Stream(sender, rcvr)
require.EqualError(t, err, "couldn't pack block: fake error")

h.encoder = encoding.NewProtoEncoder()
h.consensus = fakeConsensus{err: xerrors.New("oops")}
err = h.Stream(sender, rcvr)
require.EqualError(t, err, "couldn't get chain to block 1: oops")

h.consensus = fakeConsensus{}
h.encoder = fake.BadPackAnyEncoder{}
err = h.Stream(sender, rcvr)
require.EqualError(t, err, "couldn't pack chain: fake error")

h.encoder = encoding.NewProtoEncoder()
err = h.Stream(fakeSender{err: xerrors.New("oops")}, rcvr)
require.EqualError(t, err, "couldn't send block: oops")
}

// -----------------------------------------------------------------------------
// Utility functions

type fakeReceiver struct {
mino.Receiver
msg proto.Message
err error
}

func (rcvr fakeReceiver) Recv(context.Context) (mino.Address, proto.Message, error) {
return nil, rcvr.msg, rcvr.err
}

type fakeSender struct {
mino.Sender
call *fake.Call
err error
}

func (s fakeSender) Send(msg proto.Message, addrs ...mino.Address) <-chan error {
s.call.Add(msg, addrs)
errs := make(chan error, 1)
errs <- s.err
close(errs)
return errs
}
Loading