Skip to content

Commit

Permalink
Creates connection when an address is in the "neighbours" list
Browse files Browse the repository at this point in the history
  • Loading branch information
nkcr committed Mar 26, 2020
1 parent 19a5a90 commit 06f4a1b
Show file tree
Hide file tree
Showing 6 changed files with 438 additions and 96 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ require (
github.com/rs/zerolog v1.18.0
github.com/stretchr/testify v1.5.1
go.dedis.ch/kyber/v3 v3.0.12
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 // indirect
golang.org/x/net v0.0.0-20190620200207-3b0461eec859
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543
google.golang.org/grpc v1.27.1
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc // indirect
)
6 changes: 3 additions & 3 deletions mino/minogrpc/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var namespaceMatch = regexp.MustCompile("^[a-zA-Z0-9]+$")

// Minogrpc represents a grpc service restricted to a namespace
type Minogrpc struct {
server Server
server *Server
namespace string
}

Expand Down Expand Up @@ -81,7 +81,7 @@ func NewMinogrpc(identifier string) (Minogrpc, error) {
}
server.neighbours[identifier] = peer

minoGrpc.server = *server
minoGrpc.server = server
minoGrpc.namespace = ""

return minoGrpc, err
Expand Down Expand Up @@ -135,5 +135,5 @@ func (m Minogrpc) MakeRPC(name string, h mino.Handler) (mino.RPC, error) {

m.server.handlers[URI] = h

return rpc, nil
return &rpc, nil
}
4 changes: 2 additions & 2 deletions mino/minogrpc/mod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func Test_Address(t *testing.T) {
id: "test",
}
minoGrpc := Minogrpc{
server: Server{
server: &Server{
addr: addr,
},
}
Expand All @@ -91,7 +91,7 @@ func Test_Address(t *testing.T) {
func Test_MakeRPC(t *testing.T) {
minoGrpc := Minogrpc{}
minoGrpc.namespace = "namespace"
minoGrpc.server = Server{
minoGrpc.server = &Server{
handlers: make(map[string]mino.Handler),
}

Expand Down
52 changes: 37 additions & 15 deletions mino/minogrpc/overlay.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ import (
// gRPC service for the overlay. The handler map points to the one in
// Server.Handlers, which is updated each time the makeRPC function is called.
type overlayService struct {
handlers map[string]mino.Handler
addr address
handlers map[string]mino.Handler
addr address
srv *Server
handlerRcvr map[string]receiver
}

// Call is the implementation of the overlay.Call proto definition
Expand Down Expand Up @@ -89,35 +91,55 @@ func (o overlayService) Stream(stream Overlay_StreamServer) error {
"header. Expected 1, found %d", len(apiURI))
}

rcvr, ok := o.handlerRcvr[apiURI[0]]
if ok {
go func() {
for {
msg, err := stream.Recv()
status, ok := status.FromError(err)
if ok && err != nil && status.Code() == codes.Canceled {
return
}
if err != nil {
fabric.Logger.Error().Msgf("failed to receive in overlay: %v", err)
rcvr.errs <- xerrors.Errorf("failed to receive in overlay: %v", err)
}
rcvr.in <- msg
}
}()
<-o.handlerRcvr[apiURI[0]].stop
return nil
}

handler, ok := o.handlers[apiURI[0]]
if !ok {
return xerrors.Errorf("didn't find the '%s' handler in the map "+
"of handlers, did you register it?", apiURI[0])
}

// This participant is used to send back messages that must be
// relayed.
// o.participants["server_"+o.addr.String()] = stream

// For the moment this sender can only receive messages to itself
// TODO: find a way to know the other nodes.
sender := sender{
sender := &sender{
// This address is used when the client doesn't find the address it
// should send the message to in the list of participant. In that case
// it packs the message in an enveloppe and send it back to this
// address, which is registered in the list of participant.
address: o.addr,
participants: []player{
// This participant is used to send back messages that must be
// relayed.
{
address: o.addr,
streamClient: stream,
},
},
name: "remote RPC",
address: o.addr,
participants: map[string]overlayStream{o.addr.String(): stream},
name: "remote RPC",
srv: o.srv,
}

receiver := receiver{
in: make(chan *OverlayMsg),
errs: make(chan error),
name: "remote RPC",
stop: make(chan interface{}),
srv: o.srv,
}
go func() {
for {
Expand All @@ -134,12 +156,12 @@ func (o overlayService) Stream(stream Overlay_StreamServer) error {
receiver.in <- msg
}
}()

o.handlerRcvr[apiURI[0]] = receiver
err := handler.Stream(sender, receiver)
if err != nil {
return xerrors.Errorf("failed to call the stream handler: %v", err)
}

close(receiver.stop)
return nil

}
Loading

0 comments on commit 06f4a1b

Please sign in to comment.