Skip to content

Commit

Permalink
fix(network): handler sometimes not set
Browse files Browse the repository at this point in the history
Signed-off-by: Godefroy Ponsinet <godefroy.ponsinet@outlook.com>
  • Loading branch information
90dy committed Mar 20, 2019
1 parent 2582ba4 commit f845d8e
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 41 deletions.
2 changes: 1 addition & 1 deletion client/react-native/desktop/Makefile
Expand Up @@ -4,7 +4,7 @@ deps:
GOPROXY=https://goproxy.berty.io GO111MODULE=on go mod vendor
GO111MODULE=off go get -u github.com/asticode/go-astilectron-bundler/...

build:
build: clean
GOPROXY=https://goproxy.berty.io GO111MODULE=on go mod vendor
sed s%TMPL_MAKEFILE_PATH%$(MAKEFILE_DIR)%g bundler.json.tmpl > bundler.json
cp -rf $(MAKEFILE_DIR)/../web/build $(MAKEFILE_DIR)/../desktop/resources/app
Expand Down
11 changes: 11 additions & 0 deletions core/network/config/config.go
Expand Up @@ -121,26 +121,33 @@ func (cfg *Config) Override(override *Config) error {
// Apply applies the given options to the config, returning the first error
// encountered (if any).
func (cfg *Config) Apply(ctx context.Context, opts ...Option) error {
// reset config
cfg.Config = libp2p_config.Config{}

libp2pOpts := []libp2p_config.Option{}

logger().Debug("apply 0")
for _, opt := range opts {
if err := opt(cfg); err != nil {
return err
}
}

logger().Debug("apply 1")
if cfg.OverridePersist {
if err := cfg.OverridePersistConfig(); err != nil {
return err
}
}

logger().Debug("apply 2")
if cfg.Persist {
if err := cfg.ApplyPersistConfig(); err != nil {
return err
}
}

logger().Debug("apply 21")
// add ws transport
if cfg.WS {
libp2pOpts = append(libp2pOpts, libp2p.Transport(ws.New))
Expand Down Expand Up @@ -182,6 +189,7 @@ func (cfg *Config) Apply(ctx context.Context, opts ...Option) error {
libp2pOpts = append(libp2pOpts, libp2p.EnableRelay(circuit.OptActive, circuit.OptDiscovery))
}

logger().Debug("apply 22")
// private network
if cfg.SwarmKey != "" {
prot, err := pnet.NewProtector(strings.NewReader(cfg.SwarmKey))
Expand All @@ -191,6 +199,7 @@ func (cfg *Config) Apply(ctx context.Context, opts ...Option) error {
libp2pOpts = append(libp2pOpts, libp2p.PrivateNetwork(prot))
}

logger().Debug("apply 3")
// identity
if cfg.Identity != "" {
bytes, err := base64.StdEncoding.DecodeString(cfg.Identity)
Expand All @@ -212,6 +221,7 @@ func (cfg *Config) Apply(ctx context.Context, opts ...Option) error {

libp2pOpts = append(libp2pOpts, libp2p.NATPortMap())

logger().Debug("apply 4")
// override libp2p configuration
err := cfg.Config.Apply(append(libp2pOpts, libp2p.FallbackDefaults)...)
if err != nil {
Expand All @@ -220,6 +230,7 @@ func (cfg *Config) Apply(ctx context.Context, opts ...Option) error {

// override conn manager

logger().Debug("apply 5")
// override ping service
cfg.Config.DisablePing = true
return nil
Expand Down
3 changes: 3 additions & 0 deletions core/network/driver.go
Expand Up @@ -261,6 +261,8 @@ func (net *Network) SendTo(ctx context.Context, pi pstore.PeerInfo, e *entity.En
}

func (net *Network) handleEnvelope(s inet.Stream) {
logger().Debug(fmt.Sprintf("NETWORK ADDR HANDLE %p %+v", net, net))
logger().Debug("receiving envelope")
if net.handler == nil {
logger().Error("handler is not set")
return
Expand Down Expand Up @@ -317,6 +319,7 @@ func (net *Network) Join(ctx context.Context, contactID string) error {
}

func (net *Network) OnEnvelopeHandler(f func(context.Context, *entity.Envelope) (*entity.Void, error)) {
logger().Debug(fmt.Sprintf("ON_ENVELOPE_HANDLER %p", f))
net.handler = f
}

Expand Down
69 changes: 29 additions & 40 deletions core/network/network.go
Expand Up @@ -2,6 +2,7 @@ package network

import (
"context"
"fmt"
"sync"

"berty.tech/core/entity"
Expand Down Expand Up @@ -41,35 +42,49 @@ func New(ctx context.Context, opts ...config.Option) (*Network, error) {
tracer := tracing.EnterFunc(ctx)
defer tracer.Finish()

ctx, cancel := context.WithCancel(ctx)
net := &Network{}

if err := net.new(ctx, opts...); err != nil {
return nil, err
}
return net, nil
}

func (net *Network) new(ctx context.Context, opts ...config.Option) error {
var err error

net := &Network{
ctx, cancel := context.WithCancel(ctx)

new := &Network{
config: &config.Config{},
updating: &sync.Mutex{},
shutdown: cancel,
cache: NewNoopCache(),
}

if err := net.config.Apply(ctx, opts...); err != nil {
if net.updating != nil {
new.updating = net.updating
}

if err := new.config.Apply(ctx, opts...); err != nil {
cancel()
return nil, err
return err
}

net.host, err = net.config.NewNode(ctx)
new.host, err = new.config.NewNode(ctx)
if err != nil {
cancel()
return nil, err
return err
}

if net.Config().PeerCache {
net.cache = NewPeerCache(net.host)
}
*net = *new

net.init(ctx)

return net, nil
if net.Config().PeerCache {
net.cache = NewPeerCache(net.host)
}
return nil
}

func (net *Network) init(ctx context.Context) {
Expand All @@ -88,46 +103,21 @@ func (net *Network) Update(ctx context.Context, opts ...config.Option) error {
net.updating.Lock()
defer net.updating.Unlock()

ctx, cancel := context.WithCancel(ctx)

var err error
swap := *net

update := &Network{
config: &config.Config{},
updating: net.updating,
handler: net.handler,
shutdown: cancel,
cache: NewNoopCache(),
}

if err := update.config.Apply(ctx, append([]config.Option{WithConfig(net.config)}, opts...)...); err != nil {
cancel()
if err := net.new(ctx, append([]config.Option{WithConfig(net.config)}, opts...)...); err != nil {
return err
}

update.host, err = update.config.NewNode(ctx)
if err != nil {
cancel()
return err
}

if update.Config().PeerCache {
net.cache = NewPeerCache(update.host)
}

net.Close(ctx)

*net = *update

net.init(ctx)
logger().Debug(fmt.Sprintf("NETWORK ADDR %p %+v, swap: %p %+v", net, net, &swap, swap))
swap.Close(ctx)

return nil
}

func (net *Network) Close(ctx context.Context) error {
tracer := tracing.EnterFunc(ctx)
defer tracer.Finish()

net.shutdown()

// FIXME: save cache to speedup next connections
Expand All @@ -140,6 +130,5 @@ func (net *Network) Close(ctx context.Context) error {
logger().Error("p2p close error", zap.Error(err))
}
}

return nil
}
3 changes: 3 additions & 0 deletions core/node/network.go
Expand Up @@ -2,6 +2,7 @@ package node

import (
"context"
"fmt"

"berty.tech/core/network"
network_metric "berty.tech/core/network/metric"
Expand Down Expand Up @@ -38,6 +39,8 @@ func (n *Node) UseNetworkDriver(ctx context.Context, driver network.Driver) erro
// configure network
n.networkDriver.OnEnvelopeHandler(n.HandleEnvelope)

logger().Debug(fmt.Sprintf("NETWORK ADDR NODE %p %+v", n.networkDriver, n.networkDriver))

_ = n.networkDriver.Join(ctx, n.UserID())

// FIXME: subscribe to every owned device IDs
Expand Down

0 comments on commit f845d8e

Please sign in to comment.