From 45e6590ed3d79981fbb7c772873056e3877c3c57 Mon Sep 17 00:00:00 2001 From: Guilhem Fanton Date: Mon, 28 Jan 2019 19:12:37 +0100 Subject: [PATCH] fix(p2p): Remove grpc handler --- core/network/p2p/driver.go | 194 ++++++++---------- .../network/p2p/protocol/provider/provider.go | 60 ++++-- .../p2p/protocol/provider/pubsub/pubsub.go | 50 ++--- .../protocol/provider/pubsub/pubsub_test.go | 2 +- core/network/p2p/test/p2p_test.go | 9 +- 5 files changed, 154 insertions(+), 161 deletions(-) diff --git a/core/network/p2p/driver.go b/core/network/p2p/driver.go index 7ee451be36..c34061ad52 100644 --- a/core/network/p2p/driver.go +++ b/core/network/p2p/driver.go @@ -3,25 +3,22 @@ package p2p import ( "context" "fmt" + "io" "net" + "sync" "time" "github.com/libp2p/go-libp2p/p2p/protocol/ping" - "berty.tech/core/pkg/errorcodes" - "berty.tech/core/api/p2p" "berty.tech/core/network" "berty.tech/core/network/p2p/p2putil" "berty.tech/core/network/p2p/protocol/provider" - "berty.tech/core/network/p2p/protocol/service/p2pgrpc" "berty.tech/core/pkg/tracing" provider_pubsub "berty.tech/core/network/p2p/protocol/provider/pubsub" - grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" - grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" - grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" + ggio "github.com/gogo/protobuf/io" grpc_ot "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" cid "github.com/ipfs/go-cid" datastore "github.com/ipfs/go-datastore" @@ -39,12 +36,11 @@ import ( ma "github.com/multiformats/go-multiaddr" mh "github.com/multiformats/go-multihash" "go.uber.org/zap" - "google.golang.org/grpc" ) const ID = "api/p2p/methods" -var ProtocolID = protocol.ID(p2pgrpc.GetGrpcID(ID)) +var ProtocolID = protocol.ID("berty/p2p/envelope") // driverConfig configure the driver type driverConfig struct { @@ -67,18 +63,14 @@ type driverConfig struct { var _ network.Driver = (*Driver)(nil) type Driver struct { - host host.Host - 
ccmanager *p2putil.Manager - handler func(context.Context, *p2p.Envelope) (*p2p.Void, error) + host host.Host + handler func(context.Context, *p2p.Envelope) (*p2p.Void, error) providers *provider.Manager // services dht *dht.IpfsDHT - listener net.Listener - gs *grpc.Server - rootContext context.Context PingSvc *ping.PingService } @@ -150,53 +142,9 @@ func newDriver(ctx context.Context, cfg driverConfig) (*Driver, error) { host.Network().Notify(Notify(ctx, driver)) - var ( - serverStreamOpts = []grpc.StreamServerInterceptor{ - grpc_ctxtags.StreamServerInterceptor(), - grpc_zap.StreamServerInterceptor(logger()), - } - serverUnaryOpts = []grpc.UnaryServerInterceptor{ - grpc_ctxtags.UnaryServerInterceptor(), - grpc_zap.UnaryServerInterceptor(logger()), - } - clientStreamOpts = []grpc.StreamClientInterceptor{ - grpc_zap.StreamClientInterceptor(logger()), - } - clientUnaryOpts = []grpc.UnaryClientInterceptor{ - grpc_zap.UnaryClientInterceptor(logger()), - } - ) - - if cfg.jaeger != nil { - serverStreamOpts = append(serverStreamOpts, grpc_ot.StreamServerInterceptor(cfg.jaeger...)) - serverUnaryOpts = append(serverUnaryOpts, grpc_ot.UnaryServerInterceptor(cfg.jaeger...)) - clientStreamOpts = append(clientStreamOpts, grpc_ot.StreamClientInterceptor(cfg.jaeger...)) - clientUnaryOpts = append(clientUnaryOpts, grpc_ot.UnaryClientInterceptor(cfg.jaeger...)) - } - - p2pInterceptorsServer := []grpc.ServerOption{ - grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(serverStreamOpts...)), - grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(serverUnaryOpts...)), - } - - p2pInterceptorsClient := []grpc.DialOption{ - grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(clientStreamOpts...)), - grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(clientUnaryOpts...)), - } - - driver.gs = grpc.NewServer(p2pInterceptorsServer...) 
- sgrpc := p2pgrpc.NewP2PGrpcService(host) driver.PingSvc = ping.NewPingService(host) - dialOpts := append([]grpc.DialOption{ - grpc.WithInsecure(), - grpc.WithDialer(sgrpc.NewDialer(ctx, ID)), - }, p2pInterceptorsClient...) - driver.ccmanager = p2putil.NewNetManager(dialOpts...) - - p2p.RegisterServiceServer(driver.gs, ServiceServer(driver)) - - driver.listener = sgrpc.NewListener(ctx, ID) + host.SetStreamHandler(ProtocolID, driver.handleEnvelope) // driver.providers.Register(provider_dht.New(driver.host, driver.dht)) pubsubProvider, err := provider_pubsub.New(ctx, host) @@ -220,10 +168,6 @@ func (d *Driver) Start(ctx context.Context) error { defer tracer.Finish() // ctx = tracer.Context() - if err := d.gs.Serve(d.listener); err != nil { - logger().Error("Listen error", zap.Error(err)) - return err - } return nil } @@ -311,10 +255,6 @@ func (d *Driver) Close(ctx context.Context) error { } } - if d.listener != nil { - d.listener.Close() - } - return nil } @@ -451,66 +391,93 @@ func (d *Driver) EmitTo(ctx context.Context, channel string, e *p2p.Envelope) er return err } + // @TODO: we need to split this, and let the node do the logic to try + // back if the send fail with the given peer + logger().Debug("found peers", zap.String("channel", channel), zap.Int("number", len(ss))) - success := make([]chan bool, len(ss)) + + mu := sync.Mutex{} + wg := sync.WaitGroup{} + wg.Add(len(ss)) + + ok := false for i, s := range ss { - success[i] = make(chan bool, 1) - go func(pi pstore.PeerInfo, index int, done chan bool) { + go func(pi pstore.PeerInfo, index int) { gotracer := tracing.EnterFunc(ctx, index) - defer gotracer.Finish() goctx := tracer.Context() - peerID := pi.ID.Pretty() - if pi.ID == d.host.ID() { - logger().Warn("cannot dial to self", zap.String("id", peerID), zap.Error(err)) - done <- false - return - } + defer gotracer.Finish() + defer wg.Done() - logger().Debug("connecting", zap.String("channel", channel), zap.String("peerID", peerID)) - if err := 
d.Connect(goctx, pi); err != nil { - logger().Warn("failed to connect", zap.String("id", peerID), zap.Error(err)) - done <- false + if err := d.SendTo(goctx, pi, e); err != nil { + logger().Warn("sendTo", zap.Error(err)) return } - c, err := d.ccmanager.GetConn(goctx, peerID) - if err != nil { - logger().Warn("failed to dial", zap.String("id", peerID), zap.Error(err)) - done <- false - return - } + mu.Lock() + ok = true + mu.Unlock() + return + }(s, i) + } - sc := p2p.NewServiceClient(c) + // wait until all goroutines are done + wg.Wait() + if !ok { + return fmt.Errorf("unable to send envelope to at least one peer") + } - logger().Debug("sending envelope", zap.String("channel", channel), zap.String("peerID", peerID)) - _, err = sc.HandleEnvelope(goctx, e) - if err != nil { - logger().Error("failed to send envelope", zap.String("channel", channel), zap.String("peerID", peerID), zap.String("error", err.Error())) - done <- false - return - } + return nil +} - done <- true - }(s, i, success[i]) +func (d *Driver) SendTo(ctx context.Context, pi pstore.PeerInfo, e *p2p.Envelope) error { + peerID := pi.ID.Pretty() + if pi.ID == d.host.ID() { + return fmt.Errorf("cannot dial to self") } - ok := false - for _, cc := range success { - if ok = <-cc; ok { - break - } + logger().Debug("connecting", zap.String("peerID", peerID)) + if err := d.Connect(ctx, pi); err != nil { + return fmt.Errorf("failed to connect: `%s`", err.Error()) } - if !ok { - return fmt.Errorf("failed to emit envelope") + s, err := d.host.NewStream(ctx, pi.ID, ProtocolID) + if err != nil { + return fmt.Errorf("new stream failed: `%s`", err.Error()) + } + + logger().Debug("sending envelope", zap.String("peerID", peerID)) + pbw := ggio.NewDelimitedWriter(s) + if err := pbw.WriteMsg(e); err != nil { + return fmt.Errorf("write stream: `%s`", err.Error()) } return nil } -func (d *Driver) GetConn(ctx context.Context, peerID string) (*grpc.ClientConn, error) { - return d.ccmanager.GetConn(ctx, peerID) +func (d 
*Driver) handleEnvelope(s inet.Stream) { + logger().Debug("receiving envelope") + + if d.handler == nil { + logger().Error("handler is not set") + return + } + + e := &p2p.Envelope{} + pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax) + switch err := pbr.ReadMsg(e); err { + case io.EOF: + s.Close() + return + case nil: // do nothing, everything fine + default: + s.Reset() + logger().Error("invalid envelope", zap.Error(err)) + return + } + + // @TODO: get opentracing context + d.handler(context.Background(), e) } func (d *Driver) FindProvidersAndWait(ctx context.Context, id string, cache bool) ([]pstore.PeerInfo, error) { @@ -539,18 +506,17 @@ func (d *Driver) OnEnvelopeHandler(f func(context.Context, *p2p.Envelope) (*p2p. d.handler = f } -func (d *Driver) PingOtherNode(ctx context.Context, destination string) error { - ctx, cancel := context.WithTimeout(ctx, time.Second*5) - defer cancel() - - c, err := d.ccmanager.GetConn(ctx, destination) +func (d *Driver) PingOtherNode(ctx context.Context, destination string) (err error) { + peerid, err := peer.IDB58Decode(destination) if err != nil { - return errorcodes.ErrNetPing.Wrap(err) + return err } - if _, err = p2p.NewServiceClient(c).Ping(ctx, &p2p.Void{}); err != nil { - return errorcodes.ErrNetPing.Wrap(err) + ch, err := d.PingSvc.Ping(ctx, peerid) + if err != nil { + return err } + <-ch return nil } diff --git a/core/network/p2p/protocol/provider/provider.go b/core/network/p2p/protocol/provider/provider.go index 954d694fed..3b81a37aee 100644 --- a/core/network/p2p/protocol/provider/provider.go +++ b/core/network/p2p/protocol/provider/provider.go @@ -45,10 +45,6 @@ func (m *Manager) addPeerToSub(id cid.Cid, pi pstore.PeerInfo) error { defer m.muSubs.Unlock() logger().Debug("registering", zap.String("id", id.String())) - for k, v := range m.subs { - logger().Debug("key, value", zap.String("k", k.String()), zap.Int("v", len(v))) - } - ps, ok := m.subs[id] if !ok { return fmt.Errorf("not subscribed to %s", id) } @@ 
-149,15 +145,30 @@ func (m *Manager) GetLocalPeers(id cid.Cid) (Peers, error) { func (m *Manager) Provide(ctx context.Context, id cid.Cid) error { logger().Debug("providing", zap.String("id", id.String())) + mu := sync.Mutex{} + wg := sync.WaitGroup{} + wg.Add(len(m.providers)) + ok := false for _, p := range m.providers { - if err := p.Provide(ctx, id); err != nil { - logger().Warn("failed to provide", zap.String("id", id.String()), zap.Error(err)) - } else { + go func(_p Provider) { + defer wg.Done() + + if err := _p.Provide(ctx, id); err != nil { + logger().Warn("failed to provide", zap.String("id", id.String()), zap.Error(err)) + return + } + + mu.Lock() ok = true - } + mu.Unlock() + + return + }(p) } + // wait until all goroutines are done + wg.Wait() if !ok { return fmt.Errorf("failed to provide with at least one provider") } @@ -166,29 +177,46 @@ } func (m *Manager) FindProviders(ctx context.Context, id cid.Cid, cache bool) error { - logger().Debug("finding providers", zap.String("id", id.String())) - - // create subscription if cache == true { ps, err := m.getPeersForSub(id) if err == nil && len(ps) > 0 { + // We already got some peers, + logger().Debug("getting cached providers", zap.String("id", id.String())) return nil } } + logger().Debug("finding providers", zap.String("id", id.String())) + + // create subscription if err := m.createSub(id); err != nil { - logger().Warn("provider subscription", zap.Error(err)) + logger().Warn("finding providers", zap.Error(err)) } + mu := sync.Mutex{} + wg := sync.WaitGroup{} + wg.Add(len(m.providers)) + ok := false for _, p := range m.providers { - if err := p.FindProviders(ctx, id); err != nil { - logger().Warn("finding providers", zap.String("id", id.String()), zap.Error(err)) - } else { + go func(_p Provider) { + defer wg.Done() + + if err := _p.FindProviders(ctx, id); err != nil { + logger().Warn("finding providers", zap.String("id", id.String()), 
zap.Error(err)) + return + } + + mu.Lock() ok = true - } + mu.Unlock() + + return + }(p) } + // wait until all goroutines are done + wg.Wait() if !ok { return fmt.Errorf("failed to find providers with at least one provider") } diff --git a/core/network/p2p/protocol/provider/pubsub/pubsub.go b/core/network/p2p/protocol/provider/pubsub/pubsub.go index 3fa3cd80ea..47d0843118 100644 --- a/core/network/p2p/protocol/provider/pubsub/pubsub.go +++ b/core/network/p2p/protocol/provider/pubsub/pubsub.go @@ -75,7 +75,7 @@ func (p *Provider) RegisterHandler(h provider.Handler) { p.handler = h } -func (p *Provider) getSub() (*p2pps.Subscription, error) { +func (p *Provider) getSub(ctx context.Context) (*p2pps.Subscription, error) { p.subReady.Lock() defer p.subReady.Unlock() @@ -86,7 +86,12 @@ func (p *Provider) getSub() (*p2pps.Subscription, error) { } // wait for heartbeats to build mesh - <-time.After(time.Second * 2) + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(time.Second * 2): + + } p.sub = sub } @@ -103,8 +108,8 @@ func (p *Provider) cancelSub() { } } -func (p *Provider) isReady() error { - _, err := p.getSub() +func (p *Provider) isReady(ctx context.Context) error { + _, err := p.getSub(ctx) return err } @@ -207,14 +212,7 @@ func (p *Provider) handleStream(s inet.Stream) { func (p *Provider) handleSubscription(ctx context.Context) error { for { - - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - sub, err := p.getSub() + sub, err := p.getSub(ctx) if err != nil { logger().Warn("sub error", zap.Error(err)) continue @@ -238,11 +236,6 @@ func (p *Provider) handleSubscription(ctx context.Context) error { logger().Warn("decode id error", zap.String("id", id.String()), zap.Error(err)) } - if !p.isSubscribeTo(remoteProvider.GetId()) { - logger().Debug("not subscribed", zap.String("ID", remoteProvider.GetId())) - continue - } - pinfo := pstore.PeerInfo{} data := remoteProvider.GetPeerInfo() if err := pinfo.UnmarshalJSON(data); err 
!= nil { @@ -254,6 +247,11 @@ func (p *Provider) handleSubscription(ctx context.Context) error { continue } + if !p.isSubscribeTo(remoteProvider.GetId()) { + logger().Debug("not subscribed", zap.String("ID", remoteProvider.GetId())) + continue + } + logger().Debug("subscription got a new message", zap.String("subID", id.String()), zap.String("peerID", pinfo.ID.Pretty())) @@ -293,7 +291,7 @@ func (p *Provider) handleSubscription(ctx context.Context) error { } } -func (p *Provider) Announce(id string) error { +func (p *Provider) Announce(ctx context.Context, id string) error { pi, err := p.newProviderInfo(id) if err != nil { return err @@ -304,7 +302,7 @@ func (p *Provider) Announce(id string) error { return err } - if err := p.isReady(); err != nil { + if err := p.isReady(ctx); err != nil { return fmt.Errorf("announce failed %s", err) } @@ -313,7 +311,7 @@ func (p *Provider) Announce(id string) error { } func (p *Provider) Subscribe(ctx context.Context, id string) error { - if err := p.isReady(); err != nil { + if err := p.isReady(ctx); err != nil { return fmt.Errorf("subscribe failed %s", err) } @@ -322,9 +320,15 @@ func (p *Provider) Subscribe(ctx context.Context, id string) error { } func (p *Provider) Provide(ctx context.Context, id cid.Cid) error { - return p.Subscribe(ctx, id.String()) + // Do not return an error here, it's ok to provide the same id multiple + // time. 
But keep logging to track if this method become over called + if err := p.Subscribe(ctx, id.String()); err != nil { + logger().Warn("subscribing", zap.String("subID", id.String()), zap.Error(err)) + } + + return nil } -func (p *Provider) FindProviders(_ context.Context, id cid.Cid) error { - return p.Announce(id.String()) +func (p *Provider) FindProviders(ctx context.Context, id cid.Cid) error { + return p.Announce(ctx, id.String()) } diff --git a/core/network/p2p/protocol/provider/pubsub/pubsub_test.go b/core/network/p2p/protocol/provider/pubsub/pubsub_test.go index 609f1f6d78..2a31cd7405 100644 --- a/core/network/p2p/protocol/provider/pubsub/pubsub_test.go +++ b/core/network/p2p/protocol/provider/pubsub/pubsub_test.go @@ -201,7 +201,7 @@ func TestP2PNetwork(t *testing.T) { err = lisa.provider.Subscribe(ctx, topic) So(err, ShouldBeNil) - err = roger.provider.Announce(topic) + err = roger.provider.Announce(ctx, topic) So(err, ShouldBeNil) var ps []pstore.PeerInfo diff --git a/core/network/p2p/test/p2p_test.go b/core/network/p2p/test/p2p_test.go index f798cced20..2f7ec3e526 100644 --- a/core/network/p2p/test/p2p_test.go +++ b/core/network/p2p/test/p2p_test.go @@ -47,11 +47,7 @@ func setupDriver(bootstrap ...string) (*p2p.Driver, error) { if err != nil { return nil, err } - go func() { - if err = driver.Start(context.Background()); err != nil { - logger().Error("driver start error", zap.Error(err)) - } - }() + return driver, err } @@ -75,11 +71,10 @@ func TestP2PNetwork(t *testing.T) { homer, lisa, bart *p2p.Driver err error ) + // setupTestLogging() // log.SetDebugLogging() - // logging.SetDebugLogging() - dht.PoolSize = 3 ds := []*p2p.Driver{homer, lisa, bart} defer func() {