Skip to content

Commit

Permalink
Mino: implement an iterator helper
Browse files Browse the repository at this point in the history
  • Loading branch information
Gilthoniel committed May 12, 2020
1 parent f4ce90b commit b314715
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 43 deletions.
4 changes: 1 addition & 3 deletions blockchain/skipchain/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,7 @@ func (h handler) Stream(out mino.Sender, in mino.Receiver) error {
resp.Chain = chainpb
}

errs := out.Send(resp, addr)

err = <-errs
err = <-out.Send(resp, addr)
if err != nil {
return xerrors.Errorf("couldn't send block: %v", err)
}
Expand Down
1 change: 1 addition & 0 deletions blockchain/skipchain/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func NewSkipchain(m mino.Mino, consensus consensus.Consensus) *Skipchain {
// consensus module.
func (s *Skipchain) Listen(proc blockchain.PayloadProcessor) (blockchain.Actor, error) {
ops := &operations{
logger: fabric.Logger,
encoder: s.encoder,
addr: s.mino.GetAddress(),
processor: proc,
Expand Down
11 changes: 8 additions & 3 deletions blockchain/skipchain/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"

"github.com/rs/zerolog"
"go.dedis.ch/fabric"
"go.dedis.ch/fabric/blockchain"
"go.dedis.ch/fabric/consensus"
Expand All @@ -13,6 +14,7 @@ import (
)

type operations struct {
logger zerolog.Logger
encoder encoding.ProtoMarshaler
addr mino.Address
processor blockchain.PayloadProcessor
Expand Down Expand Up @@ -72,14 +74,17 @@ func (ops *operations) catchUp(target SkipBlock, addr mino.Address) error {
return nil
}

fabric.Logger.Info().Msg("one or more blocks are missing: starting catch up")
ops.logger.Info().Msg("one or more blocks are missing: starting catch up")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sender, rcver := ops.rpc.Stream(ctx, newRoster(addr))
sender, rcver := ops.rpc.Stream(ctx, mino.NewAddresses(addr))

sender.Send(&BlockRequest{To: target.GetPreviousHash()}, addr)
err := <-sender.Send(&BlockRequest{To: target.GetPreviousHash()}, addr)
if err != nil {
return xerrors.Errorf("couldn't send block request: %v", err)
}

for {
_, msg, err := rcver.Recv(ctx)
Expand Down
36 changes: 0 additions & 36 deletions blockchain/skipchain/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,39 +115,3 @@ func (q *blockQueue) Clear() {

q.buffer = make(map[Digest]SkipBlock)
}

type addressIterator struct {
mino.AddressIterator
index int
addrs []mino.Address
}

func (it *addressIterator) HasNext() bool {
return it.index < len(it.addrs)
}

func (it *addressIterator) GetNext() mino.Address {
if it.HasNext() {
res := it.addrs[it.index]
it.index++
return res
}
return nil
}

type roster struct {
mino.Players
addrs []mino.Address
}

func newRoster(addrs ...mino.Address) roster {
return roster{addrs: addrs}
}

func (r roster) AddressIterator() mino.AddressIterator {
return &addressIterator{addrs: r.addrs}
}

func (r roster) Len() int {
return len(r.addrs)
}
6 changes: 6 additions & 0 deletions blockchain/skipchain/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ func TestBlockValidator_Validate(t *testing.T) {
_, err = v.Validate(fake.Address{}, packed)
require.EqualError(t, err, "couldn't validate the payload: oops")

v.db = &fakeDatabase{missing: true}
v.rpc = fake.NewStreamRPC(fake.Receiver{}, fake.NewBadSender())
_, err = v.Validate(fake.Address{}, packed)
require.EqualError(t, err,
"couldn't catch up: couldn't send block request: fake error")

return true
}

Expand Down
9 changes: 8 additions & 1 deletion internal/testing/fake/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,11 +624,18 @@ func (r Receiver) Recv(context.Context) (mino.Address, proto.Message, error) {
// Sender is a fake RPC stream sender.
type Sender struct {
mino.Sender
err error
}

// NewBadSender returns a sender that always returns an error.
func NewBadSender() Sender {
return Sender{err: xerrors.New("fake error")}
}

// Send implements mino.Sender.
func (s Sender) Send(proto.Message, ...mino.Address) <-chan error {
errs := make(chan error)
errs := make(chan error, 1)
errs <- s.err
close(errs)
return errs
}
Expand Down
69 changes: 69 additions & 0 deletions mino/addr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package mino

// addressIterator is an implementation of the iterator for addresses.
//
// - implements mino.AddressIterator
type addressIterator struct {
index int
addrs []Address
}

// HasNext implements mino.AddressIterator. It returns true if there is an
// address available.
func (it *addressIterator) HasNext() bool {
return it.index < len(it.addrs)
}

// GetNext implements mino.AddressIterator. It returns the address at the
// current index and moves the iterator to the next address.
func (it *addressIterator) GetNext() Address {
if it.HasNext() {
res := it.addrs[it.index]
it.index++
return res
}
return nil
}

// roster is an implementation of the mino.Players interface. It provides helper
// when known addresses need to be grouped into a roster for Mino calls.
//
// - implements mino.Players
type roster struct {
addrs []Address
}

// NewAddresses is a helper to instantiate a Players interface with only a few
// addresses.
func NewAddresses(addrs ...Address) Players {
return roster{addrs: addrs}
}

// Take implements mino.Players. It returns a subset of the roster according to
// the filter.
func (r roster) Take(updaters ...FilterUpdater) Players {
filter := &Filter{}
for _, fn := range updaters {
fn(filter)
}

addrs := make([]Address, len(filter.Indices))
for i, k := range filter.Indices {
addrs[i] = r.addrs[k]
}

newRoster := roster{addrs: addrs}

return newRoster
}

// AddressIterator implements mino.Players. It returns an iterator for the
// roster.
func (r roster) AddressIterator() AddressIterator {
return &addressIterator{addrs: r.addrs}
}

// Len implements mino.Players. It returns the length of the roster.
func (r roster) Len() int {
return len(r.addrs)
}
54 changes: 54 additions & 0 deletions mino/addr_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package mino

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestAddressIterator_HasNext(t *testing.T) {
addrs := []Address{fakeAddr{}, fakeAddr{}}

iter := addressIterator{addrs: addrs}
require.True(t, iter.HasNext())

iter.GetNext()
iter.GetNext()
require.False(t, iter.HasNext())
}

func TestAddressIterator_GetNext(t *testing.T) {
addrs := []Address{fakeAddr{}, fakeAddr{}}

iter := addressIterator{addrs: addrs}
require.NotNil(t, iter.GetNext())
require.NotNil(t, iter.GetNext())
require.Nil(t, iter.GetNext())
}

func TestRoster_Take(t *testing.T) {
addrs := NewAddresses(fakeAddr{}, fakeAddr{}, fakeAddr{})

addrs2 := addrs.Take(IndexFilter(0), IndexFilter(2))
require.Equal(t, 2, len(addrs2.(roster).addrs))
}

func TestRoster_AddressIterator(t *testing.T) {
addrs := NewAddresses(fakeAddr{})

iter := addrs.AddressIterator()
require.NotNil(t, iter.GetNext())
require.Nil(t, iter.GetNext())
}

func TestRoster_Len(t *testing.T) {
addrs := NewAddresses(fakeAddr{}, fakeAddr{})
require.Equal(t, 2, addrs.Len())
}

// -----------------------------------------------------------------------------
// Utility functions

type fakeAddr struct {
Address
}

0 comments on commit b314715

Please sign in to comment.