From b314715a308b29468e1c440c6364ac3640c58ec2 Mon Sep 17 00:00:00 2001 From: Gaylor Bosson Date: Tue, 12 May 2020 08:44:09 +0200 Subject: [PATCH] Mino: implement an iterator helper --- blockchain/skipchain/handler.go | 4 +- blockchain/skipchain/mod.go | 1 + blockchain/skipchain/ops.go | 11 ++-- blockchain/skipchain/validator.go | 36 -------------- blockchain/skipchain/validator_test.go | 6 +++ internal/testing/fake/mod.go | 9 +++- mino/addr.go | 69 ++++++++++++++++++++++++++ mino/addr_test.go | 54 ++++++++++++++++++++ 8 files changed, 147 insertions(+), 43 deletions(-) create mode 100644 mino/addr.go create mode 100644 mino/addr_test.go diff --git a/blockchain/skipchain/handler.go b/blockchain/skipchain/handler.go index 45daa3e04..a93aff791 100644 --- a/blockchain/skipchain/handler.go +++ b/blockchain/skipchain/handler.go @@ -92,9 +92,7 @@ func (h handler) Stream(out mino.Sender, in mino.Receiver) error { resp.Chain = chainpb } - errs := out.Send(resp, addr) - - err = <-errs + err = <-out.Send(resp, addr) if err != nil { return xerrors.Errorf("couldn't send block: %v", err) } diff --git a/blockchain/skipchain/mod.go b/blockchain/skipchain/mod.go index e6d39ac8c..9c3a4d148 100644 --- a/blockchain/skipchain/mod.go +++ b/blockchain/skipchain/mod.go @@ -67,6 +67,7 @@ func NewSkipchain(m mino.Mino, consensus consensus.Consensus) *Skipchain { // consensus module. func (s *Skipchain) Listen(proc blockchain.PayloadProcessor) (blockchain.Actor, error) { ops := &operations{ + logger: fabric.Logger, encoder: s.encoder, addr: s.mino.GetAddress(), processor: proc, diff --git a/blockchain/skipchain/ops.go b/blockchain/skipchain/ops.go index c472a0460..3f98c7943 100644 --- a/blockchain/skipchain/ops.go +++ b/blockchain/skipchain/ops.go @@ -4,6 +4,7 @@ import ( "bytes" "context" + "github.com/rs/zerolog" "go.dedis.ch/fabric" "go.dedis.ch/fabric/blockchain" "go.dedis.ch/fabric/consensus" @@ -13,6 +14,7 @@ import ( ) type operations struct { + logger zerolog.Logger encoder encoding.ProtoMarshaler addr mino.Address processor blockchain.PayloadProcessor @@ -72,14 +74,17 @@ func (ops *operations) catchUp(target SkipBlock, addr mino.Address) error { return nil } - fabric.Logger.Info().Msg("one or more blocks are missing: starting catch up") + ops.logger.Info().Msg("one or more blocks are missing: starting catch up") ctx, cancel := context.WithCancel(context.Background()) defer cancel() - sender, rcver := ops.rpc.Stream(ctx, newRoster(addr)) + sender, rcver := ops.rpc.Stream(ctx, mino.NewAddresses(addr)) - sender.Send(&BlockRequest{To: target.GetPreviousHash()}, addr) + err := <-sender.Send(&BlockRequest{To: target.GetPreviousHash()}, addr) + if err != nil { + return xerrors.Errorf("couldn't send block request: %v", err) + } for { _, msg, err := rcver.Recv(ctx) diff --git a/blockchain/skipchain/validator.go b/blockchain/skipchain/validator.go index 2be2750c4..06f7efa8c 100644 --- a/blockchain/skipchain/validator.go +++ b/blockchain/skipchain/validator.go @@ -115,39 +115,3 @@ func (q *blockQueue) Clear() { q.buffer = make(map[Digest]SkipBlock) } - -type addressIterator struct { - mino.AddressIterator - index int - addrs []mino.Address -} - -func (it *addressIterator) HasNext() bool { - return it.index < len(it.addrs) -} - -func (it *addressIterator) GetNext() mino.Address { - if it.HasNext() { - res := it.addrs[it.index] - it.index++ - return res - } - return nil -} - -type roster struct { - mino.Players - addrs []mino.Address -} - -func newRoster(addrs ...mino.Address) roster { - return roster{addrs: addrs} -} - -func (r roster) AddressIterator() mino.AddressIterator { - return &addressIterator{addrs: r.addrs} -} - -func (r roster) Len() int { - return len(r.addrs) -} diff --git a/blockchain/skipchain/validator_test.go b/blockchain/skipchain/validator_test.go index a934838b1..a623a652e 100644 --- a/blockchain/skipchain/validator_test.go +++ b/blockchain/skipchain/validator_test.go @@ -60,6 +60,12 @@ func TestBlockValidator_Validate(t *testing.T) { _, err = v.Validate(fake.Address{}, packed) require.EqualError(t, err, "couldn't validate the payload: oops") + v.db = &fakeDatabase{missing: true} + v.rpc = fake.NewStreamRPC(fake.Receiver{}, fake.NewBadSender()) + _, err = v.Validate(fake.Address{}, packed) + require.EqualError(t, err, + "couldn't catch up: couldn't send block request: fake error") + return true } diff --git a/internal/testing/fake/mod.go b/internal/testing/fake/mod.go index 580e5b4ea..b44684d64 100644 --- a/internal/testing/fake/mod.go +++ b/internal/testing/fake/mod.go @@ -624,11 +624,18 @@ func (r Receiver) Recv(context.Context) (mino.Address, proto.Message, error) { // Sender is a fake RPC stream sender. type Sender struct { mino.Sender + err error +} + +// NewBadSender returns a sender that always returns an error. +func NewBadSender() Sender { + return Sender{err: xerrors.New("fake error")} } // Send implements mino.Sender. func (s Sender) Send(proto.Message, ...mino.Address) <-chan error { - errs := make(chan error) + errs := make(chan error, 1) + errs <- s.err close(errs) return errs } diff --git a/mino/addr.go b/mino/addr.go new file mode 100644 index 000000000..b42eb4383 --- /dev/null +++ b/mino/addr.go @@ -0,0 +1,69 @@ +package mino + +// addressIterator is an implementation of the iterator for addresses. +// +// - implements mino.AddressIterator +type addressIterator struct { + index int + addrs []Address +} + +// HasNext implements mino.AddressIterator. It returns true if there is an +// address available. +func (it *addressIterator) HasNext() bool { + return it.index < len(it.addrs) +} + +// GetNext implements mino.AddressIterator. It returns the address at the +// current index and moves the iterator to the next address. +func (it *addressIterator) GetNext() Address { + if it.HasNext() { + res := it.addrs[it.index] + it.index++ + return res + } + return nil +} + +// roster is an implementation of the mino.Players interface. It provides helper +// when known addresses need to be grouped into a roster for Mino calls. +// +// - implements mino.Players +type roster struct { + addrs []Address +} + +// NewAddresses is a helper to instantiate a Players interface with only a few +// addresses. +func NewAddresses(addrs ...Address) Players { + return roster{addrs: addrs} +} + +// Take implements mino.Players. It returns a subset of the roster according to +// the filter. +func (r roster) Take(updaters ...FilterUpdater) Players { + filter := &Filter{} + for _, fn := range updaters { + fn(filter) + } + + addrs := make([]Address, len(filter.Indices)) + for i, k := range filter.Indices { + addrs[i] = r.addrs[k] + } + + newRoster := roster{addrs: addrs} + + return newRoster +} + +// AddressIterator implements mino.Players. It returns an iterator for the +// roster. +func (r roster) AddressIterator() AddressIterator { + return &addressIterator{addrs: r.addrs} +} + +// Len implements mino.Players. It returns the length of the roster. +func (r roster) Len() int { + return len(r.addrs) +} diff --git a/mino/addr_test.go b/mino/addr_test.go new file mode 100644 index 000000000..6ed65b8aa --- /dev/null +++ b/mino/addr_test.go @@ -0,0 +1,54 @@ +package mino + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestAddressIterator_HasNext(t *testing.T) { + addrs := []Address{fakeAddr{}, fakeAddr{}} + + iter := addressIterator{addrs: addrs} + require.True(t, iter.HasNext()) + + iter.GetNext() + iter.GetNext() + require.False(t, iter.HasNext()) +} + +func TestAddressIterator_GetNext(t *testing.T) { + addrs := []Address{fakeAddr{}, fakeAddr{}} + + iter := addressIterator{addrs: addrs} + require.NotNil(t, iter.GetNext()) + require.NotNil(t, iter.GetNext()) + require.Nil(t, iter.GetNext()) +} + +func TestRoster_Take(t *testing.T) { + addrs := NewAddresses(fakeAddr{}, fakeAddr{}, fakeAddr{}) + + addrs2 := addrs.Take(IndexFilter(0), IndexFilter(2)) + require.Equal(t, 2, len(addrs2.(roster).addrs)) +} + +func TestRoster_AddressIterator(t *testing.T) { + addrs := NewAddresses(fakeAddr{}) + + iter := addrs.AddressIterator() + require.NotNil(t, iter.GetNext()) + require.Nil(t, iter.GetNext()) +} + +func TestRoster_Len(t *testing.T) { + addrs := NewAddresses(fakeAddr{}, fakeAddr{}) + require.Equal(t, 2, addrs.Len()) +} + +// ----------------------------------------------------------------------------- +// Utility functions + +type fakeAddr struct { + Address +}