Skip to content

Commit

Permalink
Merge e1a37ea into 12b1c9f
Browse files Browse the repository at this point in the history
  • Loading branch information
nkcr committed May 5, 2020
2 parents 12b1c9f + e1a37ea commit 639a7cc
Show file tree
Hide file tree
Showing 14 changed files with 1,381 additions and 566 deletions.
14 changes: 11 additions & 3 deletions internal/testing/fake/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,24 @@ type AddressIterator struct {
index int
}

// NewAddressIterator returns a new address iterator
func NewAddressIterator(addrs []mino.Address) AddressIterator {
return AddressIterator{
addrs: addrs,
}
}

// HasNext implements mino.AddressIterator.
func (i *AddressIterator) HasNext() bool {
return i.index+1 < len(i.addrs)
return i.index < len(i.addrs)
}

// GetNext implements mino.AddressIterator.
func (i *AddressIterator) GetNext() mino.Address {
if i.HasNext() {
res := i.addrs[i.index]
i.index++
return i.addrs[i.index]
return res
}
return nil
}
Expand Down Expand Up @@ -597,7 +605,7 @@ type AddressFactory struct {

// FromText implements mino.AddressFactory.
func (f AddressFactory) FromText(text []byte) mino.Address {
if len(text) > 4 {
if len(text) >= 4 {
index := binary.LittleEndian.Uint32(text)
return Address{index: int(index)}
}
Expand Down
5 changes: 5 additions & 0 deletions mino/minogrpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# TODO

- [ ] Merge OverlayMsg and Envelope
- [ ] Check the `<-ctx.Done()` in overlay.go
- [ ] Test traffic.go
26 changes: 15 additions & 11 deletions mino/minogrpc/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ import (

"go.dedis.ch/fabric/encoding"
"go.dedis.ch/fabric/mino"
"go.dedis.ch/fabric/mino/minogrpc/routing"
"golang.org/x/xerrors"
)

//go:generate protoc -I ./ --go_out=plugins=grpc:./ ./overlay.proto

var namespaceMatch = regexp.MustCompile("^[a-zA-Z0-9]+$")
var (
namespaceMatch = regexp.MustCompile("^[a-zA-Z0-9]+$")
defaultAddressFactory = addressFactory{}
)

// Minogrpc represents a grpc service restricted to a namespace
type Minogrpc struct {
Expand Down Expand Up @@ -52,12 +56,11 @@ func (f addressFactory) FromText(text []byte) mino.Address {
return address{id: string(text)}
}

// NewMinogrpc sets up the grpc and http servers. It does not start the
// server. Identifier must be an address with a port, something like
// 127.0.0.1:3333
// NewMinogrpc sets up the grpc and http servers. Identifier must be an address
// with a port, something like 127.0.0.1:3333
//
// TODO: use a different type of argument for identifier, maybe net/url ?
func NewMinogrpc(identifier string) (Minogrpc, error) {
func NewMinogrpc(identifier string, rf routing.Factory) (Minogrpc, error) {

minoGrpc := Minogrpc{}

Expand All @@ -69,7 +72,7 @@ func NewMinogrpc(identifier string) (Minogrpc, error) {
id: identifier,
}

server, err := CreateServer(addr)
server, err := NewServer(addr, rf)
if err != nil {
return minoGrpc, xerrors.Errorf("failed to create server: %v", err)
}
Expand All @@ -91,7 +94,7 @@ func NewMinogrpc(identifier string) (Minogrpc, error) {
// GetAddressFactory implements Mino. It returns the address
// factory.
func (m Minogrpc) GetAddressFactory() mino.AddressFactory {
return addressFactory{}
return defaultAddressFactory
}

// GetAddress implements Mino. It returns the address of the server
Expand Down Expand Up @@ -129,10 +132,11 @@ func (m Minogrpc) MakeNamespace(namespace string) (mino.Mino, error) {
func (m Minogrpc) MakeRPC(name string, h mino.Handler) (mino.RPC, error) {
URI := fmt.Sprintf("%s/%s", m.namespace, name)
rpc := &RPC{
encoder: encoding.NewProtoEncoder(),
handler: h,
srv: m.server,
uri: URI,
encoder: encoding.NewProtoEncoder(),
handler: h,
srv: m.server,
uri: URI,
routingFactory: m.server.routingFactory,
}

m.server.handlers[URI] = h
Expand Down
16 changes: 8 additions & 8 deletions mino/minogrpc/mod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func Test_NewMinogrpc(t *testing.T) {
// The happy path
id := "127.0.0.1:3333"

minoRPC, err := NewMinogrpc(id)
minoRPC, err := NewMinogrpc(id, nil)
require.NoError(t, err)

require.Equal(t, id, minoRPC.GetAddress().String())
Expand All @@ -41,7 +41,7 @@ func Test_NewMinogrpc(t *testing.T) {
require.Equal(t, peer, minoRPC.server.neighbours[id])

// Giving an empty address
minoRPC, err = NewMinogrpc("")
minoRPC, err = NewMinogrpc("", nil)
require.EqualError(t, err, "identifier can't be empty")
}

Expand Down Expand Up @@ -164,7 +164,7 @@ func TestMinogrpc_GetAddressFactory(t *testing.T) {
}

func TestPlayers_AddressIterator(t *testing.T) {
players := fakePlayers{players: []address{{"test"}}}
players := fakePlayers{players: []mino.Address{address{"test"}}}
it := players.AddressIterator()
it2, ok := it.(*fakeAddressIterator)
require.True(t, ok)
Expand All @@ -175,9 +175,9 @@ func TestPlayers_AddressIterator(t *testing.T) {
}

func TestAddressIterator(t *testing.T) {
a := address{"test"}
a := &address{"test"}
it := fakeAddressIterator{
players: []address{a},
players: []mino.Address{a},
}

require.True(t, it.HasNext())
Expand All @@ -187,13 +187,13 @@ func TestAddressIterator(t *testing.T) {
require.False(t, it.HasNext())
}

// -----------------
// -----------------------------------------------------------------------------
// Utility functions

// fakePlayers implements mino.Players{}
type fakePlayers struct {
mino.Players
players []address
players []mino.Address
iterator *fakeAddressIterator
}

Expand All @@ -212,7 +212,7 @@ func (p *fakePlayers) Len() int {

// fakeAddressIterator implements mino.addressIterator{}
type fakeAddressIterator struct {
players []address
players []mino.Address
cursor int
}

Expand Down
92 changes: 57 additions & 35 deletions mino/minogrpc/overlay.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
context "context"
"crypto/tls"
"io"
"sync"

"go.dedis.ch/fabric"
"go.dedis.ch/fabric/encoding"
"go.dedis.ch/fabric/mino"
"go.dedis.ch/fabric/mino/minogrpc/routing"
"golang.org/x/xerrors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
Expand All @@ -25,13 +25,12 @@ type overlayService struct {
// addressID of the sender.
addr address
// This map is used to create a new stream connection if possible
mesh map[string]Peer
// routing table from the server
routingTable map[string]string
neighbour map[string]Peer
// This certificate is used to create a new stream connection if possible
srvCert *tls.Certificate
// Used to record traffic activity
traffic *traffic
traffic *traffic
routingFactory routing.Factory
}

// Call is the implementation of the overlay.Call proto definition
Expand Down Expand Up @@ -108,10 +107,25 @@ func (o overlayService) Stream(stream Overlay_StreamServer) error {
"of handlers, did you register it?", apiURI[0])
}

rpcID := "server_" + o.addr.String()
rpcID := o.addr.String()

// For the moment this sender can only receive messages to itself
// TODO: find a way to know the other nodes.
// Listen on the first message, which should be the routing infos
overlayMsg, err := stream.Recv()
if err != nil {
return xerrors.Errorf("failed to receive first routing message: %v", err)
}

rting, err := o.routingFactory.FromAny(overlayMsg.Message)
if err != nil {
return xerrors.Errorf("failed to decode routing message: %v", err)
}

// fmt.Print(o.addr)
// rting.(*routing.TreeRouting).Display(os.Stdout)

// This sender acts as an orchestrator for the other nodes it creates a
// stream to. In a tree topology this means the orchestrator is the parent,
// and the created streams are the children.
sender := &sender{
encoder: o.encoder,
// This address is used when the client doesn't find the address it
Expand All @@ -122,24 +136,34 @@ func (o overlayService) Stream(stream Overlay_StreamServer) error {
// doesn't relay but sends directly.
address: address{rpcID},
participants: map[string]overlayStream{rpcID: stream},
name: "remote RPC",
mesh: o.mesh,
name: "remote RPC of " + o.addr.String(),
srvCert: o.srvCert,
traffic: o.traffic,
routing: rting,
}

receiver := receiver{
encoder: o.encoder,
in: make(chan *OverlayMsg),
errs: make(chan error),
name: "remote RPC",
name: "remote RPC of " + o.addr.String(),
traffic: o.traffic,
}

var peerWait sync.WaitGroup
addrs, err := rting.GetDirectLinks(o.addr)
if err != nil {
return xerrors.Errorf("failed to get direct links: %v", err)
}

for _, addr := range addrs {
peer, found := o.neighbour[addr.String()]
if !found {
err = xerrors.Errorf("failed to find routing peer '%s' from the "+
"neighbours: %v", addr.String(), err)
fabric.Logger.Err(err).Send()
return err
}

for _, peer := range o.mesh {
addr := address{peer.Address}
clientConn, err := getConnection(addr.String(), peer, *o.srvCert)
if err != nil {
err = xerrors.Errorf("failed to get client conn for client '%s': %v",
Expand All @@ -150,23 +174,25 @@ func (o overlayService) Stream(stream Overlay_StreamServer) error {
cl := NewOverlayClient(clientConn)

header := metadata.New(map[string]string{headerURIKey: apiURI[0]})
ctx = metadata.NewOutgoingContext(ctx, header)

clientStream, err := cl.Stream(ctx)
newCtx := stream.Context()
newCtx = metadata.NewOutgoingContext(newCtx, header)

clientStream, err := cl.Stream(newCtx)
if err != nil {
err = xerrors.Errorf("failed to get stream for client '%s': %v",
addr.String(), err)
fabric.Logger.Err(err).Send()
return err
}

sender.participants[addr.String()] = clientStream

// Sending the routing info as first messages to our children
clientStream.Send(&OverlayMsg{Message: overlayMsg.Message})

// Listen on the clients streams and notify the orchestrator or relay
// messages
peerWait.Add(1)
go func() {
defer peerWait.Done()
go func(addr mino.Address) {
for {
err := listenStream(clientStream, &receiver, sender, addr)
if err == io.EOF {
Expand All @@ -177,49 +203,45 @@ func (o overlayService) Stream(stream Overlay_StreamServer) error {
return
}
if err != nil {
err = xerrors.Errorf("failed to listen stream: %v", err)
err = xerrors.Errorf("failed to listen stream on child in "+
"overlay: %v", err)
fabric.Logger.Err(err).Send()
return
}
}
}()
}

// add the gateways based on the routing table
for k, v := range o.routingTable {
gateway, ok := sender.participants[v]
if !ok {
// TODO: handle this situation
fabric.Logger.Warn().Msg("fix static check until it's done")
}
sender.participants[k] = gateway
}(addr)
}

// listen on my own stream
go func() {

for {
err := listenStream(stream, &receiver, sender, o.addr)
if err == io.EOF {
<-ctx.Done()
return
}
status, ok := status.FromError(err)
if ok && err != nil && status.Code() == codes.Canceled {
<-ctx.Done()
return
}
if err != nil {
err = xerrors.Errorf("failed to listen stream: %v", err)
err = xerrors.Errorf("failed to listen stream in overlay: %v", err)
fabric.Logger.Err(err).Send()
<-ctx.Done()
return
}
}
}()

err := handler.Stream(sender, receiver)
err = handler.Stream(sender, receiver)
if err != nil {
return xerrors.Errorf("failed to call the stream handler: %v", err)
}

peerWait.Wait()
<-ctx.Done()

return nil

}
Loading

0 comments on commit 639a7cc

Please sign in to comment.