Skip to content

Commit

Permalink
feat(p2p): Add mdns discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
gfanton committed Aug 16, 2018
1 parent 0874e78 commit e1ac52a
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 32 deletions.
63 changes: 44 additions & 19 deletions core/cmd/berty/daemon.go
Expand Up @@ -4,12 +4,12 @@ import (
"context"
"fmt"
"log"
"net"
"os"
"os/signal"
"strings"
"syscall"
"time"

dht "github.com/libp2p/go-libp2p-kad-dht"
reuse "github.com/libp2p/go-reuseport"
"github.com/pkg/errors"
"github.com/spf13/cobra"
Expand All @@ -27,16 +27,21 @@ import (
"github.com/berty/berty/core/node"
"github.com/berty/berty/core/sql"
"github.com/berty/berty/core/sql/sqlcipher"
p2plog "github.com/ipfs/go-log"
)

type daemonOptions struct {
bind string
hideBanner bool
sqlPath string
sqlKey string
dropDatabase bool
initOnly bool
noP2P bool
bind string
hideBanner bool
sqlPath string
sqlKey string
bootstrap []string
dropDatabase bool
initOnly bool
noP2P bool
logP2PLevel string
bindP2P []string
logP2PSubsytem []string
}

func newDaemonCommand() *cobra.Command {
Expand All @@ -47,14 +52,20 @@ func newDaemonCommand() *cobra.Command {
return daemon(opts)
},
}

flags := cmd.Flags()
flags.BoolVar(&opts.dropDatabase, "drop-database", false, "drop database to force a reinitialization")
flags.BoolVar(&opts.hideBanner, "hide-banner", false, "hide banner")
flags.BoolVar(&opts.initOnly, "init-only", false, "stop after node initialization (useful for integration tests")
flags.BoolVar(&opts.noP2P, "no-p2p", false, "Disable p2p Drvier")
flags.StringVarP(&opts.bind, "bind", "", "0.0.0.0:1337", "gRPC listening address")
flags.BoolVar(&opts.noP2P, "no-p2p", false, "Disable p2p Drier")
flags.StringVarP(&opts.bind, "bind", "b", ":1337", "gRPC listening address")
flags.StringVarP(&opts.sqlKey, "sql-key", "", "s3cur3", "sqlcipher database encryption key")
flags.StringVarP(&opts.sqlPath, "sql-path", "", "/tmp/berty.db", "sqlcipher database path")
flags.StringSliceVarP(&opts.bootstrap, "bootstrap", "", []string{}, "boostrap peers")
flags.StringSliceVarP(&opts.bindP2P, "bind-p2p", "", []string{"/ip4/0.0.0.0/tcp/0"}, "p2p listening address")
flags.StringVarP(&opts.logP2PLevel, "log-p2p-level", "", "", "Enable log on libp2p (can be 'critical', 'error', 'warning', 'notice', 'info', 'debug')")
flags.StringSliceVarP(&opts.logP2PSubsytem, "log-p2p-subsystem", "", []string{"*"}, "log libp2p specific subsystem")

return cmd
}

