diff --git a/p2p/dial.go b/p2p/dial.go index c8b537cbf0..59d9566dec 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -521,13 +521,14 @@ func (t *dialTask) resolve(d *dialScheduler) bool { // dial performs the actual connection attempt. func (t *dialTask) dial(d *dialScheduler, dest *enode.Node) error { + dialMeter.Mark(1) fd, err := d.dialer.Dial(d.ctx, t.dest) if err != nil { d.log.Trace("Dial error", "id", t.dest.ID(), "addr", nodeAddr(t.dest), "conn", t.flags, "err", cleanupDialErr(err)) + dialConnectionError.Mark(1) return &dialError{err} } - mfd := newMeteredConn(fd, false, &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()}) - return d.setupFunc(mfd, t.flags, dest) + return d.setupFunc(newMeteredConn(fd), t.flags, dest) } func (t *dialTask) String() string { diff --git a/p2p/discover/metrics.go b/p2p/discover/metrics.go index 15544791fd..4b1c39d36a 100644 --- a/p2p/discover/metrics.go +++ b/p2p/discover/metrics.go @@ -17,6 +17,7 @@ package discover import ( + "fmt" "net" "github.com/CortexFoundation/CortexTheseus/metrics" @@ -32,10 +33,17 @@ const ( ) var ( + bucketsCounter []metrics.Counter ingressTrafficMeter = metrics.NewRegisteredMeter(ingressMeterName, nil) egressTrafficMeter = metrics.NewRegisteredMeter(egressMeterName, nil) ) +func init() { + for i := 0; i < nBuckets; i++ { + bucketsCounter = append(bucketsCounter, metrics.NewRegisteredCounter(fmt.Sprintf("%s/bucket/%d/count", moduleName, i), nil)) + } +} + // meteredConn is a wrapper around a net.UDPConn that meters both the // inbound and outbound network traffic. type meteredUdpConn struct { diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 26f6599ac0..9ed35b1eed 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -34,6 +34,7 @@ import ( "github.com/CortexFoundation/CortexTheseus/common" "github.com/CortexFoundation/CortexTheseus/log" + "github.com/CortexFoundation/CortexTheseus/metrics" "github.com/CortexFoundation/CortexTheseus/p2p/enode" "github.com/CortexFoundation/CortexTheseus/p2p/netutil" ) @@ -80,7 +81,8 @@ type Table struct { closeReq chan struct{} closed chan struct{} - nodeAddedHook func(*node) // for testing + nodeAddedHook func(*bucket, *node) + nodeRemovedHook func(*bucket, *node) } // transport is implemented by the UDP transports. @@ -98,6 +100,7 @@ type bucket struct { entries []*node // live entries, sorted by time of last contact replacements []*node // recently seen nodes to be used if revalidation fails ips netutil.DistinctNetSet + index int } func newTable(t transport, db *enode.DB, cfg Config) (*Table, error) { @@ -119,7 +122,8 @@ func newTable(t transport, db *enode.DB, cfg Config) (*Table, error) { } for i := range tab.buckets { tab.buckets[i] = &bucket{ - ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit}, + index: i, + ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit}, } } tab.seedRand() @@ -128,6 +132,22 @@ func newTable(t transport, db *enode.DB, cfg Config) (*Table, error) { return tab, nil } +func newMeteredTable(t transport, db *enode.DB, cfg Config) (*Table, error) { + tab, err := newTable(t, db, cfg) + if err != nil { + return nil, err + } + if metrics.Enabled { + tab.nodeAddedHook = func(b *bucket, n *node) { + bucketsCounter[b.index].Inc(1) + } + tab.nodeRemovedHook = func(b *bucket, n *node) { + bucketsCounter[b.index].Dec(1) + } + } + return tab, nil +} + // Nodes returns all nodes contained in the table. func (tab *Table) Nodes() []*enode.Node { if !tab.isInitDone() { @@ -183,12 +203,18 @@ func (tab *Table) close() { // are used to connect to the network if the table is empty and there // are no known nodes in the database. func (tab *Table) setFallbackNodes(nodes []*enode.Node) error { + nursery := make([]*node, 0, len(nodes)) for _, n := range nodes { if err := n.ValidateComplete(); err != nil { return fmt.Errorf("bad bootstrap node %q: %v", n, err) } + if tab.cfg.NetRestrict != nil && !tab.cfg.NetRestrict.Contains(n.IP()) { + tab.log.Error("Bootstrap node filtered by netrestrict", "id", n.ID(), "ip", n.IP()) + continue + } + nursery = append(nursery, wrapNode(n)) } - tab.nursery = wrapNodes(nodes) + tab.nursery = nursery return nil } @@ -495,7 +521,7 @@ func (tab *Table) addSeenNode(n *node) { n.addedAt = time.Now() if tab.nodeAddedHook != nil { - tab.nodeAddedHook(n) + tab.nodeAddedHook(b, n) } } @@ -539,7 +565,7 @@ func (tab *Table) addVerifiedNode(n *node) { n.addedAt = time.Now() if tab.nodeAddedHook != nil { - tab.nodeAddedHook(n) + tab.nodeAddedHook(b, n) } } @@ -638,8 +664,16 @@ func (tab *Table) bumpInBucket(b *bucket, n *node) bool { } func (tab *Table) deleteInBucket(b *bucket, n *node) { + // Check if the node is actually in the bucket so the removed hook + // isn't called multiple times for the same node. + if !contains(b.entries, n.ID()) { + return + } b.entries = deleteNode(b.entries, n) tab.removeIP(b, n.IP()) + if tab.nodeRemovedHook != nil { + tab.nodeRemovedHook(b, n) + } } func contains(ns []*node, id enode.ID) bool { diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index f50a887649..c4237e83c7 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -142,7 +142,7 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) { log: cfg.Log, } - tab, err := newTable(t, ln.Database(), cfg) + tab, err := newMeteredTable(t, ln.Database(), cfg) if err != nil { return nil, err } diff --git a/p2p/discover/v4_udp_test.go b/p2p/discover/v4_udp_test.go index 0cfc13a888..72bc26d497 100644 --- a/p2p/discover/v4_udp_test.go +++ b/p2p/discover/v4_udp_test.go @@ -165,7 +165,6 @@ func TestUDPv4_responseTimeouts(t *testing.T) { test := newUDPTest(t) defer test.close() - rand.Seed(time.Now().UnixNano()) randomDuration := func(max time.Duration) time.Duration { return time.Duration(rand.Int63n(int64(max))) } @@ -395,7 +394,7 @@ func TestUDPv4_pingMatchIP(t *testing.T) { func TestUDPv4_successfulPing(t *testing.T) { test := newUDPTest(t) added := make(chan *node, 1) - test.table.nodeAddedHook = func(n *node) { added <- n } + test.table.nodeAddedHook = func(b *bucket, n *node) { added <- n } defer test.close() // The remote side sends a ping packet to initiate the exchange. diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index a896a9ceeb..88a6c4b111 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -174,7 +174,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { cancelCloseCtx: cancelCloseCtx, } t.talk = newTalkSystem(t) - tab, err := newTable(t, t.db, cfg) + tab, err := newMeteredTable(t, t.db, cfg) if err != nil { return nil, err } @@ -264,7 +264,7 @@ func (t *UDPv5) TalkRequest(n *enode.Node, protocol string, request []byte) ([]b } } -// TalkRequest sends a talk request to a node and waits for a response. +// TalkRequestToID sends a talk request to a node and waits for a response. func (t *UDPv5) TalkRequestToID(id enode.ID, addr *net.UDPAddr, protocol string, request []byte) ([]byte, error) { req := &v5wire.TalkRequest{Protocol: protocol, Message: request} resp := t.callToID(id, addr, v5wire.TalkResponseMsg, req) diff --git a/p2p/discover/v5_udp_test.go b/p2p/discover/v5_udp_test.go index a23327fe0b..9ff446f713 100644 --- a/p2p/discover/v5_udp_test.go +++ b/p2p/discover/v5_udp_test.go @@ -33,6 +33,7 @@ import ( "github.com/CortexFoundation/CortexTheseus/p2p/enode" "github.com/CortexFoundation/CortexTheseus/p2p/enr" "github.com/CortexFoundation/CortexTheseus/rlp" + "github.com/stretchr/testify/require" "golang.org/x/exp/slices" ) @@ -537,6 +538,42 @@ func TestUDPv5_talkRequest(t *testing.T) { } } +// This test checks that lookupDistances works. +func TestUDPv5_lookupDistances(t *testing.T) { + test := newUDPV5Test(t) + lnID := test.table.self().ID() + + t.Run("target distance of 1", func(t *testing.T) { + node := nodeAtDistance(lnID, 1, intIP(0)) + dists := lookupDistances(lnID, node.ID()) + require.Equal(t, []uint{1, 2, 3}, dists) + }) + + t.Run("target distance of 2", func(t *testing.T) { + node := nodeAtDistance(lnID, 2, intIP(0)) + dists := lookupDistances(lnID, node.ID()) + require.Equal(t, []uint{2, 3, 1}, dists) + }) + + t.Run("target distance of 128", func(t *testing.T) { + node := nodeAtDistance(lnID, 128, intIP(0)) + dists := lookupDistances(lnID, node.ID()) + require.Equal(t, []uint{128, 129, 127}, dists) + }) + + t.Run("target distance of 255", func(t *testing.T) { + node := nodeAtDistance(lnID, 255, intIP(0)) + dists := lookupDistances(lnID, node.ID()) + require.Equal(t, []uint{255, 256, 254}, dists) + }) + + t.Run("target distance of 256", func(t *testing.T) { + node := nodeAtDistance(lnID, 256, intIP(0)) + dists := lookupDistances(lnID, node.ID()) + require.Equal(t, []uint{256, 255, 254}, dists) + }) +} + // This test checks that lookup works. func TestUDPv5_lookup(t *testing.T) { t.Parallel() diff --git a/p2p/discover/v5wire/encoding.go b/p2p/discover/v5wire/encoding.go index fdcea93e08..e1821bcd09 100644 --- a/p2p/discover/v5wire/encoding.go +++ b/p2p/discover/v5wire/encoding.go @@ -65,7 +65,7 @@ type ( handshakeAuthData struct { h struct { SrcID enode.ID - SigSize byte // ignature data + SigSize byte // signature data PubkeySize byte // offset of } // Trailing variable-size data. @@ -548,7 +548,7 @@ func (c *Codec) decodeHandshake(fromAddr string, head *Header) (n *enode.Node, a if err != nil { return nil, auth, nil, errInvalidAuthKey } - // Derive sesssion keys. + // Derive session keys. session := deriveKeys(sha256.New, c.privkey, ephkey, auth.h.SrcID, c.localnode.ID(), cdata) session = session.keysFlipped() return n, auth, session, nil diff --git a/p2p/enode/localnode_test.go b/p2p/enode/localnode_test.go index 99e12ed0ef..77307cb6bd 100644 --- a/p2p/enode/localnode_test.go +++ b/p2p/enode/localnode_test.go @@ -17,7 +17,7 @@ package enode import ( - "math/rand" + "crypto/rand" "net" "testing" diff --git a/p2p/metrics.go b/p2p/metrics.go index 8667df0703..7e0db8b808 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -19,30 +19,86 @@ package p2p import ( + "errors" "net" "github.com/CortexFoundation/CortexTheseus/metrics" ) const ( + // HandleHistName is the prefix of the per-packet serving time histograms. + HandleHistName = "p2p/handle" + // ingressMeterName is the prefix of the per-packet inbound metrics. ingressMeterName = "p2p/ingress" // egressMeterName is the prefix of the per-packet outbound metrics. egressMeterName = "p2p/egress" - - // HandleHistName is the prefix of the per-packet serving time histograms. - HandleHistName = "p2p/handle" ) var ( - ingressConnectMeter = metrics.NewRegisteredMeter("p2p/serves", nil) - ingressTrafficMeter = metrics.NewRegisteredMeter(ingressMeterName, nil) - egressConnectMeter = metrics.NewRegisteredMeter("p2p/dials", nil) - egressTrafficMeter = metrics.NewRegisteredMeter(egressMeterName, nil) - activePeerGauge = metrics.NewRegisteredGauge("p2p/peers", nil) + activePeerGauge metrics.Gauge = metrics.NilGauge{} + + ingressTrafficMeter = metrics.NewRegisteredMeter("p2p/ingress", nil) + egressTrafficMeter = metrics.NewRegisteredMeter("p2p/egress", nil) + + // general ingress/egress connection meters + serveMeter metrics.Meter = metrics.NilMeter{} + serveSuccessMeter metrics.Meter = metrics.NilMeter{} + dialMeter metrics.Meter = metrics.NilMeter{} + dialSuccessMeter metrics.Meter = metrics.NilMeter{} + dialConnectionError metrics.Meter = metrics.NilMeter{} + + // handshake error meters + dialTooManyPeers = metrics.NewRegisteredMeter("p2p/dials/error/saturated", nil) + dialAlreadyConnected = metrics.NewRegisteredMeter("p2p/dials/error/known", nil) + dialSelf = metrics.NewRegisteredMeter("p2p/dials/error/self", nil) + dialUselessPeer = metrics.NewRegisteredMeter("p2p/dials/error/useless", nil) + dialUnexpectedIdentity = metrics.NewRegisteredMeter("p2p/dials/error/id/unexpected", nil) + dialEncHandshakeError = metrics.NewRegisteredMeter("p2p/dials/error/rlpx/enc", nil) + dialProtoHandshakeError = metrics.NewRegisteredMeter("p2p/dials/error/rlpx/proto", nil) ) +func init() { + if !metrics.Enabled { + return + } + + activePeerGauge = metrics.NewRegisteredGauge("p2p/peers", nil) + serveMeter = metrics.NewRegisteredMeter("p2p/serves", nil) + serveSuccessMeter = metrics.NewRegisteredMeter("p2p/serves/success", nil) + dialMeter = metrics.NewRegisteredMeter("p2p/dials", nil) + dialSuccessMeter = metrics.NewRegisteredMeter("p2p/dials/success", nil) + dialConnectionError = metrics.NewRegisteredMeter("p2p/dials/error/connection", nil) +} + +// markDialError matches errors that occur while setting up a dial connection +// to the corresponding meter. +func markDialError(err error) { + if !metrics.Enabled { + return + } + if err2 := errors.Unwrap(err); err2 != nil { + err = err2 + } + switch err { + case DiscTooManyPeers: + dialTooManyPeers.Mark(1) + case DiscAlreadyConnected: + dialAlreadyConnected.Mark(1) + case DiscSelf: + dialSelf.Mark(1) + case DiscUselessPeer: + dialUselessPeer.Mark(1) + case DiscUnexpectedIdentity: + dialUnexpectedIdentity.Mark(1) + case errEncHandshakeError: + dialEncHandshakeError.Mark(1) + case errProtoHandshakeError: + dialProtoHandshakeError.Mark(1) + } +} + // meteredConn is a wrapper around a net.Conn that meters both the // inbound and outbound network traffic. type meteredConn struct { @@ -52,18 +108,10 @@ type meteredConn struct { // newMeteredConn creates a new metered connection, bumps the ingress or egress // connection meter and also increases the metered peer count. If the metrics // system is disabled, function returns the original connection. -func newMeteredConn(conn net.Conn, ingress bool, addr *net.TCPAddr) net.Conn { - // Short circuit if metrics are disabled +func newMeteredConn(conn net.Conn) net.Conn { if !metrics.Enabled { return conn } - // Bump the connection counters and wrap the connection - if ingress { - ingressConnectMeter.Mark(1) - } else { - egressConnectMeter.Mark(1) - } - activePeerGauge.Inc(1) return &meteredConn{Conn: conn} } @@ -82,13 +130,3 @@ func (c *meteredConn) Write(b []byte) (n int, err error) { egressTrafficMeter.Mark(int64(n)) return n, err } - -// Close delegates a close operation to the underlying connection, unregisters -// the peer from the traffic registries and emits close event. -func (c *meteredConn) Close() error { - err := c.Conn.Close() - if err == nil { - activePeerGauge.Dec(1) - } - return err -} diff --git a/p2p/netutil/iptrack_test.go b/p2p/netutil/iptrack_test.go index f748727445..d6c1a87b0f 100644 --- a/p2p/netutil/iptrack_test.go +++ b/p2p/netutil/iptrack_test.go @@ -17,8 +17,8 @@ package netutil import ( + crand "crypto/rand" "fmt" - mrand "math/rand" "testing" "time" @@ -123,8 +123,8 @@ func TestIPTrackerForceGC(t *testing.T) { for i := 0; i < 5*max; i++ { e1 := make([]byte, 4) e2 := make([]byte, 4) - mrand.Read(e1) - mrand.Read(e2) + crand.Read(e1) + crand.Read(e2) it.AddStatement(string(e1), string(e2)) it.AddContact(string(e1)) clock.Run(rate) diff --git a/p2p/server.go b/p2p/server.go index 8e8cb2a004..74b72a5c6c 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -64,7 +64,11 @@ const ( frameWriteTimeout = 20 * time.Second ) -var errServerStopped = errors.New("server stopped") +var ( + errServerStopped = errors.New("server stopped") + errEncHandshakeError = errors.New("rlpx enc error") + errProtoHandshakeError = errors.New("rlpx proto error") +) // Config holds Server options. type Config struct { @@ -89,12 +93,14 @@ type Config struct { // Disabling is useful for protocol debugging (manual topology). NoDiscovery bool + // DiscoveryV4 specifies whether V4 discovery should be started. + DiscoveryV4 bool `toml:",omitempty"` + // DiscoveryV5 specifies whether the new topic-discovery based V5 discovery // protocol should be started or not. DiscoveryV5 bool `toml:",omitempty"` // Name sets the node name of this server. - // Use common.MakeName to create a name that follows existing conventions. Name string `toml:"-"` // BootstrapNodes are used to establish connectivity @@ -539,57 +545,28 @@ func (srv *Server) setupLocalNode() error { func (srv *Server) setupDiscovery() error { srv.discmix = enode.NewFairMix(discmixTimeout) - // Add protocol-specific discovery sources. - added := make(map[string]bool) - for _, proto := range srv.Protocols { - if proto.DialCandidates != nil && !added[proto.Name] { - srv.discmix.AddSource(proto.DialCandidates) - added[proto.Name] = true - } - } - // Don't listen on UDP endpoint if DHT is disabled. - if srv.NoDiscovery && !srv.DiscoveryV5 { + if srv.NoDiscovery { return nil } - - listenAddr := srv.ListenAddr - - // Use an alternate listening address for UDP if - // a custom discovery address is configured. - if srv.DiscAddr != "" { - listenAddr = srv.DiscAddr - } - - addr, err := net.ResolveUDPAddr("udp", listenAddr) - if err != nil { - return err - } - conn, err := net.ListenUDP("udp", addr) + conn, err := srv.setupUDPListening() if err != nil { return err } - realaddr := conn.LocalAddr().(*net.UDPAddr) - srv.log.Debug("UDP listener up", "addr", realaddr) - if srv.NAT != nil { - if !realaddr.IP.IsLoopback() { - srv.loopWG.Add(1) - go func() { - nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, realaddr.Port, "cortex discovery") - srv.loopWG.Done() - }() - } + + var ( + sconn discover.UDPConn = conn + unhandled chan discover.ReadPacket + ) + // If both versions of discovery are running, setup a shared + // connection, so v5 can read unhandled messages from v4. + if srv.DiscoveryV4 && srv.DiscoveryV5 { + unhandled = make(chan discover.ReadPacket, 100) + sconn = &sharedUDPConn{conn, unhandled} } - srv.localnode.SetFallbackUDP(realaddr.Port) - // Discovery V4 - var unhandled chan discover.ReadPacket - var sconn *sharedUDPConn - if !srv.NoDiscovery { - if srv.DiscoveryV5 { - unhandled = make(chan discover.ReadPacket, 100) - sconn = &sharedUDPConn{conn, unhandled} - } + // Start discovery services. + if srv.DiscoveryV4 { cfg := discover.Config{ PrivateKey: srv.PrivateKey, NetRestrict: srv.NetRestrict, @@ -604,8 +581,6 @@ func (srv *Server) setupDiscovery() error { srv.ntab = ntab srv.discmix.AddSource(ntab.RandomNodes()) } - - // Discovery V5 if srv.DiscoveryV5 { cfg := discover.Config{ PrivateKey: srv.PrivateKey, @@ -613,16 +588,20 @@ func (srv *Server) setupDiscovery() error { Bootnodes: srv.BootstrapNodesV5, Log: srv.log, } - var err error - if sconn != nil { - srv.DiscV5, err = discover.ListenV5(sconn, srv.localnode, cfg) - } else { - srv.DiscV5, err = discover.ListenV5(conn, srv.localnode, cfg) - } + srv.DiscV5, err = discover.ListenV5(sconn, srv.localnode, cfg) if err != nil { return err } } + + // Add protocol-specific discovery sources. + added := make(map[string]bool) + for _, proto := range srv.Protocols { + if proto.DialCandidates != nil && !added[proto.Name] { + srv.discmix.AddSource(proto.DialCandidates) + added[proto.Name] = true + } + } return nil } @@ -693,6 +672,37 @@ func (srv *Server) setupListening() error { return nil } +func (srv *Server) setupUDPListening() (*net.UDPConn, error) { + listenAddr := srv.ListenAddr + + // Use an alternate listening address for UDP if + // a custom discovery address is configured. + if srv.DiscAddr != "" { + listenAddr = srv.DiscAddr + } + addr, err := net.ResolveUDPAddr("udp", listenAddr) + if err != nil { + return nil, err + } + conn, err := net.ListenUDP("udp", addr) + if err != nil { + return nil, err + } + realaddr := conn.LocalAddr().(*net.UDPAddr) + srv.log.Debug("UDP listener up", "addr", realaddr) + if srv.NAT != nil { + if !realaddr.IP.IsLoopback() { + srv.loopWG.Add(1) + go func() { + nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, realaddr.Port, "cortex discovery") + srv.loopWG.Done() + }() + } + } + srv.localnode.SetFallbackUDP(realaddr.Port) + return conn, nil +} + // doPeerOp runs fn on the main loop. func (srv *Server) doPeerOp(fn peerOpFunc) { select { @@ -773,7 +783,11 @@ running: srv.dialsched.peerAdded(c) if p.Inbound() { inboundCount++ + serveSuccessMeter.Mark(1) + } else { + dialSuccessMeter.Mark(1) } + activePeerGauge.Inc(1) } c.cont <- err @@ -786,6 +800,7 @@ running: if pd.Inbound() { inboundCount-- } + activePeerGauge.Dec(1) } } @@ -895,11 +910,8 @@ func (srv *Server) listenLoop() { continue } if remoteIP != nil { - var addr *net.TCPAddr - if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok { - addr = tcp - } - fd = newMeteredConn(fd, true, addr) + fd = newMeteredConn(fd) + serveMeter.Mark(1) srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr()) } go func() { @@ -940,6 +952,9 @@ func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) err := srv.setupConn(c, flags, dialDest) if err != nil { + if !c.is(inboundConn) { + markDialError(err) + } c.close(err) } return err @@ -958,7 +973,7 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro if dialDest != nil { dialPubkey := new(ecdsa.PublicKey) if err := dialDest.Load((*enode.Secp256k1)(dialPubkey)); err != nil { - err = errors.New("dial destination doesn't have a secp256k1 public key") + err = fmt.Errorf("%w: dial destination doesn't have a secp256k1 public key", errEncHandshakeError) srv.log.Trace("Setting up connection failed", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) return err } @@ -968,7 +983,7 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro remotePubkey, err := c.doEncHandshake(srv.PrivateKey) if err != nil { srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) - return err + return fmt.Errorf("%w: %v", errEncHandshakeError, err) } if dialDest != nil { c.node = dialDest @@ -986,7 +1001,7 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro phs, err := c.doProtoHandshake(srv.ourHandshake) if err != nil { clog.Trace("Failed p2p handshake", "err", err) - return err + return fmt.Errorf("%w: %v", errProtoHandshakeError, err) } if id := c.node.ID(); !bytes.Equal(crypto.Keccak256(phs.ID), id[:]) { clog.Trace("Wrong devp2p handshake identity", "phsid", hex.EncodeToString(phs.ID)) @@ -1057,7 +1072,7 @@ func (srv *Server) runPeer(p *Peer) { // Broadcast peer drop to external subscribers. This needs to be // after the send to delpeer so subscribers have a consistent view of // the peer set (i.e. Server.Peers() doesn't include the peer when the - // event is received. + // event is received). srv.peerFeed.Send(&PeerEvent{ Type: PeerEventTypeDrop, Peer: p.ID(), diff --git a/p2p/server_test.go b/p2p/server_test.go index e1332a1fb6..efd3e6b0a9 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -370,8 +370,6 @@ func TestServerSetupConn(t *testing.T) { clientkey, srvkey = newkey(), newkey() clientpub = &clientkey.PublicKey srvpub = &srvkey.PublicKey - fooErr = errors.New("foo") - readErr = errors.New("read error") ) tests := []struct { dontstart bool @@ -389,10 +387,10 @@ func TestServerSetupConn(t *testing.T) { wantCloseErr: errServerStopped, }, { - tt: &setupTransport{pubkey: clientpub, encHandshakeErr: readErr}, + tt: &setupTransport{pubkey: clientpub, encHandshakeErr: errEncHandshakeError}, flags: inboundConn, wantCalls: "doEncHandshake,close,", - wantCloseErr: readErr, + wantCloseErr: errEncHandshakeError, }, { tt: &setupTransport{pubkey: clientpub, phs: protoHandshake{ID: randomID().Bytes()}}, @@ -402,11 +400,11 @@ func TestServerSetupConn(t *testing.T) { wantCloseErr: DiscUnexpectedIdentity, }, { - tt: &setupTransport{pubkey: clientpub, protoHandshakeErr: fooErr}, + tt: &setupTransport{pubkey: clientpub, protoHandshakeErr: errProtoHandshakeError}, dialDest: enode.NewV4(clientpub, nil, 0, 0), flags: dynDialedConn, wantCalls: "doEncHandshake,doProtoHandshake,close,", - wantCloseErr: fooErr, + wantCloseErr: errProtoHandshakeError, }, { tt: &setupTransport{pubkey: srvpub, phs: protoHandshake{ID: crypto.FromECDSAPub(srvpub)[1:]}},