Skip to content

Commit

Permalink
wip: test allin discovery
Browse files Browse the repository at this point in the history
Signed-off-by: gfanton <guilhem.fanton@gmail.com>
  • Loading branch information
gfanton committed Mar 15, 2021
1 parent 600df87 commit 144d7cd
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 62 deletions.
115 changes: 65 additions & 50 deletions go/internal/initutil/ipfs.go
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
peerstore "github.com/libp2p/go-libp2p-core/peerstore"
p2p_routing "github.com/libp2p/go-libp2p-core/routing"
discovery "github.com/libp2p/go-libp2p-discovery"
p2p_dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand Down Expand Up @@ -118,8 +119,9 @@ func (m *Manager) getLocalIPFS() (ipfsutil.ExtendedCoreAPI, *ipfs_core.IpfsNode,

mopts := ipfsutil.MobileOptions{
IpfsConfigPatch: m.setupIPFSConfig,
HostConfigFunc: m.setupIPFSHost,
RoutingOption: ipfsutil.CustomRoutingOption(p2p_dht.ModeClient, p2p_dht.Concurrency(2)),
// HostConfigFunc: m.setupIPFSHost,
RoutingConfigFunc: m.configIPFSRouting,
RoutingOption: ipfsutil.CustomRoutingOption(p2p_dht.ModeClient, p2p_dht.Concurrency(2)),
ExtraOpts: map[string]bool{
// @NOTE(gfanton) temporally disable ipfs *main* pubsub
"pubsub": false,
Expand Down Expand Up @@ -357,50 +359,53 @@ func (m *Manager) setupIPFSConfig(cfg *ipfs_cfg.Config) ([]libp2p.Option, error)
}
}

if m.Node.Protocol.RelayHack {
// Resolving addresses
pis, err := ipfsutil.ParseAndResolveRdvpMaddrs(m.getContext(), m.initLogger, config.Config.P2P.RelayHack)
if err != nil {
return nil, errcode.ErrIPFSSetupConfig.Wrap(err)
}

lenPis := len(pis)
pickFrom := make([]peer.AddrInfo, lenPis)
for lenPis > 0 {
lenPis--
pickFrom[lenPis] = *pis[lenPis]
}

// Selecting 2 random one
rng := mrand.New(mrand.NewSource(srand.SafeFast())) //nolint:gosec
var relays []peer.AddrInfo
if len(pickFrom) <= 2 {
relays = pickFrom
} else {
for i := 2; i > 0; i-- {
lenPickFrom := len(pickFrom)
n := rng.Intn(lenPickFrom)
relays = append(relays, pickFrom[n])
if n == 0 {
pickFrom = pickFrom[1:]
continue
}
if n == lenPickFrom-1 {
pickFrom = pickFrom[:n-1]
continue
}
pickFrom = append(pickFrom[:n], pickFrom[n+1:]...)
}
}

// for _, relay := range relays {
// for _, addr := range relay.Addrs {
// cfg.Addresses.Announce = append(cfg.Addresses.Announce, addr.String()+"/p2p/"+relay.ID.String()+"/p2p-circuit")
// }
// }

p2popts = append(p2popts, libp2p.StaticRelays(relays))
}
// if m.Node.Protocol.RelayHack {
// // Resolving addresses
// pis, err := ipfsutil.ParseAndResolveRdvpMaddrs(m.getContext(), m.initLogger, config.Config.P2P.RelayHack)
// if err != nil {
// return nil, errcode.ErrIPFSSetupConfig.Wrap(err)
// }

// lenPis := len(pis)
// pickFrom := make([]peer.AddrInfo, lenPis)
// for lenPis > 0 {
// lenPis--
// pickFrom[lenPis] = *pis[lenPis]
// }

// // Selecting 2 random one
// rng := mrand.New(mrand.NewSource(srand.SafeFast())) //nolint:gosec
// var relays []peer.AddrInfo
// if len(pickFrom) <= 2 {
// relays = pickFrom
// } else {
// for i := 2; i > 0; i-- {
// lenPickFrom := len(pickFrom)
// n := rng.Intn(lenPickFrom)
// relays = append(relays, pickFrom[n])
// if n == 0 {
// pickFrom = pickFrom[1:]
// continue
// }
// if n == lenPickFrom-1 {
// pickFrom = pickFrom[:n-1]
// continue
// }
// pickFrom = append(pickFrom[:n], pickFrom[n+1:]...)
// }
// }

// for _, relay := range relays {
// for _, addr := range relay.Addrs {
// cfg.Addresses.Announce = append(cfg.Addresses.Announce, addr.String()+"/p2p/"+relay.ID.String()+"/p2p-circuit")
// }
// }

// p2popts = append(p2popts, libp2p.StaticRelays(relays))
// }

// enable autorelay
p2popts = append(p2popts, libp2p.ListenAddrs(), libp2p.EnableAutoRelay(), libp2p.ForceReachabilityPrivate())

// prefill peerstore with known rdvp servers
if m.Node.Protocol.Tor.Mode != TorRequired {
Expand All @@ -412,7 +417,7 @@ func (m *Manager) setupIPFSConfig(cfg *ipfs_cfg.Config) ([]libp2p.Option, error)
return p2popts, nil
}

func (m *Manager) setupIPFSHost(h host.Host) error {
func (m *Manager) configIPFSRouting(h host.Host, r p2p_routing.Routing) error {
logger, err := m.getLogger()
if err != nil {
return errcode.ErrIPFSSetupHost.Wrap(err)
Expand All @@ -434,11 +439,13 @@ func (m *Manager) setupIPFSHost(h host.Host) error {
}
}

rng := mrand.New(mrand.NewSource(srand.MustSecure())) // nolint:gosec // we need to use math/rand here, but it is seeded from crypto/rand

// rdvp driver
var drivers []*tinder.Driver
if lenrdvpeers := len(rdvpeers); lenrdvpeers > 0 {
for _, peer := range rdvpeers {
h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL)
rng := mrand.New(mrand.NewSource(srand.MustSecure())) // nolint:gosec // we need to use math/rand here, but it is seeded from crypto/rand
udisc := tinder.NewRendezvousDiscovery(logger, h, peer.ID, rng)

name := fmt.Sprintf("rdvp#%.6s", peer.ID)
Expand All @@ -447,6 +454,15 @@ func (m *Manager) setupIPFSHost(h host.Host) error {
}
}

// dht driver
drivers = append(drivers,
tinder.NewDriverFromRouting("dht", r, nil))

// localdisc driver
localdisc := tinder.NewLocalDiscovery(logger, h, rng)
drivers = append(drivers,
tinder.NewDriverFromUnregisterDiscovery("localdisc", localdisc, tinder.FilterPrivateAddrs))

serverRng := mrand.New(mrand.NewSource(srand.MustSecure())) // nolint:gosec // we need to use math/rand here, but it is seeded from crypto/rand

backoffstrat := discovery.NewExponentialBackoff(
Expand All @@ -472,10 +488,9 @@ func (m *Manager) setupIPFSHost(h host.Host) error {
return errcode.ErrIPFSSetupHost.Wrap(err)
}

pubsub.DiscoveryPollInterval = m.Node.Protocol.PollInterval
// pubsub.DiscoveryPollInterval = m.Node.Protocol.PollInterval
m.Node.Protocol.pubsub, err = pubsub.NewGossipSub(m.getContext(), h,
pubsub.WithMessageSigning(true),
pubsub.WithFloodPublish(true),
pubsub.WithDiscovery(m.Node.Protocol.discovery),
pubsub.WithPeerExchange(true),
pt.EventTracerOption(),
Expand Down
8 changes: 4 additions & 4 deletions go/internal/tinder/notify_network.go
Expand Up @@ -69,10 +69,6 @@ func (n *NetworkUpdate) GetLastUpdatedAddrs(ctx context.Context) (addrs []ma.Mul
return
}

func (n *NetworkUpdate) Close() error {
return n.sub.Close()
}

func (n *NetworkUpdate) subscribeToNetworkUpdate() {
for evt := range n.sub.Out() {
e := evt.(event.EvtLocalAddressesUpdated)
Expand All @@ -85,6 +81,10 @@ func (n *NetworkUpdate) subscribeToNetworkUpdate() {
}
}

func (n *NetworkUpdate) Close() error {
return n.sub.Close()
}

func diffAddrs(a, b []ma.Multiaddr) []ma.Multiaddr {
mb := make(map[string]struct{}, len(b))
for _, addr := range b {
Expand Down
15 changes: 7 additions & 8 deletions go/internal/tinder/service.go
Expand Up @@ -324,7 +324,7 @@ func (s *service) selectFindPeers(ctx context.Context, out chan<- p2p_peer.AddrI
for n > 0 {
sel, value, ok := reflect.Select(selCases)

// context has been cancel stop and close chan
// context has been cancel, stop
if sel == selDone {
s.logger.Debug("find peers done", zap.Error(ctx.Err()))
return ctx.Err()
Expand All @@ -347,15 +347,15 @@ func (s *service) selectFindPeers(ctx context.Context, out chan<- p2p_peer.AddrI
// addrs directly from host, it's easier (for now) to filter it
// here
if addrs := driver.AddrsFactory(peer.Addrs); len(addrs) > 0 {
s.logger.Debug("found a peer",
zap.String("driver", driver.Name),
zap.String("peer", peer.ID.String()))

filterpeer := p2p_peer.AddrInfo{
ID: peer.ID,
Addrs: addrs,
}

s.logger.Debug("found a peer",
zap.String("driver", driver.Name),
zap.String("peer", peer.ID.String()))

s.Emit(&EvtDriverMonitor{
EventType: TypeEventMonitorFoundPeer,
Topic: in[sel].topic,
Expand All @@ -364,7 +364,7 @@ func (s *service) selectFindPeers(ctx context.Context, out chan<- p2p_peer.AddrI
})

// forward the peer
out <- peer
out <- filterpeer
}
}

Expand Down Expand Up @@ -394,6 +394,5 @@ func (s *service) Emit(evt *EvtDriverMonitor) {
}

func (s *service) Close() error {
s.networkNotify.Close()
return nil
return s.networkNotify.Close()
}

0 comments on commit 144d7cd

Please sign in to comment.