Skip to content

Commit

Permalink
fix(network): force to bootstrap before provide and fix reco
Browse files Browse the repository at this point in the history
Signed-off-by: Godefroy Ponsinet <godefroy.ponsinet@outlook.com>
  • Loading branch information
90dy committed Mar 8, 2019
1 parent a70031a commit 4f57117
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 90 deletions.
60 changes: 19 additions & 41 deletions core/network/config/config.go
Expand Up @@ -9,7 +9,6 @@ import (

peer "github.com/libp2p/go-libp2p-peer"
pnet "github.com/libp2p/go-libp2p-pnet"
routing "github.com/libp2p/go-libp2p-routing"
swarm "github.com/libp2p/go-libp2p-swarm"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"

Expand All @@ -22,7 +21,6 @@ import (
circuit "github.com/libp2p/go-libp2p-circuit"
libp2p_crypto "github.com/libp2p/go-libp2p-crypto"
discovery "github.com/libp2p/go-libp2p-discovery"
libp2p_host "github.com/libp2p/go-libp2p-host"
quic "github.com/libp2p/go-libp2p-quic-transport"
libp2p_config "github.com/libp2p/go-libp2p/config"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
Expand Down Expand Up @@ -93,25 +91,18 @@ func (cfg *Config) Apply(ctx context.Context, opts ...Option) error {
}
}

libp2pOpts := []libp2p_config.Option{
libp2p.DefaultMuxers,
libp2p.DefaultPeerstore,
libp2p.NATPortMap(),
libp2p.DefaultTransports,
}
libp2pOpts := []libp2p_config.Option{}

logger().Debug(fmt.Sprintf("bootstrap: %+v", cfg.Bootstrap))
if cfg.DefaultBootstrap {
cfg.Bootstrap = append(cfg.Bootstrap, DefaultBootstrap...)
}
logger().Debug(fmt.Sprintf("bootstrap: %+v", cfg.Bootstrap))

libp2pOpts = append(libp2pOpts, libp2p.DefaultListenAddrs)
if len(cfg.Bind) > 0 {
libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings(cfg.Bind...))
} else {
libp2pOpts = append(libp2pOpts, libp2p.DefaultListenAddrs)
}

// add ble transport
if cfg.BLE {
libp2pOpts = append(libp2pOpts, libp2p.Transport(ble.NewTransport))
Expand All @@ -123,12 +114,12 @@ func (cfg *Config) Apply(ctx context.Context, opts ...Option) error {
}

// relay
libp2pOpts = append(libp2pOpts, libp2p.EnableAutoRelay())
if cfg.HOP {
libp2pOpts = append(libp2pOpts, libp2p.EnableRelay(circuit.OptActive, circuit.OptHop))
} else {
libp2pOpts = append(libp2pOpts, libp2p.EnableRelay(circuit.OptActive))
}
// libp2pOpts = append(libp2pOpts, libp2p.EnableAutoRelay())
// if cfg.HOP {
// libp2pOpts = append(libp2pOpts, libp2p.EnableRelay(circuit.OptActive, circuit.OptHop))
// } else {
// libp2pOpts = append(libp2pOpts, libp2p.EnableRelay(circuit.OptActive))
// }

// private network
if cfg.SwarmKey != "" {
Expand All @@ -155,27 +146,20 @@ func (cfg *Config) Apply(ctx context.Context, opts ...Option) error {
libp2pOpts = append(libp2pOpts, libp2p.RandomIdentity)
}

libp2pOpts = append(libp2pOpts,
libp2p.ConnectionManager(host.NewBertyConnMgr(ctx, 10, 20, time.Duration(60*time.Minute))))

libp2pOpts = append(libp2pOpts, libp2p.NATPortMap())
// override libp2p configuration
err := cfg.Config.Apply(append(libp2pOpts, libp2p.FallbackDefaults)...)
if err != nil {
return err
}

// override conn manager
cfg.Config.ConnManager = host.NewBertyConnMgr(ctx, 10, 20, time.Duration(60*time.Minute))

// override ping service
cfg.Config.DisablePing = true

// setup dht for libp2p routing host

cfg.Config.Routing = func(h libp2p_host.Host) (routing.PeerRouting, error) {
var err error
if cfg.routing, err = host.NewBertyRouting(ctx, h, cfg.DHT); err != nil {
return nil, err
}
return cfg.routing, nil
}
return nil
}

Expand Down Expand Up @@ -277,21 +261,15 @@ func (cfg *Config) NewNode(ctx context.Context) (*host.BertyHost, error) {
}
h.Network().Notify(h.Routing.(*host.BertyRouting))

// Configure relay
if !cfg.Config.Relay {
h.Close()
return nil, fmt.Errorf("cannot enable autorelay; relay is not enabled")
}

crouter, ok := h.Routing.(routing.ContentRouting)
if !ok {
h.Close()
return nil, fmt.Errorf("cannot enable autorelay; no suitable routing for discovery")
}
// crouter, ok := h.Routing.(routing.ContentRouting)
// if !ok {
// h.Close()
// return nil, fmt.Errorf("cannot enable autorelay; no suitable routing for discovery")
// }

routerDiscovery := discovery.NewRoutingDiscovery(crouter)
// routerDiscovery := discovery.NewRoutingDiscovery(h.Routing)

discoveries = append(discoveries, routerDiscovery)
// discoveries = append(discoveries, routerDiscovery)

// configure mdns service
if cfg.MDNS {
Expand Down
22 changes: 16 additions & 6 deletions core/network/driver.go
Expand Up @@ -108,10 +108,17 @@ func (net *Network) Bootstrap(ctx context.Context, sync bool, addrs ...string) e
bf = net.BootstrapPeer
}

var err error
for _, addr := range addrs {
if err := bf(ctx, addr); err != nil {
return err
err = nil
if err = bf(ctx, addr); err != nil {
logger().Error(err.Error())
continue
}
break
}
if err != nil {
return err
}

return nil
Expand Down Expand Up @@ -147,12 +154,16 @@ func (net *Network) BootstrapPeer(ctx context.Context, bootstrapAddr string) err
return err
}

logger().Debug("Bootrstraping peer", zap.String("peer info", fmt.Sprintf("%+v", pinfo)))
logger().Debug("Bootstraping peer", zap.String("peer info", fmt.Sprintf("%+v", pinfo)))
// Even if we can't connect, bootstrap peers are trusted peers, add it to
// the peerstore so we can connect later in case of failure
net.host.Peerstore().AddAddrs(pinfo.ID, pinfo.Addrs, pstore.PermanentAddrTTL)
if err := net.host.Connect(ctx, *pinfo); err != nil {
return err
}

net.host.ConnManager().TagPeer(pinfo.ID, "bootstrap", 2)
return net.host.Connect(ctx, *pinfo)
return nil
}

