Skip to content

Commit

Permalink
Merge pull request #1149 from gponsinet/fix-rn-netconf-peercache
Browse files Browse the repository at this point in the history
fix: network config peer cache & mdns discover
  • Loading branch information
Godefroy Ponsinet committed Mar 21, 2019
2 parents b38bb62 + 13bc646 commit d7ee578
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 32 deletions.
Expand Up @@ -112,13 +112,24 @@ export default class Network extends PureComponent {
}
/>
</Menu.Section>
<Menu.Section title='Connections'>
<Menu.Item
title='Peer cache'
customRight={
<Switch
justify='end'
value={this.state.PeerCache}
onValueChange={PeerCache => this.updateConfig({ PeerCache })}
/>
}
/>
</Menu.Section>
<Menu.Section title='Bootstrap'>
<Menu.Item
title='Default bootstrap'
customRight={
<Switch
justify='end'
disaBLEd={!this.state.loaded}
value={this.state.DefaultBootstrap}
onValueChange={DefaultBootstrap =>
this.updateConfig({ DefaultBootstrap })
Expand Down
8 changes: 0 additions & 8 deletions core/network/config/config.go
Expand Up @@ -126,28 +126,24 @@ func (cfg *Config) Apply(ctx context.Context, opts ...Option) error {

libp2pOpts := []libp2p_config.Option{}

logger().Debug("apply 0")
for _, opt := range opts {
if err := opt(cfg); err != nil {
return err
}
}

logger().Debug("apply 1")
if cfg.OverridePersist {
if err := cfg.OverridePersistConfig(); err != nil {
return err
}
}

logger().Debug("apply 2")
if cfg.Persist {
if err := cfg.ApplyPersistConfig(); err != nil {
return err
}
}

logger().Debug("apply 21")
// add ws transport
if cfg.WS {
libp2pOpts = append(libp2pOpts, libp2p.Transport(ws.New))
Expand Down Expand Up @@ -189,7 +185,6 @@ func (cfg *Config) Apply(ctx context.Context, opts ...Option) error {
libp2pOpts = append(libp2pOpts, libp2p.EnableRelay(circuit.OptActive, circuit.OptDiscovery))
}

logger().Debug("apply 22")
// private network
if cfg.SwarmKey != "" {
prot, err := pnet.NewProtector(strings.NewReader(cfg.SwarmKey))
Expand All @@ -199,7 +194,6 @@ func (cfg *Config) Apply(ctx context.Context, opts ...Option) error {
libp2pOpts = append(libp2pOpts, libp2p.PrivateNetwork(prot))
}

logger().Debug("apply 3")
// identity
if cfg.Identity != "" {
bytes, err := base64.StdEncoding.DecodeString(cfg.Identity)
Expand All @@ -221,7 +215,6 @@ func (cfg *Config) Apply(ctx context.Context, opts ...Option) error {

libp2pOpts = append(libp2pOpts, libp2p.NATPortMap())

logger().Debug("apply 4")
// override libp2p configuration
err := cfg.Config.Apply(append(libp2pOpts, libp2p.FallbackDefaults)...)
if err != nil {
Expand All @@ -230,7 +223,6 @@ func (cfg *Config) Apply(ctx context.Context, opts ...Option) error {

// override conn manager

logger().Debug("apply 5")
// override ping service
cfg.Config.DisablePing = true
return nil
Expand Down
22 changes: 13 additions & 9 deletions core/network/driver.go
Expand Up @@ -163,14 +163,20 @@ func (net *Network) Discover(ctx context.Context) {
if net.host.Discovery != nil {
libp2p_discovery.Advertise(ctx, net.host.Discovery, "berty")
go func() {
peers, err := net.host.Discovery.FindPeers(ctx, "berty")
if err != nil {
logger().Debug("network discover: cannot find peers: " + err.Error())
return
}
for {
peers, err := libp2p_discovery.FindPeers(ctx, net.host.Discovery, "berty", 0)
if err != nil {
logger().Error("network discover error", zap.String("err", err.Error()))
continue
}
for _, pi := range peers {
net.Connect(ctx, pi)
select {
case pi := <-peers:
if err := net.Connect(ctx, pi); err != nil {
logger().Error("network discover: failed to connect: " + err.Error())
}
case <-ctx.Done():
logger().Debug("network discover shutdown")
return
}
}
}()
Expand Down Expand Up @@ -261,7 +267,6 @@ func (net *Network) SendTo(ctx context.Context, pi pstore.PeerInfo, e *entity.En
}

func (net *Network) handleEnvelope(s inet.Stream) {
logger().Debug(fmt.Sprintf("NETWORK ADDR HANDLE %p %+v", net, net))
logger().Debug("receiving envelope")
if net.handler == nil {
logger().Error("handler is not set")
Expand Down Expand Up @@ -319,7 +324,6 @@ func (net *Network) Join(ctx context.Context, contactID string) error {
}

func (net *Network) OnEnvelopeHandler(f func(context.Context, *entity.Envelope) (*entity.Void, error)) {
logger().Debug(fmt.Sprintf("ON_ENVELOPE_HANDLER %p", f))
net.handler = f
}

Expand Down
6 changes: 4 additions & 2 deletions core/network/host/discovery.go
Expand Up @@ -3,6 +3,7 @@ package host
import (
"context"
"errors"
"fmt"
"time"

"berty.tech/core/pkg/tracing"
Expand Down Expand Up @@ -47,7 +48,7 @@ func (d *BertyDiscovery) Advertise(ctx context.Context, ns string, opts ...disco
waitChan := waitChans[i]

go func() {
_, err := d.Advertise(ctx, ns, opts...)
_, err := d.Advertise(ctx, ns)
waitChan <- struct{}{}
if err != nil {
logger().Error("berty discovery advertise error", zap.String("err", err.Error()))
Expand Down Expand Up @@ -79,14 +80,15 @@ func (d *BertyDiscovery) FindPeers(ctx context.Context, ns string, opts ...disco
d := d.discoveries[i]

go func() {
piChan, err := d.FindPeers(ctx, ns, opts...)
piChan, err := d.FindPeers(ctx, ns)
if err != nil {
logger().Error("berty discovery find peers error", zap.String("err", err.Error()))
return
}
for {
select {
case pi := <-piChan:
logger().Debug("berty discovery peer found " + fmt.Sprintf("%+v", pi))
if pi.ID != "" {
globPiChan <- pi
}
Expand Down
2 changes: 2 additions & 0 deletions core/network/network.go
Expand Up @@ -80,6 +80,8 @@ func (net *Network) init(ctx context.Context) {
if err := net.Bootstrap(ctx, false, net.config.Bootstrap...); err != nil {
logger().Error(err.Error())
}

net.Discover(ctx)
}

func (net *Network) Close(ctx context.Context) error {
Expand Down
13 changes: 4 additions & 9 deletions core/network/protocol/mdns/discovery.go
Expand Up @@ -33,7 +33,7 @@ func (d *Discovery) Advertise(ctx context.Context, ns string, opts ...discovery.
tracer := tracing.EnterFunc(ctx)
defer tracer.Finish()

if err := d.wakeService(ctx, ns, false); err != nil {
if err := d.wakeService(ctx, ns); err != nil {
return 0, err
}
time.Sleep(10 * time.Second)
Expand All @@ -44,14 +44,14 @@ func (d *Discovery) FindPeers(ctx context.Context, ns string, opts ...discovery.
tracer := tracing.EnterFunc(ctx)
defer tracer.Finish()

if err := d.wakeService(ctx, ns, true); err != nil {
if err := d.wakeService(ctx, ns); err != nil {
return nil, err
}

return d.notifees[ns].piChan, nil
}

func (d *Discovery) wakeService(ctx context.Context, ns string, regiterNotifee bool) error {
func (d *Discovery) wakeService(ctx context.Context, ns string) error {
var err error

d.mutex.Lock()
Expand All @@ -70,11 +70,7 @@ func (d *Discovery) wakeService(ctx context.Context, ns string, regiterNotifee b
d.notifees[ns] = &notifee{
piChan: make(chan pstore.PeerInfo, 1),
}
}

if regiterNotifee && d.notifees[ns].registered == false {
d.services[ns].RegisterNotifee(d.notifees[ns])
d.notifees[ns].registered = true
}
d.mutex.Unlock()

Expand All @@ -84,8 +80,7 @@ func (d *Discovery) wakeService(ctx context.Context, ns string, regiterNotifee b
var _ service.Notifee = (*notifee)(nil)

type notifee struct {
piChan chan pstore.PeerInfo
registered bool
piChan chan pstore.PeerInfo
}

func (n *notifee) HandlePeerFound(pi pstore.PeerInfo) {
Expand Down
3 changes: 0 additions & 3 deletions core/node/network.go
Expand Up @@ -2,7 +2,6 @@ package node

import (
"context"
"fmt"

"berty.tech/core/network"
network_metric "berty.tech/core/network/metric"
Expand Down Expand Up @@ -39,8 +38,6 @@ func (n *Node) UseNetworkDriver(ctx context.Context, driver network.Driver) erro
// configure network
n.networkDriver.OnEnvelopeHandler(n.HandleEnvelope)

logger().Debug(fmt.Sprintf("NETWORK ADDR NODE %p %+v", n.networkDriver, n.networkDriver))

_ = n.networkDriver.Join(ctx, n.UserID())

// FIXME: subscribe to every owned device IDs
Expand Down

0 comments on commit d7ee578

Please sign in to comment.