Skip to content

Commit

Permalink
p2p: invert peer discovery integration (#989)
Browse files Browse the repository at this point in the history
Inverts the integration between  discv5 and libp2p from pull to push.

category: refactor
ticket: #985 
feature_flag: invert_discv5
  • Loading branch information
corverroos committed Aug 18, 2022
1 parent 09bdf33 commit a68a2c6
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 31 deletions.
1 change: 1 addition & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config,
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartRelay, p2p.NewRelayReserver(tcpNode, relay))
}
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PEventCollector, p2p.NewEventCollector(tcpNode))
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PDiscAdapter, p2p.NewDiscoveryAdapter(tcpNode, udpNode, peers))

return tcpNode, localEnode, nil
}
Expand Down
30 changes: 24 additions & 6 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/obolnetwork/charon/cmd"
"github.com/obolnetwork/charon/p2p"
"github.com/obolnetwork/charon/testutil"
"github.com/obolnetwork/charon/testutil/beaconmock"
)

//go:generate go test . -v -run=TestPingCluster -slow
Expand All @@ -51,7 +52,7 @@ func TestPingCluster(t *testing.T) {
// Nodes bind to lock ENR addresses.
// Discv5 can just use those as bootnodes.
t.Run("bind_enrs", func(t *testing.T) {
pingCluster(t, pingTest{
pingClusterAB(t, pingTest{
Slow: false,
BootLock: true,
BindENRAddrs: true,
Expand All @@ -62,7 +63,7 @@ func TestPingCluster(t *testing.T) {
// Nodes bind to random localhost ports (not the lock ENRs), with only single bootnode.
// Discv5 will resolve peers via bootnode.
t.Run("bootnode_only", func(t *testing.T) {
pingCluster(t, pingTest{
pingClusterAB(t, pingTest{
BindLocalhost: true,
BootLock: false,
Bootnode: true,
Expand All @@ -72,7 +73,7 @@ func TestPingCluster(t *testing.T) {
// Nodes bind to random 0.0.0.0 ports (but use 127.0.0.1 as external IP), with only single bootnode.
// Discv5 will resolve peers via bootnode and external IP.
t.Run("external_ip", func(t *testing.T) {
pingCluster(t, pingTest{
pingClusterAB(t, pingTest{
ExternalIP: "127.0.0.1",
BindZeroIP: true,
BootLock: false,
Expand All @@ -83,7 +84,7 @@ func TestPingCluster(t *testing.T) {
// Nodes bind to 0.0.0.0 (but use localhost as external host), with only single bootnode.
// Discv5 will resolve peers via bootnode and external host.
t.Run("external_host", func(t *testing.T) {
pingCluster(t, pingTest{
pingClusterAB(t, pingTest{
ExternalHost: "localhost",
BindZeroIP: true,
BootLock: false,
Expand All @@ -96,7 +97,7 @@ func TestPingCluster(t *testing.T) {
// Node discv5 will not resolve direct address, nodes will connect to bootnode,
// and libp2p will relay via bootnode.
t.Run("bootnode_relay", func(t *testing.T) {
pingCluster(t, pingTest{
pingClusterAB(t, pingTest{
BootnodeRelay: true,
BindZeroPort: true,
Bootnode: true,
Expand All @@ -108,7 +109,7 @@ func TestPingCluster(t *testing.T) {
// Discv5 times out resolving stale ENRs, then resolves peers via external node.
// This is slow due to discv5 internal timeouts, run with -slow.
t.Run("bootnode_and_stale_enrs", func(t *testing.T) {
pingCluster(t, pingTest{
pingClusterAB(t, pingTest{
Slow: true,
BindLocalhost: true,
BootLock: true,
Expand All @@ -134,6 +135,19 @@ type pingTest struct {
ExternalHost string
}

// TODO(corver): Remove once featureset.InvertDiscv5 launched.
func pingClusterAB(t *testing.T, test pingTest) {
t.Helper()
t.Run("pushdisc", func(t *testing.T) {
featureset.EnableForT(t, featureset.InvertDiscv5)
pingCluster(t, test)
})
t.Run("pulldisc", func(t *testing.T) {
featureset.DisableForT(t, featureset.InvertDiscv5)
pingCluster(t, test)
})
}

func pingCluster(t *testing.T, test pingTest) {
t.Helper()

Expand Down Expand Up @@ -178,6 +192,10 @@ func pingCluster(t *testing.T, test pingTest) {
P2PKey: p2pKeys[i],
PingCallback: asserter.Callback(t, i),
DisablePromWrap: true,
SimnetBMockOpts: []beaconmock.Option{
beaconmock.WithNoAttesterDuties(),
beaconmock.WithNoProposerDuties(),
},
},
P2P: p2p.Config{
UDPBootnodes: bootnodes,
Expand Down
4 changes: 4 additions & 0 deletions app/featureset/featureset.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,16 @@ type Feature string
const (
// QBFTConsensus introduces qbft consensus, see https://github.com/ObolNetwork/charon/issues/445.
QBFTConsensus Feature = "qbft_consensus"

// InvertDiscv5 enables the new push based discv5 integration and disables the old pull based.
InvertDiscv5 Feature = "invert_discv5"
)

var (
// state defines the current rollout status of each feature.
state = map[Feature]status{
QBFTConsensus: statusStable,
InvertDiscv5: statusAlpha,
// Add all features and there status here.
}

Expand Down
1 change: 1 addition & 0 deletions app/lifecycle/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
StartMonitoringAPI
StartValidatorAPI
StartP2PPing
StartP2PDiscAdapter
StartP2PConsensus
StartSimulator
StartScheduler
Expand Down
13 changes: 7 additions & 6 deletions app/lifecycle/orderstart_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 69 additions & 6 deletions p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,28 @@
package p2p

import (
"context"
"crypto/ecdsa"
"net"
"time"

"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/p2p/netutil"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
ma "github.com/multiformats/go-multiaddr"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/expbackoff"
"github.com/obolnetwork/charon/app/featureset"
"github.com/obolnetwork/charon/app/lifecycle"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/z"
)

// UDPNode wraps a discv5 udp node and adds the bootnodes relays.
type UDPNode struct {
*discover.UDPv5
Relays []Peer
}

// NewUDPNode starts and returns a discv5 UDP implementation.
func NewUDPNode(config Config, ln *enode.LocalNode,
key *ecdsa.PrivateKey, bootnodes []*enode.Node,
Expand Down Expand Up @@ -133,3 +137,62 @@ func NewLocalEnode(config Config, key *ecdsa.PrivateKey) (*enode.LocalNode, *eno

return node, db, nil
}

// NewDiscoveryAdapter returns a life cycle hook that links discv5 to libp2p by
// continuously polling discv5 for latest peer ERNs and adding then to libp2p peer store.
func NewDiscoveryAdapter(tcpNode host.Host, udpNode *discover.UDPv5, peers []Peer) lifecycle.HookFuncCtx {
return func(ctx context.Context) {
if !featureset.Enabled(featureset.InvertDiscv5) {
return
}

ctx = log.WithTopic(ctx, "p2p")
ttl := peerstore.TempAddrTTL
baseDelay := expbackoff.WithBaseDelay(time.Millisecond * 100) // Poll quickly on startup
maxDelay := expbackoff.WithMaxDelay(ttl * 9 / 10) // Slow down to 90% of ttl
backoff := expbackoff.New(ctx, baseDelay, maxDelay)
addrs := make(map[peer.ID]string)

for ctx.Err() == nil {
for _, p := range peers {
if p.ID == tcpNode.ID() {
// Skip self
continue
}

addr, ok, err := getDiscoveredAddress(udpNode, p)
if err != nil {
log.Error(ctx, "Failed discovering peer address", err)
} else if ok {
addrStr := addr.String()
if addrs[p.ID] != addrStr {
log.Info(ctx, "Discovered new peer address",
z.Str("peer", PeerName(p.ID)),
z.Str("address", addrStr))
addrs[p.ID] = addrStr
}

tcpNode.Peerstore().AddAddr(p.ID, addr, ttl)
}
}

backoff()
}
}
}

// getDiscoveredAddress returns the peer's address as discovered by discv5,
// it returns false if the peer isn't discovered.
func getDiscoveredAddress(udpNode *discover.UDPv5, p Peer) (ma.Multiaddr, bool, error) {
resolved := udpNode.Resolve(&p.Enode)
if resolved.Seq() == 0 || resolved.TCP() == 0 {
return nil, false, nil // Not discovered
}

addr, err := multiAddrFromIPPort(resolved.IP(), resolved.TCP())
if err != nil {
return nil, false, err
}

return addr, true, nil
}
2 changes: 1 addition & 1 deletion p2p/gater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestP2PConnGating(t *testing.T) {

err = nodeA.Connect(context.Background(), addr)
require.Error(t, err)
require.Contains(t, err.Error(), fmt.Sprintf("gater rejected connection with peer %s and addr %s", addr.ID, addr.Addrs[0]))
require.Contains(t, err.Error(), fmt.Sprintf("gater rejected connection with peer %s", addr.ID))
}

func TestOpenGater(t *testing.T) {
Expand Down
27 changes: 15 additions & 12 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
ma "github.com/multiformats/go-multiaddr"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/featureset"
"github.com/obolnetwork/charon/app/lifecycle"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/version"
Expand Down Expand Up @@ -128,21 +129,23 @@ func adaptDiscRouting(udpNode *discover.UDPv5, peers, relays []Peer) peerRouting
return peer.AddrInfo{}, errors.New("unknown peer")
}

resolved := udpNode.Resolve(&node)
if resolved == nil {
return peer.AddrInfo{}, errors.New("peer not resolved")
}

var mAddrs []ma.Multiaddr

// If sequence is 0, we haven't discovered it yet.
// If tcp port is 0, this node isn't bound to a port.
if resolved.Seq() != 0 && resolved.TCP() != 0 {
mAddr, err := multiAddrFromIPPort(resolved.IP(), resolved.TCP())
if err != nil {
return peer.AddrInfo{}, err
if !featureset.Enabled(featureset.InvertDiscv5) {
resolved := udpNode.Resolve(&node)
if resolved == nil {
return peer.AddrInfo{}, errors.New("peer not resolved")
}

// If sequence is 0, we haven't discovered it yet.
// If tcp port is 0, this node isn't bound to a port.
if resolved.Seq() != 0 && resolved.TCP() != 0 {
mAddr, err := multiAddrFromIPPort(resolved.IP(), resolved.TCP())
if err != nil {
return peer.AddrInfo{}, err
}
mAddrs = append(mAddrs, mAddr)
}
mAddrs = append(mAddrs, mAddr)
}

// Add any circuit relays
Expand Down

0 comments on commit a68a2c6

Please sign in to comment.