func (net *Network) Discover(ctx context.Context) {
Expand Down Expand Up @@ -370,8 +381,7 @@ func (net *Network) Join(ctx context.Context, id string) error {
return err
}

go net.host.Routing.Provide(ctx, c, true)
return nil
return net.host.Routing.Provide(ctx, c, true)
}

func (net *Network) OnEnvelopeHandler(f func(context.Context, *entity.Envelope) (*entity.Void, error)) {
Expand Down
124 changes: 99 additions & 25 deletions core/network/host/connmanager.go
Expand Up @@ -2,16 +2,19 @@ package host

import (
"context"
"fmt"
"time"

connmgr "github.com/libp2p/go-libp2p-connmgr"
ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
"go.uber.org/zap"
)

var _ ifconnmgr.ConnManager = (*BertyConnMgr)(nil)
var _ inet.Notifiee = (*BertyConnMgr)(nil)

type BertyConnMgr struct {
*connmgr.BasicConnMgr
Expand Down Expand Up @@ -47,7 +50,7 @@ func (cm *BertyConnMgr) GetInfo() connmgr.CMInfo {
}

func (cm *BertyConnMgr) Notifee() inet.Notifiee {
return cm.BasicConnMgr.Notifee()
return cm
}

func (cm *BertyConnMgr) Connected(net inet.Network, c inet.Conn) {
Expand All @@ -58,35 +61,106 @@ func (cm *BertyConnMgr) Connected(net inet.Network, c inet.Conn) {
func (cm *BertyConnMgr) Disconnected(net inet.Network, c inet.Conn) {
// check if it a relay conn and try to reconnect
tagInfo := cm.GetTagInfo(c.RemotePeer())
peerID := c.RemotePeer()

v, ok := tagInfo.Tags["relay-hop"]
if !ok || v != 2 {
cm.BasicConnMgr.Notifee().Disconnected(net, c)
// TODO: reconnect to kbucket && bootstrap && relay if list of each are < 1
logger().Debug("Disconnected", zap.String("tagInfo", fmt.Sprintf("%+v", tagInfo.Tags)))
if v, ok := tagInfo.Tags["kbucket"]; ok && v == 5 {
go func() {
for {
logger().Debug(
"connmanager: try to reconnect to kbucket",
zap.String("id", peerID.Pretty()),
)
if _, err := net.DialPeer(cm.ctx, peerID); err != nil {
logger().Debug(
"connmanager: cannot reconnect to kbucket",
zap.String("id", peerID.Pretty()),
zap.String("err", err.Error()),
)
select {
case <-time.After(time.Second * 10):
continue
case <-cm.ctx.Done():
cm.BasicConnMgr.Notifee().Disconnected(net, c)
break
}
}
break
}
}()
return
}

peerID := c.RemotePeer()
go func() {
for {
logger().Debug(
"connmanager: try to reconnect to relay",
zap.String("id", peerID.Pretty()),
)
if _, err := net.DialPeer(cm.ctx, peerID); err != nil {
} else if v, ok := tagInfo.Tags["bootstrap"]; ok && v == 2 {
go func() {
for {
logger().Debug(
"connmanager: try to reconnect to bootstrap",
zap.String("id", peerID.Pretty()),
)
if _, err := net.DialPeer(cm.ctx, peerID); err != nil {
logger().Debug(
"connmanager: cannot reconnect to bootstrap",
zap.String("id", peerID.Pretty()),
zap.String("err", err.Error()),
)
select {
case <-time.After(time.Second * 10):
continue
case <-cm.ctx.Done():
cm.BasicConnMgr.Notifee().Disconnected(net, c)
break
}
}
break
}
}()
return
} else if v, ok := tagInfo.Tags["relay-hop"]; ok && v == 2 {
go func() {
for {
logger().Debug(
"connmanager: cannot reconnect to relay",
"connmanager: try to reconnect to relay",
zap.String("id", peerID.Pretty()),
zap.String("err", err.Error()),
)
select {
case <-time.After(time.Second * 10):
continue
case <-cm.ctx.Done():
cm.BasicConnMgr.Notifee().Disconnected(net, c)
break
if _, err := net.DialPeer(cm.ctx, peerID); err != nil {
logger().Debug(
"connmanager: cannot reconnect to relay",
zap.String("id", peerID.Pretty()),
zap.String("err", err.Error()),
)
select {
case <-time.After(time.Second * 10):
continue
case <-cm.ctx.Done():
cm.BasicConnMgr.Notifee().Disconnected(net, c)
break
}
}
break
}
break
}
}()
}()
return
} else {
cm.BasicConnMgr.Notifee().Disconnected(net, c)
}
}

// Listen is no-op in this implementation.
func (cm *BertyConnMgr) Listen(n inet.Network, addr ma.Multiaddr) {
cm.BasicConnMgr.Notifee().Listen(n, addr)
}

// ListenClose is no-op in this implementation.
func (cm *BertyConnMgr) ListenClose(n inet.Network, addr ma.Multiaddr) {
cm.BasicConnMgr.Notifee().Listen(n, addr)
}

// OpenedStream is no-op in this implementation.
func (cm *BertyConnMgr) OpenedStream(n inet.Network, s inet.Stream) {
cm.BasicConnMgr.Notifee().OpenedStream(n, s)
}

// ClosedStream is no-op in this implementation.
func (cm *BertyConnMgr) ClosedStream(n inet.Network, s inet.Stream) {
cm.BasicConnMgr.Notifee().ClosedStream(n, s)
}
22 changes: 13 additions & 9 deletions core/network/host/routing.go
Expand Up @@ -119,20 +119,13 @@ func (br *BertyRouting) FindPeer(ctx context.Context, pid peer.ID) (pstore.PeerI
// passed, it also announces it, otherwise it is just kept in the local
// accounting of which objects are being provided.
func (br *BertyRouting) Provide(ctx context.Context, id cid.Cid, brd bool) error {
if err := br.isReady(ctx); err != nil {
logger().Error("routing isn't ready", zap.Error(err))
return nil
}

br.waitIsReady(ctx)
return br.dht.Provide(ctx, id, brd)
}

// Search for peers who are able to provide a given key
func (br *BertyRouting) FindProvidersAsync(ctx context.Context, id cid.Cid, n int) <-chan pstore.PeerInfo {
if err := br.isReady(ctx); err != nil {
logger().Error("routing isn't ready", zap.Error(err))
}

br.waitIsReady(ctx)
return br.dht.FindProvidersAsync(ctx, id, n)
}

Expand All @@ -145,6 +138,17 @@ func (br *BertyRouting) isReady(ctx context.Context) error {
}
}

func (br *BertyRouting) waitIsReady(ctx context.Context) {
for {
if err := br.isReady(ctx); err != nil {
logger().Error("routing isn't ready", zap.Error(err))
time.Sleep(time.Second)
continue
}
break
}
}

func (br *BertyRouting) Listen(net inet.Network, a ma.Multiaddr) {}
func (br *BertyRouting) ListenClose(net inet.Network, a ma.Multiaddr) {}
func (br *BertyRouting) OpenedStream(net inet.Network, s inet.Stream) {}
Expand Down

0 comments on commit 4f57117

Please sign in to comment.