Skip to content

Commit

Permalink
fix: fix non deterministic panic during TestStableNetworkRPC integrat…
Browse files Browse the repository at this point in the history
…ion test (#3756)
  • Loading branch information
P1sar committed Feb 23, 2024
1 parent 200ca4b commit ee3d243
Show file tree
Hide file tree
Showing 19 changed files with 104 additions and 378 deletions.
9 changes: 5 additions & 4 deletions dot/network/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
type ConnManager struct {
sync.Mutex
host *host
min, max int
maxPeers int
connectHandler func(peer.ID)
disconnectHandler func(peer.ID)

Expand All @@ -33,15 +33,16 @@ type ConnManager struct {
peerSetHandler PeerSetHandler
}

func newConnManager(min, max int, peerSetCfg *peerset.ConfigSet) (*ConnManager, error) {
func newConnManager(max int, peerSetCfg *peerset.ConfigSet) (*ConnManager, error) {
// TODO: peerSetHandler never used from within connection manager and also referred outside through cm,
// so this should be refactored
psh, err := peerset.NewPeerSetHandler(peerSetCfg)
if err != nil {
return nil, err
}

return &ConnManager{
min: min,
max: max,
maxPeers: max,
protectedPeers: new(sync.Map),
persistentPeers: new(sync.Map),
peerSetHandler: psh,
Expand Down
2 changes: 1 addition & 1 deletion dot/network/connmgr_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestProtectUnprotectPeer(t *testing.T) {
)

peerCfgSet := peerset.NewConfigSet(uint32(max-min), uint32(max), false, slotAllocationTime)
cm, err := newConnManager(min, max, peerCfgSet)
cm, err := newConnManager(max, peerCfgSet)
require.NoError(t, err)

p1 := peer.ID("a")
Expand Down
28 changes: 15 additions & 13 deletions dot/network/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,35 +35,35 @@ var (

// discovery handles discovery of new peers via the kademlia DHT
type discovery struct {
ctx context.Context
dht *dual.DHT
rd *routing.RoutingDiscovery
h libp2phost.Host
bootnodes []peer.AddrInfo
ds *badger.Datastore
pid protocol.ID
minPeers, maxPeers int
handler PeerSetHandler
ctx context.Context
dht *dual.DHT
rd *routing.RoutingDiscovery
h libp2phost.Host
bootnodes []peer.AddrInfo
ds *badger.Datastore
pid protocol.ID
maxPeers int
handler PeerSetHandler
}

func newDiscovery(ctx context.Context, h libp2phost.Host,
bootnodes []peer.AddrInfo, ds *badger.Datastore,
pid protocol.ID, min, max int, handler PeerSetHandler) *discovery {
pid protocol.ID, max int, handler PeerSetHandler) *discovery {
return &discovery{
ctx: ctx,
h: h,
bootnodes: bootnodes,
ds: ds,
pid: pid,
minPeers: min,
maxPeers: max,
handler: handler,
}
}

// waitForPeers periodically checks kadDHT peers store for new peers and returns them,
// this function used for local environments to prepopulate bootnodes from mDNS
func (d *discovery) waitForPeers() (peers []peer.AddrInfo, err error) {
// get all currently connected peers and use them to bootstrap the DHT

currentPeers := d.h.Network().Peers()

t := time.NewTicker(startDHTTimeout)
Expand Down Expand Up @@ -92,6 +92,9 @@ func (d *discovery) waitForPeers() (peers []peer.AddrInfo, err error) {

// start creates the DHT.
func (d *discovery) start() error {
// This basically only works with mDNS enabled, which is used only for local test setups. Without bootnodes,
// Kademlia would not be able to connect to any peers, so mDNS is used to find peers on the local network.
// TODO: this should be refactored, because this branch exists only for local integration test purposes
if len(d.bootnodes) == 0 {
peers, err := d.waitForPeers()
if err != nil {
Expand All @@ -100,7 +103,6 @@ func (d *discovery) start() error {

d.bootnodes = peers
}

logger.Debugf("starting DHT with bootnodes %v...", d.bootnodes)
logger.Debugf("V1ProtocolOverride %v...", d.pid+"/kad")

Expand Down
9 changes: 7 additions & 2 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,19 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
// connections remain between min peers and max peers
const reservedOnly = false
peerCfgSet := peerset.NewConfigSet(
// TODO: the rationale behind the maxOutPeers and maxInPeers calculations is unclear.
// It needs to be documented explicitly.

// maxInPeers is later used in peerstate only and defines available Incoming connection slots
uint32(cfg.MaxPeers-cfg.MinPeers),
// maxOutPeers is later used in peerstate only and defines available Outgoing connection slots
uint32(cfg.MaxPeers/2),
reservedOnly,
peerSetSlotAllocTime,
)

// create connection manager
cm, err := newConnManager(cfg.MinPeers, cfg.MaxPeers, peerCfgSet)
cm, err := newConnManager(cfg.MaxPeers, peerCfgSet)
if err != nil {
return nil, fmt.Errorf("failed to create connection manager: %w", err)
}
Expand Down Expand Up @@ -243,7 +248,7 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
}

bwc := metrics.NewBandwidthCounter()
discovery := newDiscovery(ctx, h, bns, ds, pid, cfg.MinPeers, cfg.MaxPeers, cm.peerSetHandler)
discovery := newDiscovery(ctx, h, bns, ds, pid, cfg.MaxPeers, cm.peerSetHandler)

host := &host{
ctx: ctx,
Expand Down
7 changes: 5 additions & 2 deletions dot/network/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

package network

import "encoding/json"
import (
"encoding/json"
"io"
)

// Telemetry is the telemetry client to send telemetry messages.
type Telemetry interface {
Expand All @@ -22,5 +25,5 @@ type Logger interface {
// MDNS is the mDNS service interface.
type MDNS interface {
Start() error
Stop() error
io.Closer
}
4 changes: 2 additions & 2 deletions internal/mdns/notifee.go → dot/network/notifee.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2022 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package mdns
package network

import (
"time"
Expand Down Expand Up @@ -35,7 +35,7 @@ type NotifeeTracker struct {
peerAdder PeerAdder
}

// HandlePeerFound tracks the address info from the peer found.
// HandlePeerFound implements the libp2p mdns.Notifee interface, tracking peers found via mDNS.
func (n *NotifeeTracker) HandlePeerFound(p peer.AddrInfo) {
n.addressAdder.AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
n.peerAdder.AddPeer(0, p.ID)
Expand Down
2 changes: 1 addition & 1 deletion dot/network/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func Test_HandshakeTimeout(t *testing.T) {
// after the timeout
time.Sleep(handshakeTimeout)

// handshake data shouldn't exist still
// handshake data still shouldn't exist
data = info.peersData.getOutboundHandshakeData(nodeB.host.id())
require.Nil(t, data)

Expand Down
18 changes: 13 additions & 5 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
"github.com/ChainSafe/gossamer/dot/telemetry"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/internal/log"
"github.com/ChainSafe/gossamer/internal/mdns"
"github.com/ChainSafe/gossamer/internal/metrics"
"github.com/ChainSafe/gossamer/lib/common"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/discovery/mdns"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
Expand Down Expand Up @@ -194,12 +194,12 @@ func NewService(cfg *Config) (*Service, error) {
}

serviceTag := string(host.protocolID)
notifee := mdns.NewNotifeeTracker(host.p2pHost.Peerstore(), host.cm.peerSetHandler)
notifee := NewNotifeeTracker(host.p2pHost.Peerstore(), host.cm.peerSetHandler)
mdnsLogger := log.NewFromGlobal(log.AddContext("module", "mdns"))
mdnsLogger.Debugf(
"Creating mDNS discovery service with host %s and protocol %s...",
host.id(), host.protocolID)
mdnsService := mdns.NewService(host.p2pHost, serviceTag, mdnsLogger, notifee)
mdnsService := mdns.NewMdnsService(host.p2pHost, serviceTag, notifee)

network := &Service{
ctx: ctx,
Expand Down Expand Up @@ -291,6 +291,8 @@ func (s *Service) Start() error {

// this handles all new connections (incoming and outgoing)
// it creates a per-protocol mutex for sending outbound handshakes to the peer
// connectHandler is part of the libp2p.Notifiee interface implementation and is called at the very end,
// after an incoming or outgoing peer is connected
s.host.cm.connectHandler = func(peerID peer.ID) {
for _, prtl := range s.notificationsProtocols {
prtl.peersData.setMutex(peerID)
Expand Down Expand Up @@ -322,7 +324,8 @@ func (s *Service) Start() error {
return fmt.Errorf("starting mDNS service: %w", err)
}
}

// TODO: this is basically a hack that is used only in unit tests to disable the Kademlia DHT.
// It should be replaced with a mock instead.
if !s.noDiscover {
go func() {
err = s.host.discovery.start()
Expand Down Expand Up @@ -467,7 +470,7 @@ func (s *Service) Stop() error {
s.cancel()

// close mDNS discovery service
err := s.mdns.Stop()
err := s.mdns.Close()
if err != nil {
logger.Errorf("Failed to close mDNS discovery service: %s", err)
}
Expand Down Expand Up @@ -694,6 +697,9 @@ func (s *Service) startPeerSetHandler() {
go s.startProcessingMsg()
}

// processMessage processes messages from the PeerSetHandler. It is responsible for connecting to and
// dropping connections with peers. When a Connect message is received, the function looks up the peer's
// address in the Peerstore. If the address is not found there, the peer is looked up via the DHT.
func (s *Service) processMessage(msg peerset.Message) {
peerID := msg.PeerID
if peerID == "" {
Expand All @@ -714,6 +720,7 @@ func (s *Service) processMessage(msg peerset.Message) {

err := s.host.connect(addrInfo)
if err != nil {
// TODO: if an error happens here, an outgoing (?) slot remains occupied although no peer is actually connected
logger.Warnf("failed to open connection for peer %s: %s", peerID, err)
return
}
Expand All @@ -728,6 +735,7 @@ func (s *Service) processMessage(msg peerset.Message) {
}
}

// startProcessingMsg listens for messages on the channel belonging to the peerset's PeerSetHandler.
func (s *Service) startProcessingMsg() {
msgCh := s.host.cm.peerSetHandler.Messages()
for {
Expand Down
4 changes: 2 additions & 2 deletions dot/network/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestStringToAddrInfo(t *testing.T) {
for _, str := range TestPeers {
pi, err := stringToAddrInfo(str)
require.NoError(t, err)
require.Equal(t, pi.ID.Pretty(), str[len(str)-46:])
require.Equal(t, pi.ID.String(), str[len(str)-46:])
}
}

Expand All @@ -74,7 +74,7 @@ func TestStringsToAddrInfos(t *testing.T) {
require.NoError(t, err)

for k, pi := range pi {
require.Equal(t, pi.ID.Pretty(), TestPeers[k][len(TestPeers[k])-46:])
require.Equal(t, pi.ID.String(), TestPeers[k][len(TestPeers[k])-46:])
}
}

Expand Down
18 changes: 12 additions & 6 deletions dot/peerset/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,9 @@ type PeerSet struct {
// TODO: this will be useful for reserved only mode
// this is for future purpose if reserved-only flag is enabled (#1888).
isReservedOnly bool
resultMsgCh chan Message

// resultMsgCh is read by network.Service.
resultMsgCh chan Message
// time when the PeerSet was created.
created time.Time
// last time when we updated the reputations of connected nodes.
Expand Down Expand Up @@ -369,6 +371,7 @@ func (ps *PeerSet) reportPeer(change ReputationChange, peers ...peer.ID) error {
}

// allocSlots tries to fill available outgoing slots of nodes for the given set.
// By default this is called periodically, according to the nextPeriodicAllocSlots ticker.
func (ps *PeerSet) allocSlots(setIdx int) error {
err := ps.updateTime()
if err != nil {
Expand All @@ -382,7 +385,7 @@ func (ps *PeerSet) allocSlots(setIdx int) error {
case connectedPeer:
continue
case unknownPeer:
peerState.discover(setIdx, reservePeer)
peerState.insertPeer(setIdx, reservePeer)
}

node, err := ps.peerState.getNode(reservePeer)
Expand Down Expand Up @@ -425,7 +428,7 @@ func (ps *PeerSet) allocSlots(setIdx int) error {
}

if err = peerState.tryOutgoing(setIdx, peerID); err != nil {
logger.Errorf("could not set peer %s as outgoing connection: %s", peerID.Pretty(), err)
logger.Errorf("could not set peer %s as outgoing connection: %s", peerID.String(), err)
break
}

Expand All @@ -450,7 +453,7 @@ func (ps *PeerSet) addReservedPeers(setID int, peers ...peer.ID) error {
return nil
}

ps.peerState.discover(setID, peerID)
ps.peerState.insertPeer(setID, peerID)

ps.reservedNode[peerID] = struct{}{}
if err := ps.peerState.addNoSlotNode(setID, peerID); err != nil {
Expand Down Expand Up @@ -537,13 +540,16 @@ func (ps *PeerSet) setReservedPeer(setID int, peers ...peer.ID) error {
return nil
}

// addPeer checks whether each peer already exists in the peer set; if it does not, the peer is inserted into
// the peer state with the default reputation and notConnected status. Afterwards it runs allocSlots, which
// checks the availability of outgoing slots and fills them with notConnected peers.
func (ps *PeerSet) addPeer(setID int, peers peer.IDSlice) error {
for _, pid := range peers {
if ps.peerState.peerStatus(setID, pid) != unknownPeer {
return nil
}

ps.peerState.discover(setID, pid)
ps.peerState.insertPeer(setID, pid)
if err := ps.allocSlots(setID); err != nil {
return fmt.Errorf("could not allocate slots: %w", err)
}
Expand Down Expand Up @@ -612,7 +618,7 @@ func (ps *PeerSet) incoming(setID int, peers ...peer.ID) error {
case notConnectedPeer:
ps.peerState.nodes[pid].lastConnected[setID] = time.Now()
case unknownPeer:
ps.peerState.discover(setID, pid)
ps.peerState.insertPeer(setID, pid)
}

state := ps.peerState
Expand Down
10 changes: 5 additions & 5 deletions dot/peerset/peerset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestBanRejectAcceptPeer(t *testing.T) {
peer1Status := ps.peerState.peerStatus(testSetID, peer1)
require.Equal(t, unknownPeer, peer1Status)

ps.peerState.discover(testSetID, peer1)
ps.peerState.insertPeer(testSetID, peer1)
// adding peer1 with incoming slot.
err := ps.peerState.tryAcceptIncoming(testSetID, peer1)
require.NoError(t, err)
Expand Down Expand Up @@ -136,19 +136,19 @@ func TestPeerSetIncoming(t *testing.T) {
pid: incomingPeer,
expectedStatus: Accept,
expectedNumIn: 1,
hasFreeIncomingSlot: false,
hasFreeIncomingSlot: true,
},
{
pid: incoming2,
expectedStatus: Accept,
expectedNumIn: 2,
hasFreeIncomingSlot: true,
hasFreeIncomingSlot: false, // since maxIn is 2, we will not have any free slots if 2 peers connected
},
{
pid: incoming3,
expectedStatus: Reject,
expectedNumIn: 2,
hasFreeIncomingSlot: true,
hasFreeIncomingSlot: false, // since maxIn is 2, we will not have any free slots if 2 peers connected
},
}

Expand Down Expand Up @@ -217,7 +217,7 @@ func TestReAllocAfterBanned(t *testing.T) {
peer1Status := ps.peerState.peerStatus(testSetID, peer1)
require.Equal(t, unknownPeer, peer1Status)

ps.peerState.discover(testSetID, peer1)
ps.peerState.insertPeer(testSetID, peer1)
err := ps.peerState.tryAcceptIncoming(testSetID, peer1)
require.NoError(t, err)

Expand Down

0 comments on commit ee3d243

Please sign in to comment.