Skip to content

Commit

Permalink
fix(p2p): Remove grpc handler
Browse files Browse the repository at this point in the history
  • Loading branch information
gfanton committed Jan 29, 2019
1 parent f62467d commit 45e6590
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 161 deletions.
194 changes: 80 additions & 114 deletions core/network/p2p/driver.go
Expand Up @@ -3,25 +3,22 @@ package p2p
import (
"context"
"fmt"
"io"
"net"
"sync"
"time"

"github.com/libp2p/go-libp2p/p2p/protocol/ping"

"berty.tech/core/pkg/errorcodes"

"berty.tech/core/api/p2p"
"berty.tech/core/network"
"berty.tech/core/network/p2p/p2putil"
"berty.tech/core/network/p2p/protocol/provider"
"berty.tech/core/network/p2p/protocol/service/p2pgrpc"
"berty.tech/core/pkg/tracing"

provider_pubsub "berty.tech/core/network/p2p/protocol/provider/pubsub"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
ggio "github.com/gogo/protobuf/io"
grpc_ot "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
cid "github.com/ipfs/go-cid"
datastore "github.com/ipfs/go-datastore"
Expand All @@ -39,12 +36,11 @@ import (
ma "github.com/multiformats/go-multiaddr"
mh "github.com/multiformats/go-multihash"
"go.uber.org/zap"
"google.golang.org/grpc"
)

const ID = "api/p2p/methods"

var ProtocolID = protocol.ID(p2pgrpc.GetGrpcID(ID))
var ProtocolID = protocol.ID("berty/p2p/envelope")

// driverConfig configure the driver
type driverConfig struct {
Expand All @@ -67,18 +63,14 @@ type driverConfig struct {
var _ network.Driver = (*Driver)(nil)

type Driver struct {
host host.Host
ccmanager *p2putil.Manager
handler func(context.Context, *p2p.Envelope) (*p2p.Void, error)
host host.Host
handler func(context.Context, *p2p.Envelope) (*p2p.Void, error)

providers *provider.Manager

// services
dht *dht.IpfsDHT

listener net.Listener
gs *grpc.Server

rootContext context.Context
PingSvc *ping.PingService
}
Expand Down Expand Up @@ -150,53 +142,9 @@ func newDriver(ctx context.Context, cfg driverConfig) (*Driver, error) {

host.Network().Notify(Notify(ctx, driver))

var (
serverStreamOpts = []grpc.StreamServerInterceptor{
grpc_ctxtags.StreamServerInterceptor(),
grpc_zap.StreamServerInterceptor(logger()),
}
serverUnaryOpts = []grpc.UnaryServerInterceptor{
grpc_ctxtags.UnaryServerInterceptor(),
grpc_zap.UnaryServerInterceptor(logger()),
}
clientStreamOpts = []grpc.StreamClientInterceptor{
grpc_zap.StreamClientInterceptor(logger()),
}
clientUnaryOpts = []grpc.UnaryClientInterceptor{
grpc_zap.UnaryClientInterceptor(logger()),
}
)

if cfg.jaeger != nil {
serverStreamOpts = append(serverStreamOpts, grpc_ot.StreamServerInterceptor(cfg.jaeger...))
serverUnaryOpts = append(serverUnaryOpts, grpc_ot.UnaryServerInterceptor(cfg.jaeger...))
clientStreamOpts = append(clientStreamOpts, grpc_ot.StreamClientInterceptor(cfg.jaeger...))
clientUnaryOpts = append(clientUnaryOpts, grpc_ot.UnaryClientInterceptor(cfg.jaeger...))
}

p2pInterceptorsServer := []grpc.ServerOption{
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(serverStreamOpts...)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(serverUnaryOpts...)),
}

p2pInterceptorsClient := []grpc.DialOption{
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(clientStreamOpts...)),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(clientUnaryOpts...)),
}

driver.gs = grpc.NewServer(p2pInterceptorsServer...)
sgrpc := p2pgrpc.NewP2PGrpcService(host)
driver.PingSvc = ping.NewPingService(host)

dialOpts := append([]grpc.DialOption{
grpc.WithInsecure(),
grpc.WithDialer(sgrpc.NewDialer(ctx, ID)),
}, p2pInterceptorsClient...)
driver.ccmanager = p2putil.NewNetManager(dialOpts...)

p2p.RegisterServiceServer(driver.gs, ServiceServer(driver))

driver.listener = sgrpc.NewListener(ctx, ID)
host.SetStreamHandler(ProtocolID, driver.handleEnvelope)

// driver.providers.Register(provider_dht.New(driver.host, driver.dht))
pubsubProvider, err := provider_pubsub.New(ctx, host)
Expand All @@ -220,10 +168,6 @@ func (d *Driver) Start(ctx context.Context) error {
defer tracer.Finish()
// ctx = tracer.Context()

if err := d.gs.Serve(d.listener); err != nil {
logger().Error("Listen error", zap.Error(err))
return err
}
return nil
}

Expand Down Expand Up @@ -311,10 +255,6 @@ func (d *Driver) Close(ctx context.Context) error {
}
}

