Skip to content

Commit

Permalink
fix(network): mdns + others discovery at launch
Browse files Browse the repository at this point in the history
Signed-off-by: Godefroy Ponsinet <godefroy.ponsinet@outlook.com>
  • Loading branch information
90dy committed Mar 8, 2019
1 parent 11bedd2 commit bdc03b4
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 28 deletions.
18 changes: 17 additions & 1 deletion core/network/driver.go
Expand Up @@ -16,6 +16,7 @@ import (
ggio "github.com/gogo/protobuf/io"
cid "github.com/ipfs/go-cid"
ipfsaddr "github.com/ipfs/go-ipfs-addr"
libp2p_discovery "github.com/libp2p/go-libp2p-discovery"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
Expand Down Expand Up @@ -152,6 +153,22 @@ func (net *Network) BootstrapPeer(ctx context.Context, bootstrapAddr string) err
return net.host.Connect(ctx, *pinfo)
}

func (net *Network) Discover(ctx context.Context) {
libp2p_discovery.Advertise(ctx, net.host.Discovery, "berty")
go func() {
for {
peers, err := libp2p_discovery.FindPeers(ctx, net.host.Discovery, "berty", 0)
if err != nil {
logger().Error("network discover error", zap.String("err", err.Error()))
continue
}
for _, pi := range peers {
net.Connect(ctx, pi)
}
}
}()
}

// Connect ensures there is a connection between this host and the peer with
// given peer.ID.
func (net *Network) Connect(ctx context.Context, pi pstore.PeerInfo) error {
Expand All @@ -173,7 +190,6 @@ func (net *Network) Connect(ctx context.Context, pi pstore.PeerInfo) error {
if err != nil {
return err
}

return net.host.Connect(ctx, pi)
}

Expand Down
15 changes: 8 additions & 7 deletions core/network/host/discovery.go
Expand Up @@ -7,6 +7,7 @@ import (
"berty.tech/core/pkg/tracing"
discovery "github.com/libp2p/go-libp2p-discovery"
pstore "github.com/libp2p/go-libp2p-peerstore"
"go.uber.org/zap"
)

var _ discovery.Discovery = (*BertyDiscovery)(nil)
Expand All @@ -29,19 +30,17 @@ func (d *BertyDiscovery) Advertise(ctx context.Context, ns string, opts ...disco
t := time.Now()

waitChans := []chan struct{}{}
for i := range waitChans {
waitChans[i] = make(chan struct{}, 1)
}

for i := range d.discoveries {
waitChans = append(waitChans, make(chan struct{}, 1))

d := d.discoveries[i]
waitChan := waitChans[i]

go func() {
_, err := d.Advertise(ctx, ns, opts...)
waitChan <- struct{}{}
if err != nil {
logger().Error(err.Error())
logger().Error("berty discovery advertise error", zap.String("err", err.Error()))
return
}
}()
Expand All @@ -66,13 +65,15 @@ func (d *BertyDiscovery) FindPeers(ctx context.Context, ns string, opts ...disco
go func() {
piChan, err := d.FindPeers(ctx, ns, opts...)
if err != nil {
logger().Error(err.Error())
logger().Error("berty discovery find peers error", zap.String("err", err.Error()))
return
}
for {
select {
case pi := <-piChan:
globPiChan <- pi
if pi.ID != "" {
globPiChan <- pi
}
case <-ctx.Done():
return
}
Expand Down
15 changes: 10 additions & 5 deletions core/network/network.go
Expand Up @@ -39,15 +39,15 @@ func New(ctx context.Context, opts ...config.Option) (*Network, error) {
var err error
var cfg config.Config

net := &Network{
shutdown: cancel,
}

if err := cfg.Apply(opts...); err != nil {
cancel()
return nil, err
}

net := &Network{
shutdown: cancel,
}

net.host, err = cfg.NewNode(ctx)
if err != nil {
cancel()
Expand All @@ -57,10 +57,15 @@ func New(ctx context.Context, opts ...config.Option) (*Network, error) {
net.host.SetStreamHandler(ProtocolID, net.handleEnvelope)
net.logHostInfos()

if net.Bootstrap(ctx, false, cfg.Bootstrap...); err != nil {
// bootstrap default peers
if err := net.Bootstrap(ctx, false, cfg.Bootstrap...); err != nil {
cancel()
return nil, err
}

// advertise and find peers on berty discovery service
net.Discover(ctx)

return net, nil
}

Expand Down
33 changes: 18 additions & 15 deletions core/network/protocol/mdns/discovery.go
Expand Up @@ -14,53 +14,56 @@ import (
var _ discovery.Discovery = (*Discovery)(nil)

type Discovery struct {
host host.Host
service service.Service
notifee *notifee
host host.Host
services map[string]service.Service
notifees map[string]*notifee
}

func NewDiscovery(ctx context.Context, host host.Host) (discovery.Discovery, error) {
return &Discovery{
host: host,
service: nil,
notifee: nil,
host: host,
services: map[string]service.Service{},
notifees: map[string]*notifee{},
}, nil
}

func (d *Discovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
tracer := tracing.EnterFunc(ctx)
defer tracer.Finish()

if err := d.wakeService(ctx); err != nil {
if err := d.wakeService(ctx, ns); err != nil {
return 0, err
}
time.Sleep(10 * time.Second)
return 10 * time.Second, nil
}

func (d *Discovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan pstore.PeerInfo, error) {
tracer := tracing.EnterFunc(ctx)
defer tracer.Finish()

if err := d.wakeService(ctx); err != nil {
if err := d.wakeService(ctx, ns); err != nil {
return nil, err
}
if d.notifee == nil {
d.notifee = &notifee{
_, ok := d.notifees[ns]
if !ok {
d.notifees[ns] = &notifee{
piChan: make(chan pstore.PeerInfo, 1),
}
d.service.RegisterNotifee(d.notifee)
d.services[ns].RegisterNotifee(d.notifees[ns])
}
return d.notifee.piChan, nil
return d.notifees[ns].piChan, nil
}

func (d *Discovery) wakeService(ctx context.Context) error {
func (d *Discovery) wakeService(ctx context.Context, ns string) error {
var err error

if d.service != nil {
_, ok := d.services[ns]
if ok {
return nil
}

d.service, err = service.NewMdnsService(ctx, d.host, 10*time.Second, "berty")
d.services[ns], err = service.NewMdnsService(ctx, d.host, 10*time.Second, ns)
if err != nil {
return err
}
Expand Down

0 comments on commit bdc03b4

Please sign in to comment.