Skip to content

Commit

Permalink
Merge 7f9adf9 into 4427520
Browse files Browse the repository at this point in the history
  • Loading branch information
nkcr committed Mar 27, 2020
2 parents 4427520 + 7f9adf9 commit 024e855
Show file tree
Hide file tree
Showing 8 changed files with 540 additions and 124 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ require (
github.com/rs/zerolog v1.18.0
github.com/stretchr/testify v1.5.1
go.dedis.ch/kyber/v3 v3.0.12
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 // indirect
golang.org/x/net v0.0.0-20190620200207-3b0461eec859
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543
google.golang.org/grpc v1.27.1
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc // indirect
)
4 changes: 2 additions & 2 deletions mino/minogrpc/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,11 @@ func (m Minogrpc) MakeRPC(name string, h mino.Handler) (mino.RPC, error) {
URI := fmt.Sprintf("%s/%s", m.namespace, name)
rpc := RPC{
handler: h,
srv: m.server,
srv: &m.server,
uri: URI,
}

m.server.handlers[URI] = h

return rpc, nil
return &rpc, nil
}
4 changes: 2 additions & 2 deletions mino/minogrpc/mod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ func Test_MakeRPC(t *testing.T) {
rpc, err := minoGrpc.MakeRPC("name", handler)
require.NoError(t, err)

expectedRPC := RPC{
expectedRPC := &RPC{
handler: handler,
srv: minoGrpc.server,
srv: &minoGrpc.server,
uri: fmt.Sprintf("namespace/name"),
}

Expand Down
71 changes: 54 additions & 17 deletions mino/minogrpc/overlay.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package minogrpc

import (
context "context"
"crypto/tls"

"github.com/golang/protobuf/ptypes"
"go.dedis.ch/fabric"
Expand All @@ -17,7 +18,19 @@ import (
// Server.Handlers, which is updated each time the makeRPC function is called.
type overlayService struct {
handlers map[string]mino.Handler
addr address
// this one is used when we open a new connexion on an existing RPC stream.
// In that case we must use the already existing receiver for this RPC.
handlerRcvr map[string]receiver
// this is the address of the server. This address is used to provide
// insighful information in the traffic history, as it is used to form the
// addressID of the sender.
addr address
// This map is used to create a new stream connection if possible
neighbours map[string]Peer
// This certificate is used to create a new stream connection if possible
srvCert *tls.Certificate
// Used to record traffic activity
traffic *traffic
}

// Call is the implementation of the overlay.Call proto definition
Expand Down Expand Up @@ -89,35 +102,59 @@ func (o overlayService) Stream(stream Overlay_StreamServer) error {
"header. Expected 1, found %d", len(apiURI))
}

rcvr, ok := o.handlerRcvr[apiURI[0]]
if ok {
go func() {
for {
msg, err := stream.Recv()
status, ok := status.FromError(err)
if ok && err != nil && status.Code() == codes.Canceled {
return
}
if err != nil {
fabric.Logger.Error().Msgf("failed to receive in overlay: %v", err)
rcvr.errs <- xerrors.Errorf("failed to receive in overlay: %v", err)
}
rcvr.in <- msg
}
}()
// we must wait on this signal before exiting in order to keep the
// stream open
<-o.handlerRcvr[apiURI[0]].stop
return nil
}

handler, ok := o.handlers[apiURI[0]]
if !ok {
return xerrors.Errorf("didn't find the '%s' handler in the map "+
"of handlers, did you register it?", apiURI[0])
}

rpcID := "server_" + o.addr.String()

// For the moment this sender can only receive messages to itself
// TODO: find a way to know the other nodes.
sender := sender{
sender := &sender{
// This address is used when the client doesn't find the address it
// should send the message to in the list of participant. In that case
// it packs the message in an enveloppe and send it back to this
// address, which is registered in the list of participant.
address: o.addr,
participants: []player{
// This participant is used to send back messages that must be
// relayed.
{
address: o.addr,
streamClient: stream,
},
},
name: "remote RPC",
// It is also used to indicate the "from" of the message in the case it
// doesn't relay but sends directly.
address: address{rpcID},
participants: map[string]overlayStream{rpcID: stream},
name: "remote RPC",
neighbours: o.neighbours,
srvCert: o.srvCert,
traffic: o.traffic,
}

receiver := receiver{
in: make(chan *OverlayMsg),
errs: make(chan error),
name: "remote RPC",
in: make(chan *OverlayMsg),
errs: make(chan error),
name: "remote RPC",
stop: make(chan interface{}),
traffic: o.traffic,
}
go func() {
for {
Expand All @@ -134,12 +171,12 @@ func (o overlayService) Stream(stream Overlay_StreamServer) error {
receiver.in <- msg
}
}()

o.handlerRcvr[apiURI[0]] = receiver
err := handler.Stream(sender, receiver)
if err != nil {
return xerrors.Errorf("failed to call the stream handler: %v", err)
}

close(receiver.stop)
return nil

}
4 changes: 4 additions & 0 deletions mino/minogrpc/overlay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ func TestOverlay_Stream(t *testing.T) {

// Now I provide a handler
overlayService.handlers["handler_key"] = testFailHandler{}
overlayService.handlerRcvr = make(map[string]receiver)
overlayService.traffic = &traffic{}

// Now I set the right elements in the header but use a handler that should
// raise an error
Expand All @@ -124,6 +126,8 @@ func TestOverlay_Stream(t *testing.T) {

// Now we set our mock StreamServer to return an error on receive
streamServer.recvError = true
// We have to reset it because it's already used and then would deadlock
overlayService.handlerRcvr = make(map[string]receiver)
err = overlayService.Stream(&streamServer)
require.EqualError(t, err, "failed to call the stream handler: oops")
// We have to wait there so we catch the goroutine error
Expand Down
Loading

0 comments on commit 024e855

Please sign in to comment.