Skip to content

Commit

Permalink
p2p: continuously resolve bootnode ENRs (#968)
Browse files Browse the repository at this point in the history
Introduces `p2p.MutablePeer`, a peer reference that can change over time, and integrates it with all the logic that uses relays/bootnodes. Continuously resolves HTTP bootnode ENRs and updates the `MutablePeer` accordingly.

category: feature
ticket: #952
  • Loading branch information
corverroos committed Aug 18, 2022
1 parent 82bfd72 commit 10c10fc
Show file tree
Hide file tree
Showing 16 changed files with 384 additions and 242 deletions.
9 changes: 3 additions & 6 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,22 +242,19 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config,
return nil, nil, err
}

udpNode, err := p2p.NewUDPNode(conf.P2P, localEnode, p2pKey, bootnodes)
udpNode, err := p2p.NewUDPNode(ctx, conf.P2P, localEnode, p2pKey, bootnodes)
if err != nil {
return nil, nil, err
}

relays, err := p2p.NewRelays(conf.P2P, bootnodes)
if err != nil {
return nil, nil, err
}
relays := p2p.NewRelays(conf.P2P, bootnodes)

connGater, err := p2p.NewConnGater(peerIDs, relays)
if err != nil {
return nil, nil, err
}

tcpNode, err := p2p.NewTCPNode(conf.P2P, p2pKey, connGater, udpNode, peers, relays)
tcpNode, err := p2p.NewTCPNode(conf.P2P, p2pKey, connGater)
if err != nil {
return nil, nil, err
}
Expand Down
25 changes: 6 additions & 19 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestPingCluster(t *testing.T) {
// Nodes bind to lock ENR addresses.
// Discv5 can just use those as bootnodes.
t.Run("bind_enrs", func(t *testing.T) {
pingClusterAB(t, pingTest{
pingCluster(t, pingTest{
Slow: false,
BootLock: true,
BindENRAddrs: true,
Expand All @@ -63,7 +63,7 @@ func TestPingCluster(t *testing.T) {
// Nodes bind to random localhost ports (not the lock ENRs), with only single bootnode.
// Discv5 will resolve peers via bootnode.
t.Run("bootnode_only", func(t *testing.T) {
pingClusterAB(t, pingTest{
pingCluster(t, pingTest{
BindLocalhost: true,
BootLock: false,
Bootnode: true,
Expand All @@ -73,7 +73,7 @@ func TestPingCluster(t *testing.T) {
// Nodes bind to random 0.0.0.0 ports (but use 127.0.0.1 as external IP), with only single bootnode.
// Discv5 will resolve peers via bootnode and external IP.
t.Run("external_ip", func(t *testing.T) {
pingClusterAB(t, pingTest{
pingCluster(t, pingTest{
ExternalIP: "127.0.0.1",
BindZeroIP: true,
BootLock: false,
Expand All @@ -84,7 +84,7 @@ func TestPingCluster(t *testing.T) {
// Nodes bind to 0.0.0.0 (but use localhost as external host), with only single bootnode.
// Discv5 will resolve peers via bootnode and external host.
t.Run("external_host", func(t *testing.T) {
pingClusterAB(t, pingTest{
pingCluster(t, pingTest{
ExternalHost: "localhost",
BindZeroIP: true,
BootLock: false,
Expand All @@ -97,7 +97,7 @@ func TestPingCluster(t *testing.T) {
// Node discv5 will not resolve direct address, nodes will connect to bootnode,
// and libp2p will relay via bootnode.
t.Run("bootnode_relay", func(t *testing.T) {
pingClusterAB(t, pingTest{
pingCluster(t, pingTest{
BootnodeRelay: true,
BindZeroPort: true,
Bootnode: true,
Expand All @@ -109,7 +109,7 @@ func TestPingCluster(t *testing.T) {
// Discv5 times out resolving stale ENRs, then resolves peers via external node.
// This is slow due to discv5 internal timeouts, run with -slow.
t.Run("bootnode_and_stale_enrs", func(t *testing.T) {
pingClusterAB(t, pingTest{
pingCluster(t, pingTest{
Slow: true,
BindLocalhost: true,
BootLock: true,
Expand All @@ -135,19 +135,6 @@ type pingTest struct {
ExternalHost string
}

// TODO(corver): Remove once featureset.InvertDiscv5 launched.
func pingClusterAB(t *testing.T, test pingTest) {
t.Helper()
t.Run("pushdisc", func(t *testing.T) {
featureset.EnableForT(t, featureset.InvertLibP2PRouting)
pingCluster(t, test)
})
t.Run("pulldisc", func(t *testing.T) {
featureset.DisableForT(t, featureset.InvertLibP2PRouting)
pingCluster(t, test)
})
}

func pingCluster(t *testing.T, test pingTest) {
t.Helper()

Expand Down
6 changes: 1 addition & 5 deletions app/featureset/featureset.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,12 @@ type Feature string
const (
// QBFTConsensus introduces qbft consensus, see https://github.com/ObolNetwork/charon/issues/445.
QBFTConsensus Feature = "qbft_consensus"

// InvertLibP2PRouting enables the new push based libp2p routing and disables the old pull based.
InvertLibP2PRouting Feature = "invert_libp2p_routing"
)

var (
// state defines the current rollout status of each feature.
state = map[Feature]status{
QBFTConsensus: statusStable,
InvertLibP2PRouting: statusAlpha,
QBFTConsensus: statusStable,
// Add all features and their status here.
}

Expand Down
18 changes: 12 additions & 6 deletions cmd/bootnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func RunBootnode(ctx context.Context, config BootnodeConfig) error {
}
defer db.Close()

udpNode, err := p2p.NewUDPNode(config.P2PConfig, localEnode, key, nil)
udpNode, err := p2p.NewUDPNode(ctx, config.P2PConfig, localEnode, key, nil)
if err != nil {
return errors.Wrap(err, "")
}
Expand Down Expand Up @@ -148,7 +148,7 @@ func RunBootnode(ctx context.Context, config BootnodeConfig) error {
p2pErr <- errors.Wrap(err, "new resource manager")
}

tcpNode, err := p2p.NewTCPNode(config.P2PConfig, key, p2p.NewOpenGater(), udpNode, nil, nil, libp2p.ResourceManager(mgr))
tcpNode, err := p2p.NewTCPNode(config.P2PConfig, key, p2p.NewOpenGater(), libp2p.ResourceManager(mgr))
if err != nil {
p2pErr <- errors.Wrap(err, "new tcp node")
return
Expand All @@ -172,10 +172,17 @@ func RunBootnode(ctx context.Context, config BootnodeConfig) error {
for _, conn := range conns {
peers[conn.RemotePeer()] = true
}
log.Info(ctx, "Libp2p TCP open connections", z.Int("total", len(conns)),
z.Int("peers", len(peers)))
log.Info(ctx, "Libp2p TCP open connections",
z.Int("total", len(conns)),
z.Int("peers", len(peers)),
)
}

log.Info(ctx, "Libp2p TCP relay started",
z.Str("peer_name", p2p.PeerName(tcpNode.ID())),
z.Any("p2p_tcp_addr", config.P2PConfig.TCPAddrs),
)

<-ctx.Done()
_ = tcpNode.Close()
_ = relayService.Close()
Expand All @@ -192,8 +199,7 @@ func RunBootnode(ctx context.Context, config BootnodeConfig) error {
serverErr <- server.ListenAndServe()
}()

log.Info(ctx, "Bootnode started",
z.Str("http_addr", config.HTTPAddr),
log.Info(ctx, "Discv5 UDP bootnode started",
z.Str("p2p_udp_addr", config.P2PConfig.UDPAddr),
z.Str("enr", localEnode.Node().String()),
)
Expand Down
3 changes: 2 additions & 1 deletion core/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ func newParticipationReporter(peers []p2p.Peer) func(context.Context, core.Duty,
if participatedShares[peer.ShareIdx()] {
participationGauge.WithLabelValues(duty.Type.String(), peer.Name).Set(1)
} else if unexpectedShares[peer.ShareIdx()] {
log.Warn(ctx, "Unexpected event found", nil, z.Str("peer", peer.Name), z.Str("duty", duty.String()))
// TODO(corver): Enable with https://github.com/ObolNetwork/charon/issues/993
// log.Warn(ctx, "Unexpected event found", nil, z.Str("peer", peer.Name), z.Str("duty", duty.String()))
unexpectedEventsCounter.WithLabelValues(peer.Name).Inc()
} else {
absentPeers = append(absentPeers, peer.Name)
Expand Down
14 changes: 7 additions & 7 deletions dkg/dkg.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,30 +214,30 @@ func setupP2P(ctx context.Context, key *ecdsa.PrivateKey, p2pConf p2p.Config, pe
return nil, nil, errors.Wrap(err, "new bootnodes")
}

udpNode, err := p2p.NewUDPNode(p2pConf, localEnode, key, bootnodes)
udpNode, err := p2p.NewUDPNode(ctx, p2pConf, localEnode, key, bootnodes)
if err != nil {
return nil, nil, errors.Wrap(err, "")
}

relays, err := p2p.NewRelays(p2pConf, bootnodes)
if err != nil {
return nil, nil, err
}
relays := p2p.NewRelays(p2pConf, bootnodes)

tcpNode, err := p2p.NewTCPNode(p2pConf, key, p2p.NewOpenGater(), udpNode, peers, relays)
tcpNode, err := p2p.NewTCPNode(p2pConf, key, p2p.NewOpenGater())
if err != nil {
return nil, nil, errors.Wrap(err, "")
}

for _, relay := range relays {
go func(relay p2p.Peer) {
go func(relay *p2p.MutablePeer) {
err := p2p.NewRelayReserver(tcpNode, relay)(ctx)
if err != nil {
log.Error(ctx, "Reserve relay error", err)
}
}(relay)
}

go p2p.NewRelayRouter(tcpNode, peers, relays)(ctx)
go p2p.NewDiscoveryRouter(tcpNode, udpNode, peers)(ctx)

return tcpNode, func() {
db.Close()
udpNode.Close()
Expand Down
119 changes: 93 additions & 26 deletions p2p/bootnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,29 +30,32 @@ import (
"github.com/obolnetwork/charon/app/z"
)

// NewUDPBootnodes returns the udp bootnodes from the config.
// NewUDPBootnodes returns the discv5 udp bootnodes from the config.
func NewUDPBootnodes(ctx context.Context, config Config, peers []Peer,
localEnode enode.ID, lockHashHex string,
) ([]*enode.Node, error) {
var resp []*enode.Node
) ([]*MutablePeer, error) {
var resp []*MutablePeer
for _, rawURL := range config.UDPBootnodes {
if strings.HasPrefix(rawURL, "http") {
// Resolve bootnode ENR via http, retry for 1min with 5sec backoff.
inner, cancel := context.WithTimeout(ctx, time.Minute)
var err error
rawURL, err = queryBootnodeENR(inner, rawURL, time.Second*5, lockHashHex)
cancel()
if err != nil {
return nil, err
}
mutable := new(MutablePeer)
go resolveBootnode(ctx, rawURL, lockHashHex, mutable.Set)
resp = append(resp, mutable)

continue
}

node, err := enode.Parse(enode.V4ID{}, rawURL)
if err != nil {
return nil, errors.Wrap(err, "invalid bootnode address")
}

resp = append(resp, node)
r := node.Record()
p, err := NewPeer(*r, -1)
if err != nil {
return nil, err
}

resp = append(resp, NewMutablePeer(p))
}

if config.UDPBootLock {
Expand All @@ -61,12 +64,65 @@ func NewUDPBootnodes(ctx context.Context, config Config, peers []Peer,
// Do not include ourselves as bootnode.
continue
}
node := p.Enode // Copy loop variable
resp = append(resp, &node)

resp = append(resp, NewMutablePeer(p))
}
}

if len(resp) == 0 {
return nil, nil
}

// Wait until at least one bootnode ENR is resolved
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
for ctx.Err() == nil {
var resolved bool
for _, node := range resp {
if _, ok := node.Peer(); ok {
resolved = true
}
}
if resolved {
return resp, nil
}
time.Sleep(time.Second * 1)
}

return resp, nil
return nil, errors.Wrap(ctx.Err(), "timeout resolving bootnode ENR")
}

// resolveBootnode continuously resolves the bootnode ENR from the HTTP url and
// invokes the callback with a new bootnode Peer whenever the resolved ENR changes.
// It returns when the context is cancelled or when querying the URL fails permanently.
func resolveBootnode(ctx context.Context, rawURL, lockHashHex string, callback func(Peer)) {
	var prevENR string
	for ctx.Err() == nil {
		node, err := queryBootnodeENR(ctx, rawURL, time.Second*5, lockHashHex)
		if err != nil {
			// queryBootnodeENR retries transient failures internally, so an error
			// here means the URL is invalid or the context was cancelled.
			log.Error(ctx, "Failed resolving bootnode ENR from URL", err, z.Str("url", rawURL))
			return
		}

		newENR := node.String()
		if prevENR != newENR {
			prevENR = newENR

			r := node.Record()
			p, err := NewPeer(*r, -1)
			if err != nil {
				log.Error(ctx, "Failed to create bootnode peer", err)
			} else {
				log.Info(ctx, "Resolved new bootnode ENR",
					z.Str("peer", p.Name),
					z.Str("url", rawURL),
					z.Str("enr", newENR),
				)
				callback(p)
			}
		}

		// Wait 2min before checking again, but exit promptly on cancellation.
		// (time.Sleep would delay goroutine shutdown by up to 2min after ctx is done.)
		select {
		case <-ctx.Done():
			return
		case <-time.After(time.Minute * 2):
		}
	}
}

// queryBootnodeENR returns the bootnode ENR via a http GET query to the url.
Expand All @@ -75,43 +131,54 @@ func NewUDPBootnodes(ctx context.Context, config Config, peers []Peer,
// when bootnodes are deployed in docker-compose or kubernetes
//
// It retries until the context is cancelled.
func queryBootnodeENR(ctx context.Context, bootnodeURL string, backoff time.Duration, lockHashHex string) (string, error) {
func queryBootnodeENR(ctx context.Context, bootnodeURL string, backoff time.Duration, lockHashHex string) (*enode.Node, error) {
parsedURL, err := url.Parse(bootnodeURL)
if err != nil {
return "", errors.Wrap(err, "parse bootnode url")
return nil, errors.Wrap(err, "parse bootnode url")
} else if parsedURL.Scheme != "http" && parsedURL.Scheme != "https" {
return "", errors.New("invalid bootnode url")
return nil, errors.New("invalid bootnode url")
}

var client http.Client
for ctx.Err() == nil {
req, err := http.NewRequestWithContext(ctx, "GET", bootnodeURL, nil)
if err != nil {
return "", errors.Wrap(err, "new request")
return nil, errors.Wrap(err, "new request")
}
req.Header.Set("Charon-Cluster", lockHashHex)

resp, err := client.Do(req)
if err != nil {
log.Warn(ctx, "Failure querying bootnode ENR, trying again in 5s...", err)
log.Warn(ctx, "Failure querying bootnode ENR (will try again)", err)
time.Sleep(backoff)

continue
} else if resp.StatusCode/100 != 2 {
return "", errors.New("non-200 response querying bootnode ENR",
z.Int("status_code", resp.StatusCode))
log.Warn(ctx, "Non-200 response querying bootnode ENR (will try again)", nil, z.Int("status_code", resp.StatusCode))
time.Sleep(backoff)

continue
}

b, err := io.ReadAll(resp.Body)
_ = resp.Body.Close()
if err != nil {
return "", errors.Wrap(err, "read response body")
log.Warn(ctx, "Failure reading bootnode ENR (will try again)", err)
time.Sleep(backoff)

continue
}

log.Info(ctx, "Queried bootnode ENR", z.Str("url", bootnodeURL), z.Str("enr", string(b)))
node, err := enode.Parse(enode.V4ID{}, string(b))
if err != nil {
log.Warn(ctx, "Failure parsing ENR (will try again)", err, z.Str("enr", string(b)))
time.Sleep(backoff)

continue
}

return string(b), nil
return node, nil
}

return "", errors.Wrap(ctx.Err(), "timeout querying bootnode ENR")
return nil, errors.Wrap(ctx.Err(), "timeout querying bootnode ENR")
}
Loading

0 comments on commit 10c10fc

Please sign in to comment.