Skip to content

Commit

Permalink
p2p: refactor and cleanup bootnode relay wiring (#428)
Browse files Browse the repository at this point in the history
- Decouple relays and bootnodes from udpnode.
- Improve gater tests

category: refactor 
ticket: #413
  • Loading branch information
corverroos committed Apr 18, 2022
1 parent 40d1097 commit db08fa3
Show file tree
Hide file tree
Showing 11 changed files with 262 additions and 261 deletions.
32 changes: 20 additions & 12 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,34 +169,38 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config, manifest
var err error
p2pKey, err = p2p.LoadPrivKey(conf.DataDir)
if err != nil {
return nil, nil, errors.Wrap(err, "load p2p key")
return nil, nil, err
}
}

localEnode, peerDB, err := p2p.NewLocalEnode(conf.P2P, p2pKey)
if err != nil {
return nil, nil, errors.Wrap(err, "create local enode")
return nil, nil, err
}

udpNode, err := p2p.NewUDPNode(ctx, conf.P2P, localEnode, p2pKey, manifest.Peers)
bootnodes, err := p2p.NewUDPBootnodes(ctx, conf.P2P, manifest.Peers, localEnode.ID())
if err != nil {
return nil, nil, errors.Wrap(err, "start discv5 listener")
return nil, nil, err
}

connGater, err := p2p.NewConnGater(manifest.PeerIDs(), udpNode.Relays)
udpNode, err := p2p.NewUDPNode(conf.P2P, localEnode, p2pKey, bootnodes)
if err != nil {
return nil, nil, errors.Wrap(err, "connection gater")
return nil, nil, err
}

tcpNode, err := p2p.NewTCPNode(conf.P2P, p2pKey, connGater, udpNode, manifest.Peers, p2p.EmptyAdvertisedAddrs)
relays, err := p2p.NewRelays(conf.P2P, bootnodes)
if err != nil {
return nil, nil, errors.Wrap(err, "new p2p node", z.Str("allowlist", conf.P2P.Allowlist))
return nil, nil, err
}

if conf.P2P.BootnodeRelay {
for _, relay := range udpNode.Relays {
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartRelay, p2p.NewRelayReserver(tcpNode, relay))
}
connGater, err := p2p.NewConnGater(manifest.PeerIDs(), relays)
if err != nil {
return nil, nil, err
}

tcpNode, err := p2p.NewTCPNode(conf.P2P, p2pKey, connGater, udpNode, manifest.Peers, relays)
if err != nil {
return nil, nil, err
}

if !conf.TestConfig.DisablePing {
Expand All @@ -209,6 +213,10 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config, manifest
life.RegisterStop(lifecycle.StopP2PTCPNode, lifecycle.HookFuncErr(tcpNode.Close))
life.RegisterStop(lifecycle.StopP2PUDPNode, lifecycle.HookFuncMin(udpNode.Close))

for _, relay := range relays {
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartRelay, p2p.NewRelayReserver(tcpNode, relay))
}

return tcpNode, localEnode, nil
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/bootnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func RunBootnode(ctx context.Context, config BootnodeConfig) error {
}
defer db.Close()

