Skip to content

Commit

Permalink
Cleans the code before review
Browse files Browse the repository at this point in the history
  • Loading branch information
nkcr committed May 4, 2020
1 parent dbb09a3 commit ecbdfad
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 25 deletions.
2 changes: 1 addition & 1 deletion mino/minogrpc/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (m Minogrpc) MakeRPC(name string, h mino.Handler) (mino.RPC, error) {
return rpc, nil
}

// AddNeighbours ...
// AddNeighbours fills the neighbours map of the server
func (m Minogrpc) AddNeighbours(minoGrpcs ...*Minogrpc) {
for _, minogrpc := range minoGrpcs {
m.server.addNeighbour(minogrpc.server)
Expand Down
8 changes: 4 additions & 4 deletions mino/minogrpc/overlay.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ func (o overlayService) Stream(stream Overlay_StreamServer) error {
addrs[i] = address{addrStr}
}

// TODO: use an interface and allow different types of routing
routing, err := o.routingFactory.FromAddrs(addrs, map[string]interface{}{
TreeRoutingOpts.Addr: o.addr, "treeHeight": treeHeight,
})
Expand Down Expand Up @@ -162,8 +161,8 @@ func (o overlayService) Stream(stream Overlay_StreamServer) error {
for _, addr := range routing.GetDirectLinks() {
peer, found := o.neighbour[addr.String()]
if !found {
err = xerrors.Errorf("failed to find peer '%s' from the neighbours: %v",
addr.String(), err)
err = xerrors.Errorf("failed to find routing peer '%s' from the "+
"neighbours: %v", addr.String(), err)
fabric.Logger.Err(err).Send()
return err
}
Expand Down Expand Up @@ -207,7 +206,8 @@ func (o overlayService) Stream(stream Overlay_StreamServer) error {
return
}
if err != nil {
err = xerrors.Errorf("failed to listen stream on child in overlay: %v", err)
err = xerrors.Errorf("failed to listen stream on child in "+
"overlay: %v", err)
fabric.Logger.Err(err).Send()
return
}
Expand Down
27 changes: 17 additions & 10 deletions mino/minogrpc/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ type RoutingFactory interface {

// Routing defines the functions needed to route messages
type Routing interface {
GetRoute(from mino.Address) (to mino.Address, err error)
// GetRoute should return the gateway address for a corresponding addresse.
// In a tree communication it is typically the address of the child that
// contains the "to" address in its sub-tree.
GetRoute(to mino.Address) (gateway mino.Address, err error)
// GetDirectLinks return the direct links of the elements. In a tree routing
// this is typically the childs of the node.
GetDirectLinks() []mino.Address
}

Expand All @@ -37,7 +42,9 @@ type TreeRouting struct {

// TreeRoutingOpts is the implementation of treeTreeRoutingOpts
var TreeRoutingOpts = treeRoutingOpts{
Addr: "addr",
// Addr is the address of the node, the value should be of type mino.Address
Addr: "addr",
// TreeHeight is the maximum tree height
TreeHeight: "treeHeight",
}

Expand All @@ -46,7 +53,7 @@ type treeRoutingOpts struct {
TreeHeight string
}

// Addr is the address of the node using the routing
// GetAddr parses the Addr option
func (t treeRoutingOpts) GetAddr(opts map[string]interface{}) (mino.Address, error) {
addrItf, found := opts[t.Addr]
if !found {
Expand All @@ -60,15 +67,15 @@ func (t treeRoutingOpts) GetAddr(opts map[string]interface{}) (mino.Address, err
return addr, nil
}

// TreeHeight is the height of the tree
// GetTreeHeight parses the tee height option
func (t treeRoutingOpts) GetTreeHeight(opts map[string]interface{}) (int, error) {
treeHeightItf, found := opts[t.TreeHeight]
if !found {
return -1, xerrors.Errorf("didn't find treeHeight option")
}
treeHeight, ok := treeHeightItf.(int)
if !ok {
return -1, xerrors.Errorf("provided treeHeight option is not an int: %v",
return -1, xerrors.Errorf("provided treeHeight option is not an int: %T",
treeHeightItf)
}
return treeHeight, nil
Expand Down Expand Up @@ -175,16 +182,16 @@ func (t treeRoutingFactory) FromAddrs(addrs []mino.Address,
// will send its message to node 3.
//
// - implements Routing
func (t TreeRouting) GetRoute(from mino.Address) (mino.Address, error) {
func (t TreeRouting) GetRoute(to mino.Address) (mino.Address, error) {

if t.me.Addr != nil && t.me.Addr.Equal(from) {
return from, nil
if t.me.Addr != nil && t.me.Addr.Equal(to) {
return to, nil
}

target, ok := t.routingNodes[from.String()]
target, ok := t.routingNodes[to.String()]
if !ok || target == nil {
return nil, xerrors.Errorf("failed to find node '%s' in routingNode map",
from.String())
to.String())
}

for _, c := range t.me.Childs {
Expand Down
17 changes: 7 additions & 10 deletions mino/minogrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var (
// in a tree based communication, this parameter (H) defines the height of
// the tree. Based on this parameter and the total number of nodes N we can
// compute the number of direct connection D for each node with D = N^(1/H)
treeHeight = 6
treeHeight = 3
)

// Server represents the entity that accepts incoming requests and invoke the
Expand Down Expand Up @@ -308,13 +308,9 @@ func listenStream(stream overlayStream, orchRecv *receiver,
}

for _, toSend := range envelope.To {
// if we receive a message to ourself or the orchestrator, then we
// notify the orchestrator receiver by filling orchRecv.in. If this is
// not the case we then relay the message. In the case we receive a
// message to ourself but we are in the orchestrator we then must not
// notify the orchestrator because in that case the message has not
// reached its final destination, in a tree networking it means we are
// in the root but we need to reach a node.
// if we receive a message to the orchestrator then we notify the
// orchestrator receiver by filling orchRecv.in. If this is not the case
// we relay the message.
if toSend == orchSender.address.String() {
orchRecv.in <- msg
} else {
Expand Down Expand Up @@ -540,8 +536,9 @@ func (s *sender) sendSingle(msg proto.Message, from, to mino.Address) error {

routingTo, err := s.routing.GetRoute(to)
if err != nil {
// If we can't get a route we send the message to ourself, which will
// reach our orchestrator.
// If we can't get a route we send the message to the orchestrator. In a
// tree based communication this means sending the message to our
// parent.
routingTo = s.address
}

Expand Down

0 comments on commit ecbdfad

Please sign in to comment.