Skip to content

Commit

Permalink
fix(relay): Fix relay conn
Browse files Browse the repository at this point in the history
  • Loading branch information
gfanton committed Sep 13, 2018
1 parent a063215 commit 6af59c1
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 34 deletions.
6 changes: 4 additions & 2 deletions core/cmd/berty/daemon.go
Expand Up @@ -42,7 +42,7 @@ import (
)

var defaultBootstrap = []string{
"/ip4/167.99.215.90/tcp/4004/ipfs/Qme2FmUfPs6KtFo9W74ncKQKdrjW2Dq37tCqtEtZg4sp79",
"/ip4/104.248.78.238/tcp/4004/ipfs/QmPCbsVWDtLTdCtwfp5ftZ96xccUNe4hegKStgbss8YACT",
}

type daemonOptions struct {
Expand All @@ -52,6 +52,7 @@ type daemonOptions struct {
hideBanner bool
dropDatabase bool
initOnly bool
bindgql string

// p2p
identity string
Expand All @@ -70,6 +71,7 @@ func daemonSetupFlags(flags *pflag.FlagSet, opts *daemonOptions) {
flags.BoolVar(&opts.hop, "hop", false, "enable relay hop (should not be enable for client)")
flags.BoolVar(&opts.mdns, "mdns", true, "enable mdns discovery")
flags.StringVarP(&opts.bind, "bind", "b", ":1337", "gRPC listening address")
flags.StringVarP(&opts.bindgql, "bind-graphql", "g", ":8700", "Bind graphql api")
flags.StringVarP(&opts.identity, "p2p-identity", "i", "", "set p2p identity")
flags.StringSliceVar(&opts.bootstrap, "bootstrap", defaultBootstrap, "boostrap peers")
flags.StringSliceVar(&opts.bindP2P, "bind-p2p", []string{"/ip4/0.0.0.0/tcp/0"}, "p2p listening address")
Expand Down Expand Up @@ -255,7 +257,7 @@ func daemon(opts *daemonOptions) error {
}).Handler(mux)

go func() {
errChan <- http.ListenAndServe(":8700", handler)
errChan <- http.ListenAndServe(opts.bindgql, handler)
}()

// start grpc server(s)
Expand Down
44 changes: 22 additions & 22 deletions core/network/p2p/p2p.go
Expand Up @@ -287,33 +287,30 @@ func (d *Driver) EmitTo(ctx context.Context, channel string, e *p2p.Envelope) er
return fmt.Errorf("No subscribers found")
}

sendEnvelope := func(_s pstore.PeerInfo) {
peerID := _s.ID.Pretty()

if _s.ID.Pretty() == d.ID() {
return
}
for _, s := range ss {
go func(pi pstore.PeerInfo) {
peerID := pi.ID.Pretty()

if err := d.Connect(ctx, _s); err != nil {
logger().Warn("Failed to dial", zap.String("id", peerID), zap.Error(err))
}
if pi.ID.Pretty() == d.ID() {
return
}

c, err := d.ccmanager.GetConn(ctx, peerID)
if err != nil {
logger().Warn("Failed to dial", zap.String("id", peerID), zap.Error(err))
}
if err := d.Connect(ctx, pi); err != nil {
logger().Warn("Failed to connect", zap.String("id", peerID), zap.Error(err))
}

sc := p2p.NewServiceClient(c)
c, err := d.ccmanager.GetConn(ctx, peerID)
if err != nil {
logger().Warn("Failed to dial", zap.String("id", peerID), zap.Error(err))
}

_, err = sc.HandleEnvelope(ctx, e)
if err != nil {
logger().Warn("Failed to send envelope", zap.String("envelope", fmt.Sprintf("%+v", e)), zap.String("error", err.Error()))
}
}
sc := p2p.NewServiceClient(c)

for _, s := range ss {
_s := s
go sendEnvelope(_s)
_, err = sc.HandleEnvelope(ctx, e)
if err != nil {
logger().Error("Failed to send envelope", zap.String("envelope", fmt.Sprintf("%+v", e)), zap.String("error", err.Error()))
}
}(s)
}
return nil
}
Expand All @@ -325,6 +322,7 @@ func (d *Driver) Announce(ctx context.Context, id string) error {

// FindSubscribers with the given ID
func (d *Driver) FindSubscribers(ctx context.Context, id string) ([]pstore.PeerInfo, error) {
logger().Debug("Looking for", zap.String("id", id))
c, err := d.createCid(id)
if err != nil {
return nil, err
Expand Down Expand Up @@ -352,6 +350,8 @@ func (d *Driver) Join(ctx context.Context, id string) error {
logger().Warn("Provide err", zap.Error(err))
}

logger().Debug("Annoucing", zap.String("id", id))

// Announce that you are subscribed to this conversation, but don't
// broadcast it! in this way, if you die, your announcement will die with you!
return nil
Expand Down
12 changes: 8 additions & 4 deletions core/network/p2p/p2putil/conn.go
Expand Up @@ -80,14 +80,18 @@ type conn struct {

// NewConnFromStream convert a inet.Stream to a net.conn
func NewConnFromStream(s inet.Stream) (net.Conn, error) {
localAddr, err := manet.ToNetAddr(s.Conn().LocalMultiaddr())
var localAddr, remoteAddr net.Addr
var err error

pid := s.Protocol()
localAddr, err = manet.ToNetAddr(s.Conn().LocalMultiaddr())
if err != nil {
return nil, err
localAddr = &ProtocolAddr{pid}
}

remoteAddr, err := manet.ToNetAddr(s.Conn().RemoteMultiaddr())
remoteAddr, err = manet.ToNetAddr(s.Conn().RemoteMultiaddr())
if err != nil {
return nil, err
localAddr = &ProtocolAddr{pid}
}

return &conn{s, localAddr, remoteAddr}, nil
Expand Down
7 changes: 1 addition & 6 deletions core/network/p2p/protocol/service/p2pgrpc/grpc.go
Expand Up @@ -78,11 +78,6 @@ func (pg *P2Pgrpc) NewDialer(proto string) func(string, time.Duration) (net.Conn
return nil, err
}

c, err := p2putil.NewConnFromStream(s)
if err != nil {
return nil, err
}

return c, nil
return p2putil.NewConnFromStream(s)
}
}

0 comments on commit 6af59c1

Please sign in to comment.