Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions p2p/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,13 +521,14 @@ func (t *dialTask) resolve(d *dialScheduler) bool {

// dial performs the actual connection attempt.
func (t *dialTask) dial(d *dialScheduler, dest *enode.Node) error {
dialMeter.Mark(1)
fd, err := d.dialer.Dial(d.ctx, t.dest)
if err != nil {
d.log.Trace("Dial error", "id", t.dest.ID(), "addr", nodeAddr(t.dest), "conn", t.flags, "err", cleanupDialErr(err))
dialConnectionError.Mark(1)
return &dialError{err}
}
mfd := newMeteredConn(fd, false, &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()})
return d.setupFunc(mfd, t.flags, dest)
return d.setupFunc(newMeteredConn(fd), t.flags, dest)
}

func (t *dialTask) String() string {
Expand Down
8 changes: 8 additions & 0 deletions p2p/discover/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package discover

import (
"fmt"
"net"

"github.com/CortexFoundation/CortexTheseus/metrics"
Expand All @@ -32,10 +33,17 @@ const (
)

var (
bucketsCounter []metrics.Counter
ingressTrafficMeter = metrics.NewRegisteredMeter(ingressMeterName, nil)
egressTrafficMeter = metrics.NewRegisteredMeter(egressMeterName, nil)
)

func init() {
for i := 0; i < nBuckets; i++ {
bucketsCounter = append(bucketsCounter, metrics.NewRegisteredCounter(fmt.Sprintf("%s/bucket/%d/count", moduleName, i), nil))
}
}

// meteredConn is a wrapper around a net.UDPConn that meters both the
// inbound and outbound network traffic.
type meteredUdpConn struct {
Expand Down
44 changes: 39 additions & 5 deletions p2p/discover/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

"github.com/CortexFoundation/CortexTheseus/common"
"github.com/CortexFoundation/CortexTheseus/log"
"github.com/CortexFoundation/CortexTheseus/metrics"
"github.com/CortexFoundation/CortexTheseus/p2p/enode"
"github.com/CortexFoundation/CortexTheseus/p2p/netutil"
)
Expand Down Expand Up @@ -80,7 +81,8 @@ type Table struct {
closeReq chan struct{}
closed chan struct{}

nodeAddedHook func(*node) // for testing
nodeAddedHook func(*bucket, *node)
nodeRemovedHook func(*bucket, *node)
}

// transport is implemented by the UDP transports.
Expand All @@ -98,6 +100,7 @@ type bucket struct {
entries []*node // live entries, sorted by time of last contact
replacements []*node // recently seen nodes to be used if revalidation fails
ips netutil.DistinctNetSet
index int
}

func newTable(t transport, db *enode.DB, cfg Config) (*Table, error) {
Expand All @@ -119,7 +122,8 @@ func newTable(t transport, db *enode.DB, cfg Config) (*Table, error) {
}
for i := range tab.buckets {
tab.buckets[i] = &bucket{
ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
index: i,
ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
}
}
tab.seedRand()
Expand All @@ -128,6 +132,22 @@ func newTable(t transport, db *enode.DB, cfg Config) (*Table, error) {
return tab, nil
}

func newMeteredTable(t transport, db *enode.DB, cfg Config) (*Table, error) {
tab, err := newTable(t, db, cfg)
if err != nil {
return nil, err
}
if metrics.Enabled {
tab.nodeAddedHook = func(b *bucket, n *node) {
bucketsCounter[b.index].Inc(1)
}
tab.nodeRemovedHook = func(b *bucket, n *node) {
bucketsCounter[b.index].Dec(1)
}
}
return tab, nil
}

// Nodes returns all nodes contained in the table.
func (tab *Table) Nodes() []*enode.Node {
if !tab.isInitDone() {
Expand Down Expand Up @@ -183,12 +203,18 @@ func (tab *Table) close() {
// are used to connect to the network if the table is empty and there
// are no known nodes in the database.
func (tab *Table) setFallbackNodes(nodes []*enode.Node) error {
nursery := make([]*node, 0, len(nodes))
for _, n := range nodes {
if err := n.ValidateComplete(); err != nil {
return fmt.Errorf("bad bootstrap node %q: %v", n, err)
}
if tab.cfg.NetRestrict != nil && !tab.cfg.NetRestrict.Contains(n.IP()) {
tab.log.Error("Bootstrap node filtered by netrestrict", "id", n.ID(), "ip", n.IP())
continue
}
nursery = append(nursery, wrapNode(n))
}
tab.nursery = wrapNodes(nodes)
tab.nursery = nursery
return nil
}

Expand Down Expand Up @@ -495,7 +521,7 @@ func (tab *Table) addSeenNode(n *node) {
n.addedAt = time.Now()

if tab.nodeAddedHook != nil {
tab.nodeAddedHook(n)
tab.nodeAddedHook(b, n)
}
}

Expand Down Expand Up @@ -539,7 +565,7 @@ func (tab *Table) addVerifiedNode(n *node) {
n.addedAt = time.Now()

if tab.nodeAddedHook != nil {
tab.nodeAddedHook(n)
tab.nodeAddedHook(b, n)
}
}

Expand Down Expand Up @@ -638,8 +664,16 @@ func (tab *Table) bumpInBucket(b *bucket, n *node) bool {
}

func (tab *Table) deleteInBucket(b *bucket, n *node) {
// Check if the node is actually in the bucket so the removed hook
// isn't called multiple times for the same node.
if !contains(b.entries, n.ID()) {
return
}
b.entries = deleteNode(b.entries, n)
tab.removeIP(b, n.IP())
if tab.nodeRemovedHook != nil {
tab.nodeRemovedHook(b, n)
}
}

func contains(ns []*node, id enode.ID) bool {
Expand Down
2 changes: 1 addition & 1 deletion p2p/discover/v4_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
log: cfg.Log,
}

tab, err := newTable(t, ln.Database(), cfg)
tab, err := newMeteredTable(t, ln.Database(), cfg)
if err != nil {
return nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions p2p/discover/v4_udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ func TestUDPv4_responseTimeouts(t *testing.T) {
test := newUDPTest(t)
defer test.close()

rand.Seed(time.Now().UnixNano())
randomDuration := func(max time.Duration) time.Duration {
return time.Duration(rand.Int63n(int64(max)))
}
Expand Down Expand Up @@ -395,7 +394,7 @@ func TestUDPv4_pingMatchIP(t *testing.T) {
func TestUDPv4_successfulPing(t *testing.T) {
test := newUDPTest(t)
added := make(chan *node, 1)
test.table.nodeAddedHook = func(n *node) { added <- n }
test.table.nodeAddedHook = func(b *bucket, n *node) { added <- n }
defer test.close()

// The remote side sends a ping packet to initiate the exchange.
Expand Down
4 changes: 2 additions & 2 deletions p2p/discover/v5_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
cancelCloseCtx: cancelCloseCtx,
}
t.talk = newTalkSystem(t)
tab, err := newTable(t, t.db, cfg)
tab, err := newMeteredTable(t, t.db, cfg)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -264,7 +264,7 @@ func (t *UDPv5) TalkRequest(n *enode.Node, protocol string, request []byte) ([]b
}
}

// TalkRequest sends a talk request to a node and waits for a response.
// TalkRequestToID sends a talk request to a node and waits for a response.
func (t *UDPv5) TalkRequestToID(id enode.ID, addr *net.UDPAddr, protocol string, request []byte) ([]byte, error) {
req := &v5wire.TalkRequest{Protocol: protocol, Message: request}
resp := t.callToID(id, addr, v5wire.TalkResponseMsg, req)
Expand Down
37 changes: 37 additions & 0 deletions p2p/discover/v5_udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/CortexFoundation/CortexTheseus/p2p/enode"
"github.com/CortexFoundation/CortexTheseus/p2p/enr"
"github.com/CortexFoundation/CortexTheseus/rlp"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
)

Expand Down Expand Up @@ -537,6 +538,42 @@ func TestUDPv5_talkRequest(t *testing.T) {
}
}

// This test checks that lookupDistances works.
func TestUDPv5_lookupDistances(t *testing.T) {
test := newUDPV5Test(t)
lnID := test.table.self().ID()

t.Run("target distance of 1", func(t *testing.T) {
node := nodeAtDistance(lnID, 1, intIP(0))
dists := lookupDistances(lnID, node.ID())
require.Equal(t, []uint{1, 2, 3}, dists)
})

t.Run("target distance of 2", func(t *testing.T) {
node := nodeAtDistance(lnID, 2, intIP(0))
dists := lookupDistances(lnID, node.ID())
require.Equal(t, []uint{2, 3, 1}, dists)
})

t.Run("target distance of 128", func(t *testing.T) {
node := nodeAtDistance(lnID, 128, intIP(0))
dists := lookupDistances(lnID, node.ID())
require.Equal(t, []uint{128, 129, 127}, dists)
})

t.Run("target distance of 255", func(t *testing.T) {
node := nodeAtDistance(lnID, 255, intIP(0))
dists := lookupDistances(lnID, node.ID())
require.Equal(t, []uint{255, 256, 254}, dists)
})

t.Run("target distance of 256", func(t *testing.T) {
node := nodeAtDistance(lnID, 256, intIP(0))
dists := lookupDistances(lnID, node.ID())
require.Equal(t, []uint{256, 255, 254}, dists)
})
}

// This test checks that lookup works.
func TestUDPv5_lookup(t *testing.T) {
t.Parallel()
Expand Down
4 changes: 2 additions & 2 deletions p2p/discover/v5wire/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type (
handshakeAuthData struct {
h struct {
SrcID enode.ID
SigSize byte // ignature data
SigSize byte // signature data
PubkeySize byte // offset of
}
// Trailing variable-size data.
Expand Down Expand Up @@ -548,7 +548,7 @@ func (c *Codec) decodeHandshake(fromAddr string, head *Header) (n *enode.Node, a
if err != nil {
return nil, auth, nil, errInvalidAuthKey
}
// Derive sesssion keys.
// Derive session keys.
session := deriveKeys(sha256.New, c.privkey, ephkey, auth.h.SrcID, c.localnode.ID(), cdata)
session = session.keysFlipped()
return n, auth, session, nil
Expand Down
2 changes: 1 addition & 1 deletion p2p/enode/localnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package enode

import (
"math/rand"
"crypto/rand"
"net"
"testing"

Expand Down
Loading