From 5c093920c2c3d5b41c9f8c460177e536c32d40ea Mon Sep 17 00:00:00 2001 From: corverroos Date: Thu, 18 Aug 2022 08:03:53 +0200 Subject: [PATCH 1/2] p2p: invert relay address routing --- app/app.go | 3 +- app/app_test.go | 6 ++-- app/featureset/featureset.go | 8 ++--- app/lifecycle/order.go | 2 +- app/lifecycle/orderstart_string.go | 6 ++-- p2p/discovery.go | 14 ++++---- p2p/p2p.go | 31 +++++++++--------- p2p/relay.go | 52 ++++++++++++++++++++++++++++++ 8 files changed, 87 insertions(+), 35 deletions(-) diff --git a/app/app.go b/app/app.go index 2f831b1ce..b1559b6a8 100644 --- a/app/app.go +++ b/app/app.go @@ -276,7 +276,8 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config, life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartRelay, p2p.NewRelayReserver(tcpNode, relay)) } life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PEventCollector, p2p.NewEventCollector(tcpNode)) - life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PDiscAdapter, p2p.NewDiscoveryAdapter(tcpNode, udpNode, peers)) + life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PRouters, p2p.NewDiscoveryRouter(tcpNode, udpNode, peers)) + life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PRouters, p2p.NewRelayRouter(tcpNode, peers, relays)) return tcpNode, localEnode, nil } diff --git a/app/app_test.go b/app/app_test.go index 8e0905cd7..ec4a56c32 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -139,11 +139,11 @@ type pingTest struct { func pingClusterAB(t *testing.T, test pingTest) { t.Helper() t.Run("pushdisc", func(t *testing.T) { - featureset.EnableForT(t, featureset.InvertDiscv5) + featureset.EnableForT(t, featureset.InvertLibP2PRouting) pingCluster(t, test) }) t.Run("pulldisc", func(t *testing.T) { - featureset.DisableForT(t, featureset.InvertDiscv5) + featureset.DisableForT(t, featureset.InvertLibP2PRouting) pingCluster(t, test) }) } @@ -169,7 +169,7 @@ func pingCluster(t *testing.T, test pingTest) { bootnodes = append(bootnodes, bootAddr) } - const n = 3 + const n = 2 lock, p2pKeys, _ := cluster.NewForT(t, 1, n, n, 0) asserter := &pingAsserter{ asserter: asserter{ diff --git a/app/featureset/featureset.go b/app/featureset/featureset.go index 38b4ef687..45c1114b5 100644 --- a/app/featureset/featureset.go +++ b/app/featureset/featureset.go @@ -39,15 +39,15 @@ const ( // QBFTConsensus introduces qbft consensus, see https://github.com/ObolNetwork/charon/issues/445. QBFTConsensus Feature = "qbft_consensus" - // InvertDiscv5 enables the new push based discv5 integration and disables the old pull based. - InvertDiscv5 Feature = "invert_discv5" + // InvertLibP2PRouting enables the new push based libp2p routing and disables the old pull based. + InvertLibP2PRouting Feature = "invert_libp2p_routing" ) var ( // state defines the current rollout status of each feature. state = map[Feature]status{ - QBFTConsensus: statusStable, - InvertDiscv5: statusAlpha, + QBFTConsensus: statusStable, + InvertLibP2PRouting: statusAlpha, // Add all features and there status here. } diff --git a/app/lifecycle/order.go b/app/lifecycle/order.go index 83df50330..1849f3496 100644 --- a/app/lifecycle/order.go +++ b/app/lifecycle/order.go @@ -32,7 +32,7 @@ const ( StartMonitoringAPI StartValidatorAPI StartP2PPing - StartP2PDiscAdapter + StartP2PRouters StartP2PConsensus StartSimulator StartScheduler diff --git a/app/lifecycle/orderstart_string.go b/app/lifecycle/orderstart_string.go index 0a15e8ca1..86ed698f6 100644 --- a/app/lifecycle/orderstart_string.go +++ b/app/lifecycle/orderstart_string.go @@ -29,16 +29,16 @@ func _() { _ = x[StartMonitoringAPI-3] _ = x[StartValidatorAPI-4] _ = x[StartP2PPing-5] - _ = x[StartP2PDiscAdapter-6] + _ = x[StartP2PRouters-6] _ = x[StartP2PConsensus-7] _ = x[StartSimulator-8] _ = x[StartScheduler-9] _ = x[StartP2PEventCollector-10] } -const _OrderStart_name = "TrackerAggSigDBRelayMonitoringAPIValidatorAPIP2PPingP2PDiscAdapterP2PConsensusSimulatorSchedulerP2PEventCollector" +const _OrderStart_name = "TrackerAggSigDBRelayMonitoringAPIValidatorAPIP2PPingP2PRoutersP2PConsensusSimulatorSchedulerP2PEventCollector" -var _OrderStart_index = [...]uint8{0, 7, 15, 20, 33, 45, 52, 66, 78, 87, 96, 113} +var _OrderStart_index = [...]uint8{0, 7, 15, 20, 33, 45, 52, 62, 74, 83, 92, 109} func (i OrderStart) String() string { if i < 0 || i >= OrderStart(len(_OrderStart_index)-1) { diff --git a/p2p/discovery.go b/p2p/discovery.go index 29da5664a..a6e74a6a1 100644 --- a/p2p/discovery.go +++ b/p2p/discovery.go @@ -27,7 +27,6 @@ import ( "github.com/ethereum/go-ethereum/p2p/netutil" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/peerstore" ma "github.com/multiformats/go-multiaddr" "github.com/obolnetwork/charon/app/errors" @@ -138,18 +137,17 @@ func NewLocalEnode(config Config, key *ecdsa.PrivateKey) (*enode.LocalNode, *eno return node, db, nil } -// NewDiscoveryAdapter returns a life cycle hook that links discv5 to libp2p by -// continuously polling discv5 for latest peer ERNs and adding then to libp2p peer store. -func NewDiscoveryAdapter(tcpNode host.Host, udpNode *discover.UDPv5, peers []Peer) lifecycle.HookFuncCtx { +// NewDiscoveryRouter returns a life cycle hook that links discv5 to libp2p by +// continuously polling discv5 for latest peer ENRs and adding then to libp2p peer store. +func NewDiscoveryRouter(tcpNode host.Host, udpNode *discover.UDPv5, peers []Peer) lifecycle.HookFuncCtx { return func(ctx context.Context) { - if !featureset.Enabled(featureset.InvertDiscv5) { + if !featureset.Enabled(featureset.InvertLibP2PRouting) { return } ctx = log.WithTopic(ctx, "p2p") - ttl := peerstore.TempAddrTTL baseDelay := expbackoff.WithBaseDelay(time.Millisecond * 100) // Poll quickly on startup - maxDelay := expbackoff.WithMaxDelay(ttl * 9 / 10) // Slow down to 90% of ttl + maxDelay := expbackoff.WithMaxDelay(routedAddrTTL * 9 / 10) // Slow down to 90% of ttl backoff := expbackoff.New(ctx, baseDelay, maxDelay) addrs := make(map[peer.ID]string) @@ -172,7 +170,7 @@ func NewDiscoveryAdapter(tcpNode host.Host, udpNode *discover.UDPv5, peers []Pee addrs[p.ID] = addrStr } - tcpNode.Peerstore().AddAddr(p.ID, addr, ttl) + tcpNode.Peerstore().AddAddr(p.ID, addr, routedAddrTTL) } } diff --git a/p2p/p2p.go b/p2p/p2p.go index 9376375c7..a1a69f713 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -70,10 +70,13 @@ func NewTCPNode(cfg Config, key *ecdsa.PrivateKey, connGater ConnGater, // Define p2pcfg.AddrsFactory that does not advertise // addresses via libp2p, since we use discv5 for peer discovery. libp2p.AddrsFactory(func([]ma.Multiaddr) []ma.Multiaddr { return nil }), + } - libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { + if !featureset.Enabled(featureset.InvertLibP2PRouting) { + opt := libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { return logWrapRouting(adaptDiscRouting(udpNode, peers, relays)), nil - }), + }) + defaultOpts = append(defaultOpts, opt) } defaultOpts = append(defaultOpts, opts...) @@ -131,21 +134,19 @@ func adaptDiscRouting(udpNode *discover.UDPv5, peers, relays []Peer) peerRouting var mAddrs []ma.Multiaddr - if !featureset.Enabled(featureset.InvertDiscv5) { - resolved := udpNode.Resolve(&node) - if resolved == nil { - return peer.AddrInfo{}, errors.New("peer not resolved") - } + resolved := udpNode.Resolve(&node) + if resolved == nil { + return peer.AddrInfo{}, errors.New("peer not resolved") + } - // If sequence is 0, we haven't discovered it yet. - // If tcp port is 0, this node isn't bound to a port. - if resolved.Seq() != 0 && resolved.TCP() != 0 { - mAddr, err := multiAddrFromIPPort(resolved.IP(), resolved.TCP()) - if err != nil { - return peer.AddrInfo{}, err - } - mAddrs = append(mAddrs, mAddr) + // If sequence is 0, we haven't discovered it yet. + // If tcp port is 0, this node isn't bound to a port. + if resolved.Seq() != 0 && resolved.TCP() != 0 { + mAddr, err := multiAddrFromIPPort(resolved.IP(), resolved.TCP()) + if err != nil { + return peer.AddrInfo{}, err } + mAddrs = append(mAddrs, mAddr) } // Add any circuit relays diff --git a/p2p/relay.go b/p2p/relay.go index e7f662396..33469e9bc 100644 --- a/p2p/relay.go +++ b/p2p/relay.go @@ -22,15 +22,23 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peerstore" circuit "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client" ma "github.com/multiformats/go-multiaddr" "github.com/obolnetwork/charon/app/errors" + "github.com/obolnetwork/charon/app/featureset" "github.com/obolnetwork/charon/app/lifecycle" "github.com/obolnetwork/charon/app/log" "github.com/obolnetwork/charon/app/z" ) +// routedAddrTTL is a peer store TTL used to notify libp2p of peer addresses. +// We use a custom TTL (different from well-known peer store TTLs) since +// this mitigates against other libp2p services (like Identify) modifying +// or removing them. +var routedAddrTTL = peerstore.TempAddrTTL + 1 + // NewRelays returns the libp2p circuit relays from bootnodes if enabled. func NewRelays(conf Config, bootnodes []*enode.Node) ([]Peer, error) { if !conf.BootnodeRelay { @@ -121,3 +129,47 @@ func NewRelayReserver(tcpNode host.Host, relay Peer) lifecycle.HookFunc { return nil } } + +// NewRelayRouter returns a life cycle hook that routes peers via relays in libp2p by +// continuously adding peer relay addresses to libp2p peer store. +func NewRelayRouter(tcpNode host.Host, peers []Peer, relays []Peer) lifecycle.HookFuncCtx { + return func(ctx context.Context) { + if !featureset.Enabled(featureset.InvertLibP2PRouting) { + return + } + if len(relays) == 0 { + return + } + + ctx = log.WithTopic(ctx, "p2p") + + for ctx.Err() == nil { + for _, p := range peers { + if p.ID == tcpNode.ID() { + // Skip self + continue + } + + for _, relay := range relays { + if relay.Enode.TCP() == 0 { + continue + } + + relayAddr, err := multiAddrViaRelay(relay, p.ID) + if err != nil { + log.Error(ctx, "Failed discovering peer address", err) + continue + } + + tcpNode.Peerstore().AddAddr(p.ID, relayAddr, routedAddrTTL) + } + } + + select { + case <-ctx.Done(): + return + case <-time.After(routedAddrTTL * 9 / 10): + } + } + } +} From ce77481a2f43bba2730476557fa6ce845c74ff8c Mon Sep 17 00:00:00 2001 From: corverroos Date: Thu, 18 Aug 2022 09:32:42 +0200 Subject: [PATCH 2/2] cleanup --- app/app_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/app_test.go b/app/app_test.go index ec4a56c32..820491f55 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -169,7 +169,7 @@ func pingCluster(t *testing.T, test pingTest) { bootnodes = append(bootnodes, bootAddr) } - const n = 2 + const n = 3 lock, p2pKeys, _ := cluster.NewForT(t, 1, n, n, 0) asserter := &pingAsserter{ asserter: asserter{