From 6af59c114092af7ae0849b18e3c90eea86024b24 Mon Sep 17 00:00:00 2001 From: Guilhem Fanton Date: Thu, 13 Sep 2018 18:58:50 +0200 Subject: [PATCH] fix(relay): Fix relay conn --- core/cmd/berty/daemon.go | 6 ++- core/network/p2p/p2p.go | 44 +++++++++---------- core/network/p2p/p2putil/conn.go | 12 +++-- .../p2p/protocol/service/p2pgrpc/grpc.go | 7 +-- 4 files changed, 35 insertions(+), 34 deletions(-) diff --git a/core/cmd/berty/daemon.go b/core/cmd/berty/daemon.go index 780d2eb51f..4191c39181 100644 --- a/core/cmd/berty/daemon.go +++ b/core/cmd/berty/daemon.go @@ -42,7 +42,7 @@ import ( ) var defaultBootstrap = []string{ - "/ip4/167.99.215.90/tcp/4004/ipfs/Qme2FmUfPs6KtFo9W74ncKQKdrjW2Dq37tCqtEtZg4sp79", + "/ip4/104.248.78.238/tcp/4004/ipfs/QmPCbsVWDtLTdCtwfp5ftZ96xccUNe4hegKStgbss8YACT", } type daemonOptions struct { @@ -52,6 +52,7 @@ type daemonOptions struct { hideBanner bool dropDatabase bool initOnly bool + bindgql string // p2p identity string @@ -70,6 +71,7 @@ func daemonSetupFlags(flags *pflag.FlagSet, opts *daemonOptions) { flags.BoolVar(&opts.hop, "hop", false, "enable relay hop (should not be enable for client)") flags.BoolVar(&opts.mdns, "mdns", true, "enable mdns discovery") flags.StringVarP(&opts.bind, "bind", "b", ":1337", "gRPC listening address") + flags.StringVarP(&opts.bindgql, "bind-graphql", "g", ":8700", "Bind graphql api") flags.StringVarP(&opts.identity, "p2p-identity", "i", "", "set p2p identity") flags.StringSliceVar(&opts.bootstrap, "bootstrap", defaultBootstrap, "boostrap peers") flags.StringSliceVar(&opts.bindP2P, "bind-p2p", []string{"/ip4/0.0.0.0/tcp/0"}, "p2p listening address") @@ -255,7 +257,7 @@ func daemon(opts *daemonOptions) error { }).Handler(mux) go func() { - errChan <- http.ListenAndServe(":8700", handler) + errChan <- http.ListenAndServe(opts.bindgql, handler) }() // start grpc server(s) diff --git a/core/network/p2p/p2p.go b/core/network/p2p/p2p.go index 9817765d48..4239ecc50e 100644 --- a/core/network/p2p/p2p.go +++ b/core/network/p2p/p2p.go @@ -287,33 +287,30 @@ func (d *Driver) EmitTo(ctx context.Context, channel string, e *p2p.Envelope) er return fmt.Errorf("No subscribers found") } - sendEnvelope := func(_s pstore.PeerInfo) { - peerID := _s.ID.Pretty() - - if _s.ID.Pretty() == d.ID() { - return - } + for _, s := range ss { + go func(pi pstore.PeerInfo) { + peerID := pi.ID.Pretty() - if err := d.Connect(ctx, _s); err != nil { - logger().Warn("Failed to dial", zap.String("id", peerID), zap.Error(err)) - } + if pi.ID.Pretty() == d.ID() { + return + } - c, err := d.ccmanager.GetConn(ctx, peerID) - if err != nil { - logger().Warn("Failed to dial", zap.String("id", peerID), zap.Error(err)) - } + if err := d.Connect(ctx, pi); err != nil { + logger().Warn("Failed to connect", zap.String("id", peerID), zap.Error(err)) + } - sc := p2p.NewServiceClient(c) + c, err := d.ccmanager.GetConn(ctx, peerID) + if err != nil { + logger().Warn("Failed to dial", zap.String("id", peerID), zap.Error(err)) + } - _, err = sc.HandleEnvelope(ctx, e) - if err != nil { - logger().Warn("Failed to send envelope", zap.String("envelope", fmt.Sprintf("%+v", e)), zap.String("error", err.Error())) - } - } + sc := p2p.NewServiceClient(c) - for _, s := range ss { - _s := s - go sendEnvelope(_s) + _, err = sc.HandleEnvelope(ctx, e) + if err != nil { + logger().Error("Failed to send envelope", zap.String("envelope", fmt.Sprintf("%+v", e)), zap.String("error", err.Error())) + } + }(s) } return nil } @@ -325,6 +322,7 @@ func (d *Driver) Announce(ctx context.Context, id string) error { // FindSubscribers with the given ID func (d *Driver) FindSubscribers(ctx context.Context, id string) ([]pstore.PeerInfo, error) { + logger().Debug("Looking for", zap.String("id", id)) c, err := d.createCid(id) if err != nil { return nil, err @@ -352,6 +350,8 @@ func (d *Driver) Join(ctx context.Context, id string) error { logger().Warn("Provide err", zap.Error(err)) } + logger().Debug("Annoucing", zap.String("id", id)) + // Announce that you are subscribed to this conversation, but don't // broadcast it! in this way, if you die, your announcement will die with you! return nil diff --git a/core/network/p2p/p2putil/conn.go b/core/network/p2p/p2putil/conn.go index b8ad564ef6..9426eb8c1c 100644 --- a/core/network/p2p/p2putil/conn.go +++ b/core/network/p2p/p2putil/conn.go @@ -80,14 +80,18 @@ type conn struct { // NewConnFromStream convert a inet.Stream to a net.conn func NewConnFromStream(s inet.Stream) (net.Conn, error) { - localAddr, err := manet.ToNetAddr(s.Conn().LocalMultiaddr()) + var localAddr, remoteAddr net.Addr + var err error + + pid := s.Protocol() + localAddr, err = manet.ToNetAddr(s.Conn().LocalMultiaddr()) if err != nil { - return nil, err + localAddr = &ProtocolAddr{pid} } - remoteAddr, err := manet.ToNetAddr(s.Conn().RemoteMultiaddr()) + remoteAddr, err = manet.ToNetAddr(s.Conn().RemoteMultiaddr()) if err != nil { - return nil, err + localAddr = &ProtocolAddr{pid} } return &conn{s, localAddr, remoteAddr}, nil diff --git a/core/network/p2p/protocol/service/p2pgrpc/grpc.go b/core/network/p2p/protocol/service/p2pgrpc/grpc.go index 62238bb2ac..9dd6c718d0 100644 --- a/core/network/p2p/protocol/service/p2pgrpc/grpc.go +++ b/core/network/p2p/protocol/service/p2pgrpc/grpc.go @@ -78,11 +78,6 @@ func (pg *P2Pgrpc) NewDialer(proto string) func(string, time.Duration) (net.Conn return nil, err } - c, err := p2putil.NewConnFromStream(s) - if err != nil { - return nil, err - } - - return c, nil + return p2putil.NewConnFromStream(s) } }