Skip to content

Commit

Permalink
Merge dbb09a3 into 6509bf5
Browse files Browse the repository at this point in the history
  • Loading branch information
nkcr authored May 4, 2020
2 parents 6509bf5 + dbb09a3 commit e182877
Show file tree
Hide file tree
Showing 11 changed files with 1,225 additions and 573 deletions.
25 changes: 16 additions & 9 deletions mino/minogrpc/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,11 @@ func (f addressFactory) FromText(text []byte) mino.Address {
return address{id: string(text)}
}

// NewMinogrpc sets up the grpc and http servers. It does not start the
// server. Identifier must be an address with a port, something like
// 127.0.0.1:3333
// NewMinogrpc sets up the grpc and http servers. Identifier must be an address
// with a port, something like 127.0.0.1:3333
//
// TODO: use a different type of argument for identifier, maybe net/url ?
func NewMinogrpc(identifier string) (Minogrpc, error) {
func NewMinogrpc(identifier string, rf RoutingFactory) (Minogrpc, error) {

minoGrpc := Minogrpc{}

Expand All @@ -69,7 +68,7 @@ func NewMinogrpc(identifier string) (Minogrpc, error) {
id: identifier,
}

server, err := CreateServer(addr)
server, err := NewServer(addr, rf)
if err != nil {
return minoGrpc, xerrors.Errorf("failed to create server: %v", err)
}
Expand Down Expand Up @@ -129,13 +128,21 @@ func (m Minogrpc) MakeNamespace(namespace string) (mino.Mino, error) {
func (m Minogrpc) MakeRPC(name string, h mino.Handler) (mino.RPC, error) {
URI := fmt.Sprintf("%s/%s", m.namespace, name)
rpc := &RPC{
encoder: encoding.NewProtoEncoder(),
handler: h,
srv: m.server,
uri: URI,
encoder: encoding.NewProtoEncoder(),
handler: h,
srv: m.server,
uri: URI,
routingFactory: m.server.routingFactory,
}

m.server.handlers[URI] = h

return rpc, nil
}

// AddNeighbours ...
func (m Minogrpc) AddNeighbours(minoGrpcs ...*Minogrpc) {
for _, minogrpc := range minoGrpcs {
m.server.addNeighbour(minogrpc.server)
}
}
17 changes: 9 additions & 8 deletions mino/minogrpc/mod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func TestMessages(t *testing.T) {
messages := []proto.Message{
&OverlayMsg{},
&Envelope{},
&RoutingMsg{},
}

for _, m := range messages {
Expand All @@ -27,7 +28,7 @@ func Test_NewMinogrpc(t *testing.T) {
// The happy path
id := "127.0.0.1:3333"

minoRPC, err := NewMinogrpc(id)
minoRPC, err := NewMinogrpc(id, TreeRoutingFactory)
require.NoError(t, err)

require.Equal(t, id, minoRPC.GetAddress().String())
Expand All @@ -41,7 +42,7 @@ func Test_NewMinogrpc(t *testing.T) {
require.Equal(t, peer, minoRPC.server.neighbours[id])

// Giving an empty address
minoRPC, err = NewMinogrpc("")
minoRPC, err = NewMinogrpc("", TreeRoutingFactory)
require.EqualError(t, err, "identifier can't be empty")
}

Expand Down Expand Up @@ -164,7 +165,7 @@ func TestMinogrpc_GetAddressFactory(t *testing.T) {
}

func TestPlayers_AddressIterator(t *testing.T) {
players := fakePlayers{players: []address{{"test"}}}
players := fakePlayers{players: []mino.Address{address{"test"}}}
it := players.AddressIterator()
it2, ok := it.(*fakeAddressIterator)
require.True(t, ok)
Expand All @@ -175,9 +176,9 @@ func TestPlayers_AddressIterator(t *testing.T) {
}

func TestAddressIterator(t *testing.T) {
a := address{"test"}
a := &address{"test"}
it := fakeAddressIterator{
players: []address{a},
players: []mino.Address{a},
}

require.True(t, it.HasNext())
Expand All @@ -187,13 +188,13 @@ func TestAddressIterator(t *testing.T) {
require.False(t, it.HasNext())
}

// -----------------
// -----------------------------------------------------------------------------
// Utility functions

// fakePlayers implements mino.Players{}
type fakePlayers struct {
mino.Players
players []address
players []mino.Address
iterator *fakeAddressIterator
}

Expand All @@ -212,7 +213,7 @@ func (p *fakePlayers) Len() int {

// fakeAddressIterator implements mino.addressIterator{}
type fakeAddressIterator struct {
players []address
players []mino.Address
cursor int
}

Expand Down
91 changes: 58 additions & 33 deletions mino/minogrpc/overlay.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
context "context"
"crypto/tls"
"io"
"sync"

"go.dedis.ch/fabric"
"go.dedis.ch/fabric/encoding"
Expand All @@ -25,13 +24,12 @@ type overlayService struct {
// addressID of the sender.
addr address
// This map is used to create a new stream connection if possible
mesh map[string]Peer
// routing table from the server
routingTable map[string]string
neighbour map[string]Peer
// This certificate is used to create a new stream connection if possible
srvCert *tls.Certificate
// Used to record traffic activity
traffic *traffic
traffic *traffic
routingFactory RoutingFactory
}

// Call is the implementation of the overlay.Call proto definition
Expand Down Expand Up @@ -108,7 +106,32 @@ func (o overlayService) Stream(stream Overlay_StreamServer) error {
"of handlers, did you register it?", apiURI[0])
}

rpcID := "server_" + o.addr.String()
rpcID := o.addr.String()

// Listen on the first message, which should be the routing infos
overlayMsg, err := stream.Recv()
if err != nil {
return xerrors.Errorf("failed to receive first routing message: %v", err)
}

routingMsg := &RoutingMsg{}
err = o.encoder.UnmarshalAny(overlayMsg.Message, routingMsg)
if err != nil {
return xerrors.Errorf("failed to decode first routing message: %v", err)
}

addrs := make([]mino.Address, len(routingMsg.Addrs))
for i, addrStr := range routingMsg.Addrs {
addrs[i] = address{addrStr}
}

// TODO: use an interface and allow different types of routing
routing, err := o.routingFactory.FromAddrs(addrs, map[string]interface{}{
TreeRoutingOpts.Addr: o.addr, "treeHeight": treeHeight,
})
if err != nil {
return xerrors.Errorf("failed to create routing struct: %v", err)
}

// For the moment this sender can only receive messages to itself
// TODO: find a way to know the other nodes.
Expand All @@ -122,24 +145,29 @@ func (o overlayService) Stream(stream Overlay_StreamServer) error {
// doesn't relay but sends directly.
address: address{rpcID},
participants: map[string]overlayStream{rpcID: stream},
name: "remote RPC",
mesh: o.mesh,
name: "remote RPC of " + o.addr.String(),
srvCert: o.srvCert,
traffic: o.traffic,
routing: routing,
}

receiver := receiver{
encoder: o.encoder,
in: make(chan *OverlayMsg),
errs: make(chan error),
name: "remote RPC",
name: "remote RPC of " + o.addr.String(),
traffic: o.traffic,
}

var peerWait sync.WaitGroup
for _, addr := range routing.GetDirectLinks() {
peer, found := o.neighbour[addr.String()]
if !found {
err = xerrors.Errorf("failed to find peer '%s' from the neighbours: %v",
addr.String(), err)
fabric.Logger.Err(err).Send()
return err
}

for _, peer := range o.mesh {
addr := address{peer.Address}
clientConn, err := getConnection(addr.String(), peer, *o.srvCert)
if err != nil {
err = xerrors.Errorf("failed to get client conn for client '%s': %v",
Expand All @@ -150,23 +178,25 @@ func (o overlayService) Stream(stream Overlay_StreamServer) error {
cl := NewOverlayClient(clientConn)

header := metadata.New(map[string]string{headerURIKey: apiURI[0]})
ctx = metadata.NewOutgoingContext(ctx, header)

clientStream, err := cl.Stream(ctx)
newCtx := stream.Context()
newCtx = metadata.NewOutgoingContext(newCtx, header)

clientStream, err := cl.Stream(newCtx)
if err != nil {
err = xerrors.Errorf("failed to get stream for client '%s': %v",
addr.String(), err)
fabric.Logger.Err(err).Send()
return err
}

sender.participants[addr.String()] = clientStream

// Sending the routing info as first messages to our childs
clientStream.Send(&OverlayMsg{Message: overlayMsg.Message})

// Listen on the clients streams and notify the orchestrator or relay
// messages
peerWait.Add(1)
go func() {
defer peerWait.Done()
go func(addr mino.Address) {
for {
err := listenStream(clientStream, &receiver, sender, addr)
if err == io.EOF {
Expand All @@ -177,49 +207,44 @@ func (o overlayService) Stream(stream Overlay_StreamServer) error {
return
}
if err != nil {
err = xerrors.Errorf("failed to listen stream: %v", err)
err = xerrors.Errorf("failed to listen stream on child in overlay: %v", err)
fabric.Logger.Err(err).Send()
return
}
}
}()
}

// add the gateways based on the routing table
for k, v := range o.routingTable {
gateway, ok := sender.participants[v]
if !ok {
// TODO: handle this situation
fabric.Logger.Warn().Msg("fix static check until it's done")
}
sender.participants[k] = gateway
}(addr)
}

// listen on my own stream
go func() {

for {
err := listenStream(stream, &receiver, sender, o.addr)
if err == io.EOF {
<-ctx.Done()
return
}
status, ok := status.FromError(err)
if ok && err != nil && status.Code() == codes.Canceled {
<-ctx.Done()
return
}
if err != nil {
err = xerrors.Errorf("failed to listen stream: %v", err)
err = xerrors.Errorf("failed to listen stream in overlay: %v", err)
fabric.Logger.Err(err).Send()
<-ctx.Done()
return
}
}
}()

err := handler.Stream(sender, receiver)
err = handler.Stream(sender, receiver)
if err != nil {
return xerrors.Errorf("failed to call the stream handler: %v", err)
}

peerWait.Wait()
<-ctx.Done()

return nil

}
Loading

0 comments on commit e182877

Please sign in to comment.