diff --git a/mino/minogrpc/mod.go b/mino/minogrpc/mod.go index 119d6247a..dbad5c8cc 100644 --- a/mino/minogrpc/mod.go +++ b/mino/minogrpc/mod.go @@ -140,7 +140,7 @@ func (m Minogrpc) MakeRPC(name string, h mino.Handler) (mino.RPC, error) { return rpc, nil } -// AddNeighbours ... +// AddNeighbours fills the neighbours map of the server func (m Minogrpc) AddNeighbours(minoGrpcs ...*Minogrpc) { for _, minogrpc := range minoGrpcs { m.server.addNeighbour(minogrpc.server) diff --git a/mino/minogrpc/overlay.go b/mino/minogrpc/overlay.go index e5b6d7dcf..8c8b9ecd2 100644 --- a/mino/minogrpc/overlay.go +++ b/mino/minogrpc/overlay.go @@ -125,7 +125,6 @@ func (o overlayService) Stream(stream Overlay_StreamServer) error { addrs[i] = address{addrStr} } - // TODO: use an interface and allow different types of routing routing, err := o.routingFactory.FromAddrs(addrs, map[string]interface{}{ TreeRoutingOpts.Addr: o.addr, "treeHeight": treeHeight, }) @@ -162,8 +161,8 @@ func (o overlayService) Stream(stream Overlay_StreamServer) error { for _, addr := range routing.GetDirectLinks() { peer, found := o.neighbour[addr.String()] if !found { - err = xerrors.Errorf("failed to find peer '%s' from the neighbours: %v", - addr.String(), err) + err = xerrors.Errorf("failed to find routing peer '%s' from the "+ + "neighbours: %v", addr.String(), err) fabric.Logger.Err(err).Send() return err } @@ -207,7 +206,8 @@ func (o overlayService) Stream(stream Overlay_StreamServer) error { return } if err != nil { - err = xerrors.Errorf("failed to listen stream on child in overlay: %v", err) + err = xerrors.Errorf("failed to listen stream on child in "+ + "overlay: %v", err) fabric.Logger.Err(err).Send() return } diff --git a/mino/minogrpc/routing.go b/mino/minogrpc/routing.go index 200aa8d8e..5cda9ce2e 100644 --- a/mino/minogrpc/routing.go +++ b/mino/minogrpc/routing.go @@ -22,7 +22,12 @@ type RoutingFactory interface { // Routing defines the functions needed to route messages type Routing interface { - GetRoute(from mino.Address) (to mino.Address, err error) + // GetRoute should return the gateway address for a corresponding addresse. + // In a tree communication it is typically the address of the child that + // contains the "to" address in its sub-tree. + GetRoute(to mino.Address) (gateway mino.Address, err error) + // GetDirectLinks return the direct links of the elements. In a tree routing + // this is typically the childs of the node. GetDirectLinks() []mino.Address } @@ -37,7 +42,9 @@ type TreeRouting struct { // TreeRoutingOpts is the implementation of treeTreeRoutingOpts var TreeRoutingOpts = treeRoutingOpts{ - Addr: "addr", + // Addr is the address of the node, the value should be of type mino.Address + Addr: "addr", + // TreeHeight is the maximum tree height TreeHeight: "treeHeight", } @@ -46,7 +53,7 @@ type treeRoutingOpts struct { TreeHeight string } -// Addr is the address of the node using the routing +// GetAddr parses the Addr option func (t treeRoutingOpts) GetAddr(opts map[string]interface{}) (mino.Address, error) { addrItf, found := opts[t.Addr] if !found { @@ -60,7 +67,7 @@ func (t treeRoutingOpts) GetAddr(opts map[string]interface{}) (mino.Address, err return addr, nil } -// TreeHeight is the height of the tree +// GetTreeHeight parses the tee height option func (t treeRoutingOpts) GetTreeHeight(opts map[string]interface{}) (int, error) { treeHeightItf, found := opts[t.TreeHeight] if !found { @@ -68,7 +75,7 @@ func (t treeRoutingOpts) GetTreeHeight(opts map[string]interface{}) (int, error) } treeHeight, ok := treeHeightItf.(int) if !ok { - return -1, xerrors.Errorf("provided treeHeight option is not an int: %v", + return -1, xerrors.Errorf("provided treeHeight option is not an int: %T", treeHeightItf) } return treeHeight, nil @@ -175,16 +182,16 @@ func (t treeRoutingFactory) FromAddrs(addrs []mino.Address, // will send its message to node 3. // // - implements Routing -func (t TreeRouting) GetRoute(from mino.Address) (mino.Address, error) { +func (t TreeRouting) GetRoute(to mino.Address) (mino.Address, error) { - if t.me.Addr != nil && t.me.Addr.Equal(from) { - return from, nil + if t.me.Addr != nil && t.me.Addr.Equal(to) { + return to, nil } - target, ok := t.routingNodes[from.String()] + target, ok := t.routingNodes[to.String()] if !ok || target == nil { return nil, xerrors.Errorf("failed to find node '%s' in routingNode map", - from.String()) + to.String()) } for _, c := range t.me.Childs { diff --git a/mino/minogrpc/server.go b/mino/minogrpc/server.go index ba01d38b2..83c264373 100644 --- a/mino/minogrpc/server.go +++ b/mino/minogrpc/server.go @@ -43,7 +43,7 @@ var ( // in a tree based communication, this parameter (H) defines the height of // the tree. Based on this parameter and the total number of nodes N we can // compute the number of direct connection D for each node with D = N^(1/H) - treeHeight = 6 + treeHeight = 3 ) // Server represents the entity that accepts incoming requests and invoke the @@ -308,13 +308,9 @@ func listenStream(stream overlayStream, orchRecv *receiver, } for _, toSend := range envelope.To { - // if we receive a message to ourself or the orchestrator, then we - // notify the orchestrator receiver by filling orchRecv.in. If this is - // not the case we then relay the message. In the case we receive a - // message to ourself but we are in the orchestrator we then must not - // notify the orchestrator because in that case the message has not - // reached its final destination, in a tree networking it means we are - // in the root but we need to reach a node. + // if we receive a message to the orchestrator then we notify the + // orchestrator receiver by filling orchRecv.in. If this is not the case + // we relay the message. if toSend == orchSender.address.String() { orchRecv.in <- msg } else { @@ -540,8 +536,9 @@ func (s *sender) sendSingle(msg proto.Message, from, to mino.Address) error { routingTo, err := s.routing.GetRoute(to) if err != nil { - // If we can't get a route we send the message to ourself, which will - // reach our orchestrator. + // If we can't get a route we send the message to the orchestrator. In a + // tree based communication this means sending the message to our + // parent. routingTo = s.address }