if d.listener != nil {
d.listener.Close()
}

return nil
}

Expand Down Expand Up @@ -451,66 +391,93 @@ func (d *Driver) EmitTo(ctx context.Context, channel string, e *p2p.Envelope) er
return err
}

// @TODO: we need to split this, and let the node do the logic to try
// back if the send fail with the given peer

logger().Debug("found peers", zap.String("channel", channel), zap.Int("number", len(ss)))
success := make([]chan bool, len(ss))

mu := sync.Mutex{}
wg := sync.WaitGroup{}
wg.Add(len(ss))

ok := false
for i, s := range ss {
success[i] = make(chan bool, 1)
go func(pi pstore.PeerInfo, index int, done chan bool) {
go func(pi pstore.PeerInfo, index int) {
gotracer := tracing.EnterFunc(ctx, index)
defer gotracer.Finish()
goctx := tracer.Context()

peerID := pi.ID.Pretty()
if pi.ID == d.host.ID() {
logger().Warn("cannot dial to self", zap.String("id", peerID), zap.Error(err))
done <- false
return
}
defer gotracer.Finish()
defer wg.Done()

logger().Debug("connecting", zap.String("channel", channel), zap.String("peerID", peerID))
if err := d.Connect(goctx, pi); err != nil {
logger().Warn("failed to connect", zap.String("id", peerID), zap.Error(err))
done <- false
if err := d.SendTo(goctx, pi, e); err != nil {
logger().Warn("sendTo", zap.Error(err))
return
}

c, err := d.ccmanager.GetConn(goctx, peerID)
if err != nil {
logger().Warn("failed to dial", zap.String("id", peerID), zap.Error(err))
done <- false
return
}
mu.Lock()
ok = true
mu.Unlock()
return
}(s, i)
}

sc := p2p.NewServiceClient(c)
// wait until all goroutines are done
wg.Wait()
if !ok {
return fmt.Errorf("unable to send evenlope to at last one peer")
}

logger().Debug("sending envelope", zap.String("channel", channel), zap.String("peerID", peerID))
_, err = sc.HandleEnvelope(goctx, e)
if err != nil {
logger().Error("failed to send envelope", zap.String("channel", channel), zap.String("peerID", peerID), zap.String("error", err.Error()))
done <- false
return
}
return nil
}

done <- true
}(s, i, success[i])
func (d *Driver) SendTo(ctx context.Context, pi pstore.PeerInfo, e *p2p.Envelope) error {
peerID := pi.ID.Pretty()
if pi.ID == d.host.ID() {
return fmt.Errorf("cannot dial to self")
}

ok := false
for _, cc := range success {
if ok = <-cc; ok {
break
}
logger().Debug("connecting", zap.String("peerID", peerID))
if err := d.Connect(ctx, pi); err != nil {
return fmt.Errorf("failed to connect: `%s`", err.Error())
}

if !ok {
return fmt.Errorf("failed to emit envelope")
s, err := d.host.NewStream(ctx, pi.ID, ProtocolID)
if err != nil {
return fmt.Errorf("new stream failed: `%s`", err.Error())
}

logger().Debug("sending envelope", zap.String("peerID", peerID))
pbw := ggio.NewDelimitedWriter(s)
if err := pbw.WriteMsg(e); err != nil {
return fmt.Errorf("write stream: `%s`", err.Error())
}

return nil
}

func (d *Driver) GetConn(ctx context.Context, peerID string) (*grpc.ClientConn, error) {
return d.ccmanager.GetConn(ctx, peerID)
func (d *Driver) handleEnvelope(s inet.Stream) {
logger().Debug("receiving envelope")

if d.handler == nil {
logger().Error("handler is not set")
return
}

e := &p2p.Envelope{}
pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
switch err := pbr.ReadMsg(e); err {
case io.EOF:
s.Close()
return
case nil: // do noting, everything fine
default:
s.Reset()
logger().Error("invalid envelope", zap.Error(err))
return
}

// @TODO: get opentracing context
d.handler(context.Background(), e)
}

func (d *Driver) FindProvidersAndWait(ctx context.Context, id string, cache bool) ([]pstore.PeerInfo, error) {
Expand Down Expand Up @@ -539,18 +506,17 @@ func (d *Driver) OnEnvelopeHandler(f func(context.Context, *p2p.Envelope) (*p2p.
d.handler = f
}

func (d *Driver) PingOtherNode(ctx context.Context, destination string) error {
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()

c, err := d.ccmanager.GetConn(ctx, destination)
func (d *Driver) PingOtherNode(ctx context.Context, destination string) (err error) {
peerid, err := peer.IDB58Decode(destination)
if err != nil {
return errorcodes.ErrNetPing.Wrap(err)
return err
}

if _, err = p2p.NewServiceClient(c).Ping(ctx, &p2p.Void{}); err != nil {
return errorcodes.ErrNetPing.Wrap(err)
ch, err := d.PingSvc.Ping(ctx, peerid)
if err != nil {
return err
}

<-ch
return nil
}

0 comments on commit 45e6590

Please sign in to comment.