Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: invert relay address routing #991

Merged
merged 2 commits into from
Aug 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config,
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartRelay, p2p.NewRelayReserver(tcpNode, relay))
}
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PEventCollector, p2p.NewEventCollector(tcpNode))
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PDiscAdapter, p2p.NewDiscoveryAdapter(tcpNode, udpNode, peers))
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PRouters, p2p.NewDiscoveryRouter(tcpNode, udpNode, peers))
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PRouters, p2p.NewRelayRouter(tcpNode, peers, relays))

return tcpNode, localEnode, nil
}
Expand Down
4 changes: 2 additions & 2 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,11 @@ type pingTest struct {
func pingClusterAB(t *testing.T, test pingTest) {
t.Helper()
t.Run("pushdisc", func(t *testing.T) {
featureset.EnableForT(t, featureset.InvertDiscv5)
featureset.EnableForT(t, featureset.InvertLibP2PRouting)
pingCluster(t, test)
})
t.Run("pulldisc", func(t *testing.T) {
featureset.DisableForT(t, featureset.InvertDiscv5)
featureset.DisableForT(t, featureset.InvertLibP2PRouting)
pingCluster(t, test)
})
}
Expand Down
8 changes: 4 additions & 4 deletions app/featureset/featureset.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ const (
// QBFTConsensus introduces qbft consensus, see https://github.com/ObolNetwork/charon/issues/445.
QBFTConsensus Feature = "qbft_consensus"

// InvertDiscv5 enables the new push based discv5 integration and disables the old pull based.
InvertDiscv5 Feature = "invert_discv5"
// InvertLibP2PRouting enables the new push based libp2p routing and disables the old pull based.
InvertLibP2PRouting Feature = "invert_libp2p_routing"
)

var (
// state defines the current rollout status of each feature.
state = map[Feature]status{
QBFTConsensus: statusStable,
InvertDiscv5: statusAlpha,
QBFTConsensus: statusStable,
InvertLibP2PRouting: statusAlpha,
// Add all features and there status here.
}

Expand Down
2 changes: 1 addition & 1 deletion app/lifecycle/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
StartMonitoringAPI
StartValidatorAPI
StartP2PPing
StartP2PDiscAdapter
StartP2PRouters
StartP2PConsensus
StartSimulator
StartScheduler
Expand Down
6 changes: 3 additions & 3 deletions app/lifecycle/orderstart_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 6 additions & 8 deletions p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/netutil"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
ma "github.com/multiformats/go-multiaddr"

"github.com/obolnetwork/charon/app/errors"
Expand Down Expand Up @@ -138,18 +137,17 @@ func NewLocalEnode(config Config, key *ecdsa.PrivateKey) (*enode.LocalNode, *eno
return node, db, nil
}

// NewDiscoveryAdapter returns a life cycle hook that links discv5 to libp2p by
// continuously polling discv5 for latest peer ERNs and adding then to libp2p peer store.
func NewDiscoveryAdapter(tcpNode host.Host, udpNode *discover.UDPv5, peers []Peer) lifecycle.HookFuncCtx {
// NewDiscoveryRouter returns a life cycle hook that links discv5 to libp2p by
// continuously polling discv5 for latest peer ENRs and adding then to libp2p peer store.
func NewDiscoveryRouter(tcpNode host.Host, udpNode *discover.UDPv5, peers []Peer) lifecycle.HookFuncCtx {
return func(ctx context.Context) {
if !featureset.Enabled(featureset.InvertDiscv5) {
if !featureset.Enabled(featureset.InvertLibP2PRouting) {
return
}

ctx = log.WithTopic(ctx, "p2p")
ttl := peerstore.TempAddrTTL
baseDelay := expbackoff.WithBaseDelay(time.Millisecond * 100) // Poll quickly on startup
maxDelay := expbackoff.WithMaxDelay(ttl * 9 / 10) // Slow down to 90% of ttl
maxDelay := expbackoff.WithMaxDelay(routedAddrTTL * 9 / 10) // Slow down to 90% of ttl
backoff := expbackoff.New(ctx, baseDelay, maxDelay)
addrs := make(map[peer.ID]string)

Expand All @@ -172,7 +170,7 @@ func NewDiscoveryAdapter(tcpNode host.Host, udpNode *discover.UDPv5, peers []Pee
addrs[p.ID] = addrStr
}

tcpNode.Peerstore().AddAddr(p.ID, addr, ttl)
tcpNode.Peerstore().AddAddr(p.ID, addr, routedAddrTTL)
}
}

Expand Down
31 changes: 16 additions & 15 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,13 @@ func NewTCPNode(cfg Config, key *ecdsa.PrivateKey, connGater ConnGater,
// Define p2pcfg.AddrsFactory that does not advertise
// addresses via libp2p, since we use discv5 for peer discovery.
libp2p.AddrsFactory(func([]ma.Multiaddr) []ma.Multiaddr { return nil }),
}

libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
if !featureset.Enabled(featureset.InvertLibP2PRouting) {
opt := libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
return logWrapRouting(adaptDiscRouting(udpNode, peers, relays)), nil
}),
})
defaultOpts = append(defaultOpts, opt)
}

