Skip to content

Commit

Permalink
fix(network): fix rebase on master
Browse files Browse the repository at this point in the history
  • Loading branch information
gfanton committed Mar 14, 2019
1 parent fdf69bc commit 3a49548
Show file tree
Hide file tree
Showing 26 changed files with 527 additions and 1,660 deletions.
2 changes: 2 additions & 0 deletions client/react-native/gomobile/go.sum
Expand Up @@ -507,6 +507,8 @@ github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp
github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA=
github.com/multiformats/go-multiaddr v0.0.1 h1:/QUV3VBMDI6pi6xfiw7lr6xhDWWvQKn9udPn68kLSdY=
github.com/multiformats/go-multiaddr v0.0.1/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
github.com/multiformats/go-multiaddr v0.0.2 h1:RBysRCv5rv3FWlhKWKoXv8tnsCUpEpIZpCmqAGZos2s=
github.com/multiformats/go-multiaddr v0.0.2/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
github.com/multiformats/go-multiaddr-dns v0.0.1/go.mod h1:9kWcqw/Pj6FwxAwW38n/9403szc57zJPs45fmnznu3Q=
github.com/multiformats/go-multiaddr-dns v0.0.2 h1:/Bbsgsy3R6e3jf2qBahzNHzww6usYaZ0NhNH3sqdFS8=
github.com/multiformats/go-multiaddr-dns v0.0.2/go.mod h1:9kWcqw/Pj6FwxAwW38n/9403szc57zJPs45fmnznu3Q=
Expand Down
1 change: 1 addition & 0 deletions core/api/client/jsonclient/logger.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/api/client/logger.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 16 additions & 21 deletions core/cmd/berty/daemon.go
Expand Up @@ -36,19 +36,18 @@ type daemonOptions struct {

// p2p

identity string `mapstructure:"identity"`
bootstrap []string `mapstructure:"bootstrap"`
noP2P bool `mapstructure:"no-p2p"`
bindP2P []string `mapstructure:"bind-p2p"`
transportP2P []string `mapstructure:"transport-p2p"`
hop bool `mapstructure:"hop"` // relay hop
ble bool `mapstructure:"ble"`
mdns bool `mapstructure:"mdns"`
dhtServer bool `mapstructure:"dht"`
PrivateNetwork bool `mapstructure:"private-network"`
SwarmKeyPath string `mapstructure:"swarm-key"`
dhtkvLogDatastore bool `mapstructure:"dhtkv-log-ds"`
nickname string `mapstructure:"nickname"`
identity string `mapstructure:"identity"`
bootstrap []string `mapstructure:"bootstrap"`
noP2P bool `mapstructure:"no-p2p"`
bindP2P []string `mapstructure:"bind-p2p"`
transportP2P []string `mapstructure:"transport-p2p"`
hop bool `mapstructure:"hop"` // relay hop
ble bool `mapstructure:"ble"`
mdns bool `mapstructure:"mdns"`
dhtServer bool `mapstructure:"dht"`
PrivateNetwork bool `mapstructure:"private-network"`
SwarmKeyPath string `mapstructure:"swarm-key"`
nickname string `mapstructure:"nickname"`
}