Expand All @@ -64,7 +75,18 @@ func daemon(opts *daemonOptions) error {
// initialize gRPC
gs := grpc.NewServer()
reflection.Register(gs)
listener, err := reuse.Listen("tcp", opts.bind)

addr, err := net.ResolveTCPAddr("tcp", opts.bind)
if err != nil {
return err
}

if addr.IP == nil {
addr.IP = net.IP{127, 0, 0, 1}
}

fmt.Printf("%s - %s:%d\n", addr.Network(), addr.IP.String(), addr.Port)
listener, err := reuse.Listen(addr.Network(), fmt.Sprintf("%s:%d", addr.IP.String(), addr.Port))
if err != nil {
return err
}
Expand Down Expand Up @@ -92,21 +114,24 @@ func daemon(opts *daemonOptions) error {

var driver network.Driver
if !opts.noP2P {
var bootstrapConfig = &dht.BootstrapConfig{
Queries: 5,
Period: time.Second,
Timeout: 10 * time.Second,
if opts.logP2PLevel != "" {
for _, name := range opts.logP2PSubsytem {
if err := p2plog.SetLogLevel(name, strings.ToUpper(opts.logP2PLevel)); err != nil {
return err
}
}
}

driver, err = p2p.NewDriver(
context.Background(),
p2p.WithRandomIdentity(),
p2p.WithDefaultMuxers(),
p2p.WithDefaultPeerstore(),
p2p.WithDefaultSecurity(),
p2p.WithDefaultTransports(),
p2p.WithDHTBoostrapConfig(bootstrapConfig),
p2p.WithListenAddrStrings("/ip4/0.0.0.0/tcp/0"),
p2p.WithBootstrap(p2p.BootstrapIpfs...),
p2p.WithMDNS(),
p2p.WithListenAddrStrings(opts.bindP2P...),
p2p.WithBootstrap(opts.bootstrap...),
)
if err != nil {
return err
Expand Down
8 changes: 8 additions & 0 deletions core/network/p2p/options.go
Expand Up @@ -29,6 +29,14 @@ func (c *driverConfig) Apply(opts ...Option) error {
return nil
}

// WithDHTOptions creates a new DHT with the specified options.
func WithMDNS() Option {
return func(dc *driverConfig) error {
dc.enableMDNS = true
return nil
}
}

// WithDHTOptions creates a new DHT with the specified options.
func WithDHTOptions(opts ...dhtopt.Option) Option {
return func(dc *driverConfig) error {
Expand Down
106 changes: 93 additions & 13 deletions core/network/p2p/p2p.go
Expand Up @@ -3,6 +3,8 @@ package p2p
import (
"context"
"fmt"
"sync"
"time"

cid "github.com/ipfs/go-cid"
datastore "github.com/ipfs/go-datastore"
Expand All @@ -14,6 +16,7 @@ import (
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
mdns "github.com/libp2p/go-libp2p/p2p/discovery"
ma "github.com/multiformats/go-multiaddr"
mh "github.com/multiformats/go-multihash"
"go.uber.org/zap"
Expand All @@ -37,6 +40,9 @@ type driverConfig struct {
// DHT Service
dhtOpts []dhtopt.Option
dhtBoostrapConfig dht.BootstrapConfig

// MDNS
enableMDNS bool
}

// Driver is a network.Driver
Expand All @@ -49,6 +55,9 @@ type Driver struct {
ccmanager *p2putil.Manager
handler func(context.Context, *p2p.Event) (*p2p.Void, error)

subsStack []*cid.Cid
muSubs sync.Mutex

// services
dht *dht.IpfsDHT
}
Expand All @@ -64,6 +73,12 @@ func newDriver(ctx context.Context, cfg driverConfig) (*Driver, error) {
host: host,
}

if len(cfg.bootstrap) > 0 {
if err := driver.Bootstrap(ctx, cfg.bootstrapSync, cfg.bootstrap...); err != nil {
return nil, err
}
}

if len(cfg.dhtOpts) == 0 {
ds := datastore.NewMapDatastore()
cfg.dhtOpts = []dhtopt.Option{dhtopt.Datastore(ds)}
Expand All @@ -87,9 +102,13 @@ func newDriver(ctx context.Context, cfg driverConfig) (*Driver, error) {
return nil, err
}

if len(cfg.bootstrap) > 0 {
if err := driver.Bootstrap(ctx, cfg.bootstrapSync, cfg.bootstrap...); err != nil {
return nil, err
fmt.Println("test 3")
if cfg.enableMDNS {
sa, err := mdns.NewMdnsService(ctx, host, time.Second, "berty")
if err != nil {
zap.L().Warn("Failed to enable MDNS", zap.Error(err))
} else {
sa.RegisterNotifee((*DriverDiscoveryNotifee)(driver))
}
}

Expand All @@ -114,6 +133,8 @@ func newDriver(ctx context.Context, cfg driverConfig) (*Driver, error) {
}
}()

zap.L().Debug("Host", zap.String("ID", driver.ID()), zap.Strings("Addrs", driver.Addrs()))

return driver, nil
}

Expand All @@ -126,6 +147,20 @@ func NewDriver(ctx context.Context, opts ...Option) (*Driver, error) {
return newDriver(ctx, cfg)
}

func (d *Driver) ID() string {
return d.host.ID().Pretty()
}

func (d *Driver) Addrs() []string {
var addrs []string

for _, addr := range d.host.Addrs() {
addrs = append(addrs, addr.String())
}

return addrs
}

func (d *Driver) getPeerInfo(addr string) (*pstore.PeerInfo, error) {
iaddr, err := ipfsaddr.ParseString(addr)
if err != nil {
Expand All @@ -144,14 +179,6 @@ func (d *Driver) Close() error {
return d.host.Close()
}

func (d *Driver) Addrs() []ma.Multiaddr {
return d.host.Addrs()
}

func (d *Driver) ID() peer.ID {
return d.host.ID()
}

func (d *Driver) Peerstore() pstore.Peerstore {
return d.host.Peerstore()
}
Expand Down Expand Up @@ -265,7 +292,7 @@ func (d *Driver) SendEventToSubscribers(ctx context.Context, id string, e *p2p.E
sendEvent := func(_s pstore.PeerInfo) {
peerID := _s.ID.Pretty()

if _s.ID.Pretty() == d.ID().Pretty() {
if _s.ID.Pretty() == d.ID() {
return
}

Expand Down Expand Up @@ -308,16 +335,28 @@ func (d *Driver) FindSubscribers(ctx context.Context, id string) ([]pstore.PeerI
return d.dht.FindProviders(ctx, c)
}

func (d *Driver) stackSub(c *cid.Cid) {
d.muSubs.Lock()
d.subsStack = append(d.subsStack, c)
d.muSubs.Unlock()
}

// SubscribeTo to the given ID
func (d *Driver) SubscribeTo(ctx context.Context, id string) error {
c, err := d.createCid(id)
if err != nil {
return err
}

if err := d.dht.Provide(ctx, c, true); err != nil {
// stack peer if no peer found
d.stackSub(c)
zap.L().Warn("Provide err", zap.Error(err))
}

// Announce that you are subscribed to this conversation, but don't
// broadcast it! in this way, if you die, your announcement will die with you!
return d.dht.Provide(ctx, c, true)
return nil
}

func (d *Driver) SetReceiveEventHandler(f func(context.Context, *p2p.Event) (*p2p.Void, error)) {
Expand All @@ -333,3 +372,44 @@ func (ds *DriverService) Handle(ctx context.Context, e *p2p.Event) (*p2p.Void, e

return nil, fmt.Errorf("No handler set")
}

type DriverDiscoveryNotifee Driver

func (ddn *DriverDiscoveryNotifee) HandlePeerFound(pi pstore.PeerInfo) {
if err := ddn.host.Connect(context.Background(), pi); err != nil {
zap.L().Warn("mdns discovery failed", zap.String("remoteID", pi.ID.Pretty()), zap.Error(err))
} else {
// absorb addresses into peerstore
ddn.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, pstore.PermanentAddrTTL)
}
}

func (ddn *DriverDiscoveryNotifee) Driver() *Driver {
return (*Driver)(ddn)
}

func (ddn *DriverDiscoveryNotifee) Listen(net inet.Network, a ma.Multiaddr) {}
func (ddn *DriverDiscoveryNotifee) ListenClose(net inet.Network, a ma.Multiaddr) {}
func (ddn *DriverDiscoveryNotifee) OpenedStream(net inet.Network, s inet.Stream) {}
func (ddn *DriverDiscoveryNotifee) ClosedStream(net inet.Network, s inet.Stream) {}

func (ddn *DriverDiscoveryNotifee) Connected(s inet.Network, c inet.Conn) {
go func(id peer.ID) {
if len(ddn.subsStack) > 0 {
var newSubsStack []*cid.Cid
for _, c := range ddn.subsStack {
if err := ddn.dht.Provide(context.Background(), c, true); err != nil {
// stack peer if no peer found
zap.L().Warn("Provide err", zap.Error(err))
newSubsStack = append(newSubsStack, c)
}
}

ddn.muSubs.Lock()
ddn.subsStack = newSubsStack
ddn.muSubs.Unlock()
}
}(c.RemotePeer())
}

func (ddn *DriverDiscoveryNotifee) Disconnected(s inet.Network, c inet.Conn) {}

0 comments on commit e1ac52a

Please sign in to comment.