Skip to content

Commit

Permalink
wip: test allin discovery
Browse files Browse the repository at this point in the history
Signed-off-by: gfanton <guilhem.fanton@gmail.com>
  • Loading branch information
gfanton committed Mar 26, 2021
1 parent abb2909 commit c406d6f
Show file tree
Hide file tree
Showing 6 changed files with 339 additions and 124 deletions.
124 changes: 73 additions & 51 deletions go/internal/initutil/ipfs.go
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
peerstore "github.com/libp2p/go-libp2p-core/peerstore"
p2p_routing "github.com/libp2p/go-libp2p-core/routing"
discovery "github.com/libp2p/go-libp2p-discovery"
p2p_dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand Down Expand Up @@ -122,8 +123,9 @@ func (m *Manager) getLocalIPFS() (ipfsutil.ExtendedCoreAPI, *ipfs_core.IpfsNode,

mopts := ipfsutil.MobileOptions{
IpfsConfigPatch: m.setupIPFSConfig,
HostConfigFunc: m.setupIPFSHost,
RoutingOption: ipfsutil.CustomRoutingOption(p2p_dht.ModeClient, p2p_dht.Concurrency(2)),
// HostConfigFunc: m.setupIPFSHost,
RoutingConfigFunc: m.configIPFSRouting,
RoutingOption: ipfsutil.CustomRoutingOption(p2p_dht.ModeClient, p2p_dht.Concurrency(2)),
ExtraOpts: map[string]bool{
// @NOTE(gfanton) temporally disable ipfs *main* pubsub
"pubsub": false,
Expand Down Expand Up @@ -367,50 +369,53 @@ func (m *Manager) setupIPFSConfig(cfg *ipfs_cfg.Config) ([]libp2p.Option, error)
}
}

if m.Node.Protocol.RelayHack {
// Resolving addresses
pis, err := ipfsutil.ParseAndResolveRdvpMaddrs(m.getContext(), m.initLogger, config.Config.P2P.RelayHack)
if err != nil {
return nil, errcode.ErrIPFSSetupConfig.Wrap(err)
}

lenPis := len(pis)
pickFrom := make([]peer.AddrInfo, lenPis)
for lenPis > 0 {
lenPis--
pickFrom[lenPis] = *pis[lenPis]
}

// Selecting 2 random one
rng := mrand.New(mrand.NewSource(srand.SafeFast())) //nolint:gosec
var relays []peer.AddrInfo
if len(pickFrom) <= 2 {
relays = pickFrom
} else {
for i := 2; i > 0; i-- {
lenPickFrom := len(pickFrom)
n := rng.Intn(lenPickFrom)
relays = append(relays, pickFrom[n])
if n == 0 {
pickFrom = pickFrom[1:]
continue
}
if n == lenPickFrom-1 {
pickFrom = pickFrom[:n-1]
continue
}
pickFrom = append(pickFrom[:n], pickFrom[n+1:]...)
}
}

// for _, relay := range relays {
// for _, addr := range relay.Addrs {
// cfg.Addresses.Announce = append(cfg.Addresses.Announce, addr.String()+"/p2p/"+relay.ID.String()+"/p2p-circuit")
// }
// }

p2popts = append(p2popts, libp2p.StaticRelays(relays))
}
// if m.Node.Protocol.RelayHack {
// // Resolving addresses
// pis, err := ipfsutil.ParseAndResolveRdvpMaddrs(m.getContext(), m.initLogger, config.Config.P2P.RelayHack)
// if err != nil {
// return nil, errcode.ErrIPFSSetupConfig.Wrap(err)
// }

// lenPis := len(pis)
// pickFrom := make([]peer.AddrInfo, lenPis)
// for lenPis > 0 {
// lenPis--
// pickFrom[lenPis] = *pis[lenPis]
// }

// // Selecting 2 random one
// rng := mrand.New(mrand.NewSource(srand.SafeFast())) //nolint:gosec
// var relays []peer.AddrInfo
// if len(pickFrom) <= 2 {
// relays = pickFrom
// } else {
// for i := 2; i > 0; i-- {
// lenPickFrom := len(pickFrom)
// n := rng.Intn(lenPickFrom)
// relays = append(relays, pickFrom[n])
// if n == 0 {
// pickFrom = pickFrom[1:]
// continue
// }
// if n == lenPickFrom-1 {
// pickFrom = pickFrom[:n-1]
// continue
// }
// pickFrom = append(pickFrom[:n], pickFrom[n+1:]...)
// }
// }

// for _, relay := range relays {
// for _, addr := range relay.Addrs {
// cfg.Addresses.Announce = append(cfg.Addresses.Announce, addr.String()+"/p2p/"+relay.ID.String()+"/p2p-circuit")
// }
// }

// p2popts = append(p2popts, libp2p.StaticRelays(relays))
// }

// enable autorelay
p2popts = append(p2popts, libp2p.ListenAddrs(), libp2p.EnableAutoRelay(), libp2p.ForceReachabilityPrivate())

// prefill peerstore with known rdvp servers
if m.Node.Protocol.Tor.Mode != TorRequired {
Expand All @@ -422,7 +427,7 @@ func (m *Manager) setupIPFSConfig(cfg *ipfs_cfg.Config) ([]libp2p.Option, error)
return p2popts, nil
}

func (m *Manager) setupIPFSHost(h host.Host) error {
func (m *Manager) configIPFSRouting(h host.Host, r p2p_routing.Routing) error {
logger, err := m.getLogger()
if err != nil {
return errcode.ErrIPFSSetupHost.Wrap(err)
Expand All @@ -444,11 +449,13 @@ func (m *Manager) setupIPFSHost(h host.Host) error {
}
}

rng := mrand.New(mrand.NewSource(srand.MustSecure())) // nolint:gosec // we need to use math/rand here, but it is seeded from crypto/rand

// rdvp driver
var drivers []*tinder.Driver
if lenrdvpeers := len(rdvpeers); lenrdvpeers > 0 {
for _, peer := range rdvpeers {
h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL)
rng := mrand.New(mrand.NewSource(srand.MustSecure())) // nolint:gosec // we need to use math/rand here, but it is seeded from crypto/rand
udisc := tinder.NewRendezvousDiscovery(logger, h, peer.ID, rng)

name := fmt.Sprintf("rdvp#%.6s", peer.ID)
Expand All @@ -457,6 +464,15 @@ func (m *Manager) setupIPFSHost(h host.Host) error {
}
}

// dht driver
drivers = append(drivers,
tinder.NewDriverFromRouting("dht", r, nil))

// localdisc driver
localdisc := tinder.NewLocalDiscovery(logger, h, rng)
drivers = append(drivers,
tinder.NewDriverFromUnregisterDiscovery("localdisc", localdisc, tinder.FilterPrivateAddrs))

serverRng := mrand.New(mrand.NewSource(srand.MustSecure())) // nolint:gosec // we need to use math/rand here, but it is seeded from crypto/rand

backoffstrat := discovery.NewExponentialBackoff(
Expand All @@ -482,11 +498,17 @@ func (m *Manager) setupIPFSHost(h host.Host) error {
return errcode.ErrIPFSSetupHost.Wrap(err)
}

pubsub.DiscoveryPollInterval = m.Node.Protocol.PollInterval
cacheSize := 100
dialTimeout := time.Second * 20
backoffconnector := func(host host.Host) (*discovery.BackoffConnector, error) {
return discovery.NewBackoffConnector(host, cacheSize, dialTimeout, backoffstrat)
}

// pubsub.DiscoveryPollInterval = m.Node.Protocol.PollInterval
m.Node.Protocol.pubsub, err = pubsub.NewGossipSub(m.getContext(), h,
pubsub.WithMessageSigning(true),
pubsub.WithFloodPublish(true),
pubsub.WithDiscovery(m.Node.Protocol.discovery),
pubsub.WithDiscovery(m.Node.Protocol.discovery,
pubsub.WithDiscoverConnector(backoffconnector)),
pubsub.WithPeerExchange(true),
pt.EventTracerOption(),
)
Expand Down
5 changes: 5 additions & 0 deletions go/internal/tinder/driver.go
Expand Up @@ -26,6 +26,11 @@ type UnregisterDiscovery interface {
Unregisterer
}

type DriverDiscovery struct {
p2p_discovery.Discoverer
p2p_discovery.Advertiser
}

type Driver struct {
Name string
AddrsFactory bhost.AddrsFactory
Expand Down
8 changes: 4 additions & 4 deletions go/internal/tinder/notify_network.go
Expand Up @@ -69,10 +69,6 @@ func (n *NetworkUpdate) GetLastUpdatedAddrs(ctx context.Context) (addrs []ma.Mul
return
}

func (n *NetworkUpdate) Close() error {
return n.sub.Close()
}

func (n *NetworkUpdate) subscribeToNetworkUpdate() {
for evt := range n.sub.Out() {
e := evt.(event.EvtLocalAddressesUpdated)
Expand All @@ -85,6 +81,10 @@ func (n *NetworkUpdate) subscribeToNetworkUpdate() {
}
}

func (n *NetworkUpdate) Close() error {
return n.sub.Close()
}

func diffAddrs(a, b []ma.Multiaddr) []ma.Multiaddr {
mb := make(map[string]struct{}, len(b))
for _, addr := range b {
Expand Down

0 comments on commit c406d6f

Please sign in to comment.