Skip to content

Commit

Permalink
Minoch: extend coverage of unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Gilthoniel committed May 13, 2020
1 parent 380d814 commit d7e13bf
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 102 deletions.
34 changes: 19 additions & 15 deletions mino/minoch/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,38 +21,42 @@ func NewManager() *Manager {
}
}

func (m *Manager) get(addr mino.Address) *Minoch {
func (m *Manager) get(a mino.Address) (*Minoch, error) {
m.Lock()
defer m.Unlock()

text, err := addr.MarshalText()
if err != nil {
return nil
addr, ok := a.(address)
if !ok {
return nil, xerrors.Errorf("invalid address type '%T'", a)
}

return m.instances[string(text)]
peer, ok := m.instances[addr.id]
if !ok {
return nil, xerrors.Errorf("address <%s> not found", addr.id)
}

return peer, nil
}

func (m *Manager) insert(inst *Minoch) error {
addr := inst.GetAddress().(address)
text, err := addr.MarshalText()
if err != nil {
return xerrors.Errorf("couldn't marshal address: %v", err)
func (m *Manager) insert(inst mino.Mino) error {
instance, ok := inst.(*Minoch)
if !ok {
return xerrors.Errorf("invalid instance type '%T'", inst)
}

if string(text) == "" {
return xerrors.New("can't have an empty marshaled address")
if instance.identifier == "" {
return xerrors.New("cannot have an empty identifier")
}

m.Lock()
defer m.Unlock()

_, found := m.instances[string(text)]
_, found := m.instances[instance.identifier]
if found {
return xerrors.New("identifier already exists")
return xerrors.Errorf("identifier <%s> already exists", instance.identifier)
}

m.instances[string(text)] = inst
m.instances[instance.identifier] = instance

return nil
}
48 changes: 48 additions & 0 deletions mino/minoch/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package minoch

import (
"testing"

"github.com/stretchr/testify/require"
"go.dedis.ch/fabric/internal/testing/fake"
"go.dedis.ch/fabric/mino"
)

func TestManager_Get(t *testing.T) {
manager := &Manager{
instances: map[string]*Minoch{"A": {}},
}

m, err := manager.get(address{id: "A"})
require.NoError(t, err)
require.NotNil(t, m)

_, err = manager.get(address{id: "B"})
require.EqualError(t, err, "address <B> not found")

_, err = manager.get(fake.NewBadAddress())
require.EqualError(t, err, "invalid address type 'fake.Address'")
}

func TestManager_Insert(t *testing.T) {
manager := NewManager()

err := manager.insert(&Minoch{identifier: "A"})
require.NoError(t, err)

err = manager.insert(&Minoch{identifier: "A"})
require.EqualError(t, err, "identifier <A> already exists")

err = manager.insert(&Minoch{})
require.EqualError(t, err, "cannot have an empty identifier")

err = manager.insert(fakeInstance{})
require.EqualError(t, err, "invalid instance type 'minoch.fakeInstance'")
}

// -----------------------------------------------------------------------------
// Utility functions

type fakeInstance struct {
mino.Mino
}
4 changes: 4 additions & 0 deletions mino/minoch/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"

m "go.dedis.ch/fabric"
"go.dedis.ch/fabric/encoding"
"go.dedis.ch/fabric/mino"
)

Expand All @@ -15,6 +16,7 @@ import (
type Minoch struct {
sync.Mutex
manager *Manager
encoder encoding.ProtoMarshaler
identifier string
path string
rpcs map[string]RPC
Expand All @@ -24,6 +26,7 @@ type Minoch struct {
func NewMinoch(manager *Manager, identifier string) (*Minoch, error) {
inst := &Minoch{
manager: manager,
encoder: encoding.NewProtoEncoder(),
identifier: identifier,
path: "",
rpcs: make(map[string]RPC),
Expand Down Expand Up @@ -66,6 +69,7 @@ func (m *Minoch) MakeNamespace(path string) (mino.Mino, error) {
func (m *Minoch) MakeRPC(name string, h mino.Handler) (mino.RPC, error) {
rpc := RPC{
manager: m.manager,
encoder: m.encoder,
addr: m.GetAddress(),
path: fmt.Sprintf("%s/%s", m.path, name),
h: h,
Expand Down
14 changes: 12 additions & 2 deletions mino/minoch/mod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package minoch
import (
"testing"

"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/empty"
"github.com/stretchr/testify/require"
"go.dedis.ch/fabric/mino"
)
Expand Down Expand Up @@ -60,11 +62,19 @@ func TestMinoch_MakeRPC(t *testing.T) {
m, err := NewMinoch(manager, "A")
require.NoError(t, err)

rpc, err := m.MakeRPC("rpc1", testHandler{})
rpc, err := m.MakeRPC("rpc1", badHandler{})
require.NoError(t, err)
require.NotNil(t, rpc)
}

type testHandler struct {
type badHandler struct {
mino.UnsupportedHandler
}

type fakeHandler struct {
mino.UnsupportedHandler
}

func (h fakeHandler) Process(req mino.Request) (resp proto.Message, err error) {
return &empty.Empty{}, nil
}
57 changes: 35 additions & 22 deletions mino/minoch/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"sync"

"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
"go.dedis.ch/fabric/encoding"
"go.dedis.ch/fabric/mino"
"golang.org/x/xerrors"
)
Expand All @@ -23,6 +23,7 @@ type Envelope struct {
// RPC is an implementation of the mino.RPC interface.
type RPC struct {
manager *Manager
encoder encoding.ProtoMarshaler
addr mino.Address
path string
h mino.Handler
Expand All @@ -40,7 +41,12 @@ func (c RPC) Call(ctx context.Context, req proto.Message,
wg.Add(players.Len())
iter := players.AddressIterator()
for iter.HasNext() {
peer := c.manager.get(iter.GetNext())
peer, err := c.manager.get(iter.GetNext())
if err != nil {
errs <- xerrors.Errorf("couldn't find peer: %v", err)
continue
}

cloneReq := proto.Clone(req)
go func(m *Minoch) {
defer wg.Done()
Expand All @@ -62,7 +68,7 @@ func (c RPC) Call(ctx context.Context, req proto.Message,

resp, err := rpc.h.Process(req)
if err != nil {
errs <- err
errs <- xerrors.Errorf("couldn't process request: %v", err)
}

if resp != nil {
Expand Down Expand Up @@ -92,26 +98,32 @@ func (c RPC) Stream(ctx context.Context, memship mino.Players) (mino.Sender, min
iter := memship.AddressIterator()
for iter.HasNext() {
addr := iter.GetNext()
ch := make(chan Envelope, 1)
outs[addr.String()] = receiver{out: ch}

peer := c.manager.get(addr)
peer, err := c.manager.get(addr)
if err != nil {
errs <- xerrors.Errorf("couldn't find peer: %v", err)
continue
}

ch := make(chan Envelope, 1)
outs[addr.String()] = receiver{encoder: c.encoder, out: ch}

go func(r receiver) {
s := sender{
addr: peer.GetAddress(),
in: in,
addr: peer.GetAddress(),
encoder: c.encoder,
in: in,
}

err := peer.rpcs[c.path].h.Stream(s, r)
if err != nil {
errs <- err
errs <- xerrors.Errorf("couldn't process: %v", err)
}
}(outs[addr.String()])
}

orchSender := sender{addr: address{}, in: in}
orchRecv := receiver{out: out, errs: errs}
orchSender := sender{addr: address{}, encoder: c.encoder, in: in}
orchRecv := receiver{encoder: c.encoder, out: out, errs: errs}

go func() {
for {
Expand Down Expand Up @@ -140,16 +152,17 @@ func (c RPC) Stream(ctx context.Context, memship mino.Players) (mino.Sender, min
}

type sender struct {
addr mino.Address
in chan Envelope
addr mino.Address
encoder encoding.ProtoMarshaler
in chan Envelope
}

func (s sender) Send(msg proto.Message, addrs ...mino.Address) <-chan error {
errs := make(chan error, int(math.Max(1, float64(len(addrs)))))

a, err := ptypes.MarshalAny(msg)
msgAny, err := s.encoder.MarshalAny(msg)
if err != nil {
errs <- err
errs <- xerrors.Errorf("couldn't marshal message: %v", err)
close(errs)
return errs
}
Expand All @@ -158,7 +171,7 @@ func (s sender) Send(msg proto.Message, addrs ...mino.Address) <-chan error {
s.in <- Envelope{
from: s.addr.(address),
to: addrs,
message: a,
message: msgAny,
}
close(errs)
}()
Expand All @@ -167,8 +180,9 @@ func (s sender) Send(msg proto.Message, addrs ...mino.Address) <-chan error {
}

type receiver struct {
out chan Envelope
errs chan error
encoder encoding.ProtoMarshaler
out chan Envelope
errs chan error
}

func (r receiver) Recv(ctx context.Context) (mino.Address, proto.Message, error) {
Expand All @@ -178,13 +192,12 @@ func (r receiver) Recv(ctx context.Context) (mino.Address, proto.Message, error)
return nil, nil, io.EOF
}

var da ptypes.DynamicAny
err := ptypes.UnmarshalAny(env.message, &da)
msg, err := r.encoder.UnmarshalDynamicAny(env.message)
if err != nil {
return nil, nil, err
return nil, nil, xerrors.Errorf("couldn't unmarshal message: %v", err)
}

return env.from, da.Message, nil
return env.from, msg, nil
case err := <-r.errs:
return nil, nil, err
case <-ctx.Done():
Expand Down
Loading

0 comments on commit d7e13bf

Please sign in to comment.