func daemonSetupFlags(flags *pflag.FlagSet, opts *daemonOptions) {
Expand All @@ -60,19 +59,15 @@ func daemonSetupFlags(flags *pflag.FlagSet, opts *daemonOptions) {
flags.BoolVar(&opts.initOnly, "init-only", false, "stop after node initialization (useful for integration tests")
flags.StringVar(&opts.privateKeyFile, "private-key-file", "", "set private key file for node")
flags.BoolVar(&opts.withBot, "bot", false, "enable bot")
flags.BoolVar(&opts.mdns, "mdns", true, "enable mdns discovery")
flags.BoolVar(&opts.dhtServer, "dhtkv-server", false, "enable server mode for DHT (Client mode by default)")
flags.BoolVar(&opts.dhtkvLogDatastore, "dhtkv-log-ds", false, "enable logs for DHT-CSKV datastore")
flags.BoolVar(&opts.PrivateNetwork, "private-network", true, "enable private network with the default swarm key")
flags.StringVar(&opts.SwarmKeyPath, "swarm-key", "", "path to a custom swarm key, only peers that use the same swarm key will be able to talk with you")
flags.StringVar(&opts.grpcBind, "grpc-bind", ":1337", "gRPC listening address")
flags.StringVar(&opts.gqlBind, "gql-bind", ":8700", "Bind graphql api")

// network
flags.StringVarP(&opts.identity, "p2p-identity", "i", "", "set p2p identity")
flags.StringSliceVar(&opts.bootstrap, "bootstrap", network_config.DefaultBootstrap, "boostrap peers")
flags.StringSliceVar(&opts.apnsCerts, "apns-certs", []string{}, "Path of APNs certificates, delimited by commas")
flags.StringSliceVar(&opts.apnsDevVoipCerts, "apns-dev-voip-certs", []string{}, "Path of APNs VoIP development certificates, delimited by commas")
flags.StringSliceVar(&opts.fcmAPIKeys, "fcm-api-keys", []string{}, "API keys for Firebase Cloud Messaging, in the form packageid:token, delimited by commas")
// network
flags.StringSliceVar(&opts.bootstrap, "bootstrap", network_config.DefaultBootstrap, "boostrap peers")
flags.BoolVar(&opts.noP2P, "no-p2p", false, "Disable p2p Driver")
flags.BoolVar(&opts.hop, "hop", false, "enable relay hop (should not be enable for client)")
flags.BoolVar(&opts.mdns, "mdns", true, "enable mdns discovery")
Expand Down Expand Up @@ -173,7 +168,7 @@ func daemon(opts *daemonOptions) error {
Persist: false,
OverridePersist: false,

DHTKVLogDatastore: opts.dhtkvLogDatastore,
// DHTKVLogDatastore: opts.dhtkvLogDatastore,
}),
),
))
Expand Down
1 change: 1 addition & 0 deletions core/go.mod
Expand Up @@ -54,6 +54,7 @@ require (
github.com/libp2p/go-libp2p-protocol v0.0.1
github.com/libp2p/go-libp2p-pubsub v0.0.1 // indirect
github.com/libp2p/go-libp2p-quic-transport v0.0.1
github.com/libp2p/go-libp2p-record v0.0.1
github.com/libp2p/go-libp2p-routing v0.0.1
github.com/libp2p/go-libp2p-swarm v0.0.1
github.com/libp2p/go-libp2p-transport v0.0.4
Expand Down
15 changes: 14 additions & 1 deletion core/network/config/config.go
Expand Up @@ -15,6 +15,7 @@ import (
"berty.tech/core/network/host"
"berty.tech/core/network/metric"
"berty.tech/core/network/protocol/ble"
"berty.tech/core/network/protocol/dht"
"berty.tech/core/network/protocol/mdns"
"berty.tech/core/pkg/errorcodes"
libp2p "github.com/libp2p/go-libp2p"
Expand Down Expand Up @@ -137,6 +138,12 @@ func (cfg *Config) Apply(ctx context.Context, opts ...Option) error {
}
}

if cfg.DefaultBootstrap {
cfg.Bootstrap = append(cfg.Bootstrap, DefaultBootstrap...)
}

logger().Debug(fmt.Sprintf("bootstrap: %+v", cfg.Bootstrap))

// add ws transport
if cfg.WS {
libp2pOpts = append(libp2pOpts, libp2p.Transport(ws.New))
Expand Down Expand Up @@ -310,8 +317,14 @@ func (cfg *Config) NewNode(ctx context.Context) (*host.BertyHost, error) {
return nil, err
}

dht, err := dht.New(ctx, h, cfg.DHTServer, false)
if err != nil {
h.Close()
return nil, err
}

// Configure routing
h.Routing, err = host.NewBertyRouting(ctx, h, cfg.DHTServer)
h.Routing, err = host.NewBertyRouting(ctx, h, dht)
if err != nil {
h.Close()
return nil, err
Expand Down
90 changes: 0 additions & 90 deletions core/network/dhtcskv/dht_cskv.go

This file was deleted.

21 changes: 0 additions & 21 deletions core/network/dhtcskv/logger.gen.go

This file was deleted.

95 changes: 33 additions & 62 deletions core/network/driver.go
Expand Up @@ -2,17 +2,18 @@ package network

import (
"context"
"errors"
"fmt"
"io"
"net"
"reflect"
"sync"
"time"

"berty.tech/core/entity"
"berty.tech/core/network/config"
"berty.tech/core/network/helper"
"berty.tech/core/network/metric"
routing_validator "berty.tech/core/network/protocol/dht/validator"
"berty.tech/core/pkg/tracing"
ggio "github.com/gogo/protobuf/io"
cid "github.com/ipfs/go-cid"
Expand All @@ -24,6 +25,7 @@ import (
protocol "github.com/libp2p/go-libp2p-protocol"
ma "github.com/multiformats/go-multiaddr"
mh "github.com/multiformats/go-multihash"
"github.com/pkg/errors"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -208,42 +210,21 @@ func (net *Network) Emit(ctx context.Context, e *entity.Envelope) error {
return net.EmitTo(ctx, e.GetChannelID(), e)
}

func (net *Network) EmitTo(ctx context.Context, channel string, e *entity.Envelope) error {
tracer := tracing.EnterFunc(ctx, channel, e)
func (net *Network) EmitTo(ctx context.Context, contactID string, e *entity.Envelope) error {
tracer := tracing.EnterFunc(ctx, contactID, e)
defer tracer.Finish()
ctx = tracer.Context()

logger().Debug("looking for peers", zap.String("channel", channel))
c, err := net.createCid(channel)
peerInfo, err := routing_validator.ContactIDToPeerInfo(ctx, net.host.Routing, contactID)
if err != nil {
return err
return errors.Wrap(err, fmt.Sprintf("EmitTo failed during contactID translation (%s)", contactID))
}

ss := net.host.Routing.FindProvidersAsync(ctx, c, 100)

// @TODO: we need to split this, and let the node do the logic to try
// back if the send fail with the given peer

logger().Debug("found peers", zap.String("channel", channel), zap.Int("number", len(ss)))
ok := false
for pi := range ss {
if pi.ID == "" {
break
}
logger().Debug(fmt.Sprintf("send to peer: %+v", pi))

if err := net.SendTo(ctx, pi, e); err != nil {
logger().Warn("sendTo", zap.Error(err))
continue
}

ok = true
break
}

// wait until all goroutines are done
if !ok {
return fmt.Errorf("unable to send evenlope to at last one peer")
if err = net.SendTo(ctx, peerInfo, e); err != nil {
logger().Warn("sendTo", zap.Error(err))
return err
}

return nil
Expand Down Expand Up @@ -297,43 +278,33 @@ func (net *Network) handleEnvelope(s inet.Stream) {

}

func (net *Network) FindProvidersAndWait(ctx context.Context, id string, cache bool) ([]pstore.PeerInfo, error) {
c, err := net.createCid(id)
if err != nil {
return nil, err
}

ctx, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()

piChan := net.host.Routing.FindProvidersAsync(ctx, c, 10)

piSlice := []pstore.PeerInfo{}
for {
select {
case pi := <-piChan:
if pi.ID == "" {
return piSlice, nil
func (net *Network) Join(ctx context.Context, contactID string) error {
go func() {
prevPeerInfo := pstore.PeerInfo{}
for {
duration := 1 * time.Minute
currPeerInfo := net.host.Peerstore().PeerInfo(net.host.ID())
if !reflect.DeepEqual(prevPeerInfo, currPeerInfo) {
err := routing_validator.PutTranslateRecord(ctx, net.host.Routing, contactID, currPeerInfo)
if err != nil {
logger().Warn(errors.Wrap(err, "join failed").Error())
duration = 5 * time.Second
} else {
logger().Debug("translate record updated successfully")
prevPeerInfo = currPeerInfo
}
}

piSlice = append(piSlice, pi)
case <-ctx.Done():
if len(piSlice) == 0 {
return nil, errors.New("no providers found")
select {
case <-time.After(duration):
continue
case <-ctx.Done():
logger().Debug("driver shutdown: translation record updater ended", zap.Error(ctx.Err()))
return
}
return piSlice, nil
}
}
}()

}

func (net *Network) Join(ctx context.Context, id string) error {
c, err := net.createCid(id)
if err != nil {
return err
}

return net.host.Routing.Provide(ctx, c, true)
return nil
}

func (net *Network) OnEnvelopeHandler(f func(context.Context, *entity.Envelope) (*entity.Void, error)) {
Expand Down

0 comments on commit 3a49548

Please sign in to comment.