Skip to content

Commit

Permalink
Implements a simple tree networking topology
Browse files Browse the repository at this point in the history
As discussed, each node gets the list of addresses and computes in a deterministic manner the tree topology.
  • Loading branch information
nkcr committed Apr 22, 2020
1 parent 4b756f2 commit 6261861
Show file tree
Hide file tree
Showing 10 changed files with 555 additions and 400 deletions.
3 changes: 2 additions & 1 deletion mino/minogrpc/mod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func TestMessages(t *testing.T) {
messages := []proto.Message{
&OverlayMsg{},
&Envelope{},
&RoutingMsg{},
}

for _, m := range messages {
Expand Down Expand Up @@ -187,7 +188,7 @@ func TestAddressIterator(t *testing.T) {
require.False(t, it.HasNext())
}

// -----------------
// -----------------------------------------------------------------------------
// Utility functions

// fakePlayers implements mino.Players{}
Expand Down
54 changes: 37 additions & 17 deletions mino/minogrpc/overlay.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ type overlayService struct {
// addressID of the sender.
addr address
// This map is used to create a new stream connection if possible
mesh map[string]Peer
// routing table from the server
routingTable map[string]string
neighbour map[string]Peer
// This certificate is used to create a new stream connection if possible
srvCert *tls.Certificate
// Used to record traffic activity
Expand Down Expand Up @@ -110,6 +108,27 @@ func (o overlayService) Stream(stream Overlay_StreamServer) error {

rpcID := "server_" + o.addr.String()

// Listen on the first message, which should be the routing infos
overlayMsg, err := stream.Recv()
if err != nil {
return xerrors.Errorf("failed to receive first routing message: %v", err)
}

routingMsg := &RoutingMsg{}
err = o.encoder.UnmarshalAny(overlayMsg.Message, routingMsg)
if err != nil {
return xerrors.Errorf("failed to decode first routing message: %v", err)
}

addrs := make([]mino.Address, len(routingMsg.Addrs))
for i, addrStr := range routingMsg.Addrs {
addrs[i] = address{addrStr}
}
routing, err := NewTreeRouting(addrs, o.addr, treeeHeight)
if err != nil {
return xerrors.Errorf("failed to create routing struct: %v", err)
}

// For the moment this sender can only receive messages to itself
// TODO: find a way to know the other nodes.
sender := &sender{
Expand All @@ -123,9 +142,9 @@ func (o overlayService) Stream(stream Overlay_StreamServer) error {
address: address{rpcID},
participants: map[string]overlayStream{rpcID: stream},
name: "remote RPC",
mesh: o.mesh,
srvCert: o.srvCert,
traffic: o.traffic,
routing: routing,
}

receiver := receiver{
Expand All @@ -138,8 +157,16 @@ func (o overlayService) Stream(stream Overlay_StreamServer) error {

var peerWait sync.WaitGroup

for _, peer := range o.mesh {
addr := address{peer.Address}
for _, c := range routing.me.Childs {
addr := c.Addr
peer, found := o.neighbour[addr.String()]
if !found {
err = xerrors.Errorf("failed to find peer '%s' from the neighbours: %v",
addr.String(), err)
fabric.Logger.Err(err).Send()
return err
}

clientConn, err := getConnection(addr.String(), peer, *o.srvCert)
if err != nil {
err = xerrors.Errorf("failed to get client conn for client '%s': %v",
Expand All @@ -162,6 +189,9 @@ func (o overlayService) Stream(stream Overlay_StreamServer) error {
}
sender.participants[addr.String()] = clientStream

// Sending the routing info as first messages to our childs
clientStream.Send(&OverlayMsg{Message: overlayMsg.Message})

// Listen on the clients streams and notify the orchestrator or relay
// messages
peerWait.Add(1)
Expand All @@ -185,16 +215,6 @@ func (o overlayService) Stream(stream Overlay_StreamServer) error {
}()
}

// add the gateways based on the routing table
for k, v := range o.routingTable {
gateway, ok := sender.participants[v]
if !ok {
// TODO: handle this situation
fabric.Logger.Warn().Msg("fix static check until it's done")
}
sender.participants[k] = gateway
}

// listen on my own stream
go func() {
for {
Expand All @@ -214,7 +234,7 @@ func (o overlayService) Stream(stream Overlay_StreamServer) error {
}
}()

err := handler.Stream(sender, receiver)
err = handler.Stream(sender, receiver)
if err != nil {
return xerrors.Errorf("failed to call the stream handler: %v", err)
}
Expand Down
92 changes: 70 additions & 22 deletions mino/minogrpc/overlay.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions mino/minogrpc/overlay.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,9 @@ service Overlay {

message OverlayMsg {
google.protobuf.Any message = 1;
}

message RoutingMsg {
string type = 1;
repeated string addrs = 2;
}
30 changes: 24 additions & 6 deletions mino/minogrpc/overlay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import (

proto "github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
any "github.com/golang/protobuf/ptypes/any"
"github.com/golang/protobuf/ptypes/empty"
"github.com/stretchr/testify/require"
"go.dedis.ch/fabric"
"go.dedis.ch/fabric/encoding"
"go.dedis.ch/fabric/internal/testing/fake"
"go.dedis.ch/fabric/mino"
"google.golang.org/grpc/metadata"
)
Expand Down Expand Up @@ -117,25 +119,29 @@ func TestOverlay_Stream(t *testing.T) {
overlayService.handlers["handler_key"] = testFailHandler{}
overlayService.traffic = &traffic{}

// Now I set the right elements in the header but use a handler that should
// Now I set the right elements in the header but do not send the routing
// message as it is expected to have one first
err = overlayService.Stream(&streamServer)
require.EqualError(t, err, "failed to decode first routing message: couldn't unwrap '*any.Any' to '*minogrpc.RoutingMsg': mismatched message type: got \"google.protobuf.Empty\" want \"minogrpc.RoutingMsg\"")

// Now I set the right elements in the header and set a dummy encoder to
// ignore the decoding of routing message but use a handler that should
// raise an error
header = metadata.New(map[string]string{})
header.Append(headerURIKey, "handler_key")
streamServer.ctx = metadata.NewIncomingContext(context.Background(), header)
overlayService.encoder = goodEncoder{}
err = overlayService.Stream(&streamServer)
require.EqualError(t, err, "failed to call the stream handler: oops")

// Now we set our mock StreamServer to return an error on receive
streamServer.recvError = true
// We have to reset it because it's already used and then would deadlock
err = overlayService.Stream(&streamServer)
require.EqualError(t, err, "failed to call the stream handler: oops")
require.EqualError(t, err, "failed to receive first routing message: oops from the server")
// We have to wait there so we catch the goroutine error
time.Sleep(time.Microsecond * 400)

}

// -----------------
// -----------------------------------------------------------------------------
// Utility functions

// this is to mock an overlay server stream
Expand Down Expand Up @@ -191,3 +197,15 @@ func (t testFailHandler2) Stream(out mino.Sender, in mino.Receiver) error {
require.EqualError(t.t, err, "")
return nil
}

type goodEncoder struct {
fake.BadMarshalStableEncoder
fake.BadPackAnyEncoder
fake.BadPackEncoder
fake.BadUnmarshalDynEncoder
fake.BadMarshalAnyEncoder
}

func (g goodEncoder) UnmarshalAny(any *any.Any, pb proto.Message) error {
return nil
}
Loading

0 comments on commit 6261861

Please sign in to comment.