defaultOpts = append(defaultOpts, opts...)
Expand Down Expand Up @@ -131,21 +134,19 @@ func adaptDiscRouting(udpNode *discover.UDPv5, peers, relays []Peer) peerRouting

var mAddrs []ma.Multiaddr

if !featureset.Enabled(featureset.InvertDiscv5) {
resolved := udpNode.Resolve(&node)
if resolved == nil {
return peer.AddrInfo{}, errors.New("peer not resolved")
}
resolved := udpNode.Resolve(&node)
if resolved == nil {
return peer.AddrInfo{}, errors.New("peer not resolved")
}

// If sequence is 0, we haven't discovered it yet.
// If tcp port is 0, this node isn't bound to a port.
if resolved.Seq() != 0 && resolved.TCP() != 0 {
mAddr, err := multiAddrFromIPPort(resolved.IP(), resolved.TCP())
if err != nil {
return peer.AddrInfo{}, err
}
mAddrs = append(mAddrs, mAddr)
// If sequence is 0, we haven't discovered it yet.
// If tcp port is 0, this node isn't bound to a port.
if resolved.Seq() != 0 && resolved.TCP() != 0 {
mAddr, err := multiAddrFromIPPort(resolved.IP(), resolved.TCP())
if err != nil {
return peer.AddrInfo{}, err
}
mAddrs = append(mAddrs, mAddr)
}

// Add any circuit relays
Expand Down
52 changes: 52 additions & 0 deletions p2p/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,23 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
circuit "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
ma "github.com/multiformats/go-multiaddr"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/featureset"
"github.com/obolnetwork/charon/app/lifecycle"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/z"
)

// routedAddrTTL is a peer store TTL used to notify libp2p of peer addresses.
// We use a custom TTL (different from well-known peer store TTLs) since
// this mitigates against other libp2p services (like Identify) modifying
// or removing them.
var routedAddrTTL = peerstore.TempAddrTTL + 1

// NewRelays returns the libp2p circuit relays from bootnodes if enabled.
func NewRelays(conf Config, bootnodes []*enode.Node) ([]Peer, error) {
if !conf.BootnodeRelay {
Expand Down Expand Up @@ -121,3 +129,47 @@ func NewRelayReserver(tcpNode host.Host, relay Peer) lifecycle.HookFunc {
return nil
}
}

// NewRelayRouter returns a life cycle hook that routes peers via relays in libp2p by
// continuously adding peer relay addresses to libp2p peer store.
func NewRelayRouter(tcpNode host.Host, peers []Peer, relays []Peer) lifecycle.HookFuncCtx {
return func(ctx context.Context) {
if !featureset.Enabled(featureset.InvertLibP2PRouting) {
return
}
if len(relays) == 0 {
return
}

ctx = log.WithTopic(ctx, "p2p")

for ctx.Err() == nil {
for _, p := range peers {
if p.ID == tcpNode.ID() {
// Skip self
continue
}

for _, relay := range relays {
if relay.Enode.TCP() == 0 {
continue
}

relayAddr, err := multiAddrViaRelay(relay, p.ID)
if err != nil {
log.Error(ctx, "Failed discovering peer address", err)
continue
}

tcpNode.Peerstore().AddAddr(p.ID, relayAddr, routedAddrTTL)
}
}

select {
case <-ctx.Done():
return
case <-time.After(routedAddrTTL * 9 / 10):
}
}
}
}