udpNode, err := p2p.NewUDPNode(ctx, config.P2PConfig, localEnode, key, nil)
udpNode, err := p2p.NewUDPNode(config.P2PConfig, localEnode, key, nil)
if err != nil {
return errors.Wrap(err, "")
}
Expand All @@ -113,7 +113,7 @@ func RunBootnode(ctx context.Context, config BootnodeConfig) error {
p2pErr := make(chan error, 1)
go func() {
if config.P2PRelay {
tcpNode, err := p2p.NewTCPNode(config.P2PConfig, key, p2p.NewOpenGater(), udpNode, nil, p2p.EmptyAdvertisedAddrs)
tcpNode, err := p2p.NewTCPNode(config.P2PConfig, key, p2p.NewOpenGater(), udpNode, nil, nil)
if err != nil {
p2pErr <- err
return
Expand Down
116 changes: 116 additions & 0 deletions p2p/bootnode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright © 2022 Obol Labs Inc.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation, either version 3 of the License, or (at your option)
// any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License along with
// this program. If not, see <http://www.gnu.org/licenses/>.

package p2p

import (
"context"
"io"
"net/http"
"net/url"
"strings"
"time"

"github.com/ethereum/go-ethereum/p2p/enode"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/z"
)

// NewUDPBootnodes returns the udp bootnodes from the config.
func NewUDPBootnodes(ctx context.Context, config Config, peers []Peer,
localEnode enode.ID,
) ([]*enode.Node, error) {
var resp []*enode.Node
for _, rawURL := range config.UDPBootnodes {
if strings.HasPrefix(rawURL, "http") {
// Resolve bootnode ENR via http, retry for 1min with 5sec backoff.
inner, cancel := context.WithTimeout(ctx, time.Minute)
var err error
rawURL, err = queryBootnodeENR(inner, rawURL, time.Second*5)
cancel()
if err != nil {
return nil, err
}
}

node, err := enode.Parse(enode.V4ID{}, rawURL)
if err != nil {
return nil, errors.Wrap(err, "invalid bootnode address")
}

resp = append(resp, node)
}

if config.UDPBootManifest {
for _, p := range peers {
if p.Enode.ID() == localEnode {
// Do not include ourselves as bootnode.
continue
}
node := p.Enode // Copy loop variable
resp = append(resp, &node)
}
}

return resp, nil
}

// queryBootnodeENR returns the bootnode ENR via a http GET query to the url.
//
// This supports resolving bootnode ENR from known http URLs which is handy
// when bootnodes are deployed in docker-compose or kubernetes
//
// It retries until the context is cancelled.
func queryBootnodeENR(ctx context.Context, bootnodeURL string, backoff time.Duration) (string, error) {
parsedURL, err := url.Parse(bootnodeURL)
if err != nil {
return "", errors.Wrap(err, "parse bootnode url")
} else if parsedURL.Scheme != "http" && parsedURL.Scheme != "https" {
return "", errors.New("invalid bootnode url")
}

var client http.Client
for ctx.Err() == nil {
req, err := http.NewRequestWithContext(ctx, "GET", bootnodeURL, nil)
if err != nil {
return "", errors.Wrap(err, "new request")
}

resp, err := client.Do(req)
if err != nil {
log.Warn(ctx, "Failure querying bootnode ENR, trying again in 5s...", z.Err(err))
time.Sleep(backoff)

continue
} else if resp.StatusCode/100 != 2 {
return "", errors.New("non-200 response querying bootnode ENR",
z.Int("status_code", resp.StatusCode))
}

b, err := io.ReadAll(resp.Body)
_ = resp.Body.Close()
if err != nil {
return "", errors.Wrap(err, "read response body")
}

log.Info(ctx, "Queried bootnode ENR", z.Str("url", bootnodeURL), z.Str("enr", string(b)))

return string(b), nil
}

return "", errors.Wrap(ctx.Err(), "timeout querying bootnode ENR")
}
113 changes: 8 additions & 105 deletions p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,15 @@
package p2p

import (
"context"
"crypto/ecdsa"
"io"
"net"
"net/http"
"net/url"
"strings"
"time"

"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/p2p/netutil"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/z"
)

// UDPNode wraps a discv5 udp node and adds the bootnodes relays.
Expand All @@ -42,63 +34,23 @@ type UDPNode struct {
}

// NewUDPNode starts and returns a discv5 UDP implementation.
func NewUDPNode(ctx context.Context, config Config, ln *enode.LocalNode, key *ecdsa.PrivateKey,
peers []Peer,
) (UDPNode, error) {
// Setup bootnodes and relays
var (
bootnodes []*enode.Node
bootRelays []Peer
err error
)
for _, bootnode := range config.UDPBootnodes {
if strings.HasPrefix(bootnode, "http") {
// Query bootnode ENR via http, retry for 1min with 5sec backoff.
inner, cancel := context.WithTimeout(ctx, time.Minute)
bootnode, err = queryBootnodeENR(inner, bootnode, time.Second*5)
cancel()
if err != nil {
return UDPNode{}, err
}
}

peer, err := newRelayPeer(bootnode)
if err != nil {
return UDPNode{}, err
}

bootnodes = append(bootnodes, &peer.Enode)

if config.BootnodeRelay {
bootRelays = append(bootRelays, peer)
}
}

if config.UDPBootManifest {
for _, p := range peers {
if ln.ID() == p.Enode.ID() {
// Do not add local node as bootnode
continue
}
node := p.Enode // Copy loop variable
bootnodes = append(bootnodes, &node)
}
}

func NewUDPNode(config Config, ln *enode.LocalNode,
key *ecdsa.PrivateKey, bootnodes []*enode.Node,
) (*discover.UDPv5, error) {
// Setup discv5 udp listener.
udpAddr, err := net.ResolveUDPAddr("udp", config.UDPAddr)
if err != nil {
return UDPNode{}, errors.Wrap(err, "resolve udp address")
return nil, errors.Wrap(err, "resolve udp address")
}

conn, err := net.ListenUDP("udp", udpAddr)
if err != nil {
return UDPNode{}, errors.Wrap(err, "parse udp address")
return nil, errors.Wrap(err, "parse udp address")
}

netlist, err := netutil.ParseNetlist(config.Allowlist)
if err != nil {
return UDPNode{}, errors.Wrap(err, "parse allow list")
return nil, errors.Wrap(err, "parse allow list")
}

node, err := discover.ListenV5(conn, ln, discover.Config{
Expand All @@ -107,59 +59,10 @@ func NewUDPNode(ctx context.Context, config Config, ln *enode.LocalNode, key *ec
Bootnodes: bootnodes,
})
if err != nil {
return UDPNode{}, errors.Wrap(err, "discv5 listen")
}

return UDPNode{
UDPv5: node,
Relays: bootRelays,
}, nil
}

// queryBootnodeENR returns the bootnode ENR via a http GET query to the url.
//
// This supports resolving bootnode ENR from known http URLs which is handy
// when bootnodes are deployed in docker-compose or kubernetes
//
// It retries until the context is cancelled.
func queryBootnodeENR(ctx context.Context, bootnodeURL string, backoff time.Duration) (string, error) {
parsedURL, err := url.Parse(bootnodeURL)
if err != nil {
return "", errors.Wrap(err, "parse bootnode url")
} else if parsedURL.Scheme != "http" && parsedURL.Scheme != "https" {
return "", errors.New("invalid bootnode url")
}

var client http.Client
for ctx.Err() == nil {
req, err := http.NewRequestWithContext(ctx, "GET", bootnodeURL, nil)
if err != nil {
return "", errors.Wrap(err, "new request")
}

resp, err := client.Do(req)
if err != nil {
log.Warn(ctx, "Failure querying bootnode ENR, trying again in 5s...", z.Err(err))
time.Sleep(backoff)

continue
} else if resp.StatusCode/100 != 2 {
return "", errors.New("non-200 response querying bootnode ENR",
z.Int("status_code", resp.StatusCode))
}

b, err := io.ReadAll(resp.Body)
_ = resp.Body.Close()
if err != nil {
return "", errors.Wrap(err, "read response body")
}

log.Info(ctx, "Queried bootnode ENR", z.Str("url", bootnodeURL), z.Str("enr", string(b)))

return string(b), nil
return nil, errors.Wrap(err, "discv5 listen")
}

return "", errors.Wrap(ctx.Err(), "timeout querying bootnode ENR")
return node, nil
}

// NewLocalEnode returns a local enode and a peer DB or an error.
Expand Down
4 changes: 1 addition & 3 deletions p2p/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package p2p_test

import (
"context"
"crypto/ecdsa"
"fmt"
"math/rand"
Expand All @@ -30,7 +29,6 @@ import (
)

func TestExternalHost(t *testing.T) {
ctx := context.Background()
p2pKey, err := ecdsa.GenerateKey(crypto.S256(), rand.New(rand.NewSource(0)))
require.NoError(t, err)

Expand All @@ -47,7 +45,7 @@ func TestExternalHost(t *testing.T) {
require.NoError(t, err)
defer db.Close()

udpNode, err := p2p.NewUDPNode(ctx, config, localNode, p2pKey, nil)
udpNode, err := p2p.NewUDPNode(config, localNode, p2pKey, nil)
testutil.SkipIfBindErr(t, err)
require.NoError(t, err)
defer udpNode.Close()
Expand Down
4 changes: 2 additions & 2 deletions p2p/gater.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func NewConnGater(peers []peer.ID, relays []Peer) (ConnGater, error) {
}

// Allow connections to/from relays.
for _, bootnode := range relays {
peerMap[bootnode.ID] = true
for _, relay := range relays {
peerMap[relay.ID] = true
}

return ConnGater{
Expand Down
Loading

0 comments on commit db08fa3

Please sign in to comment.