Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dot/network): fix dht connection on discovery on devnet #2059

Merged
merged 18 commits into from
Dec 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chain/gssmr/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ nobootstrap = false
nomdns = false
discovery-interval = 10
min-peers = 1
max-peers = 50

[rpc]
enabled = false
Expand Down
2 changes: 2 additions & 0 deletions chain/gssmr/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ var (
DefaultNoMDNS = false
// DefaultMinPeers is the default minimum desired peer count
DefaultMinPeers = 1
// DefaultMaxPeers is the default maximum desired peer count
DefaultMaxPeers = 50

// DefaultDiscoveryInterval is the default interval for searching for DHT peers
DefaultDiscoveryInterval = time.Second * 10
Expand Down
4 changes: 3 additions & 1 deletion cmd/gossamer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ func createDotConfig(ctx *cli.Context) (*dot.Config, error) {
return nil, err
}

logger.Infof("loaded package log configuration: %v", cfg.Log)
// TODO: log this better.
// See https://github.com/ChainSafe/gossamer/issues/1945
logger.Infof("loaded package log configuration: %#v", cfg.Log)

// set global configuration values
if err := setDotGlobalConfig(ctx, tomlCfg, &cfg.Global); err != nil {
Expand Down
7 changes: 7 additions & 0 deletions cmd/gossamer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ func TestNetworkConfigFromFlags(t *testing.T) {
NoMDNS: testCfg.Network.NoMDNS,
DiscoveryInterval: time.Second * 10,
MinPeers: testCfg.Network.MinPeers,
MaxPeers: testCfg.Network.MaxPeers,
},
},
{
Expand All @@ -472,6 +473,7 @@ func TestNetworkConfigFromFlags(t *testing.T) {
NoMDNS: testCfg.Network.NoMDNS,
DiscoveryInterval: time.Second * 10,
MinPeers: testCfg.Network.MinPeers,
MaxPeers: testCfg.Network.MaxPeers,
},
},
{
Expand All @@ -486,6 +488,7 @@ func TestNetworkConfigFromFlags(t *testing.T) {
NoMDNS: testCfg.Network.NoMDNS,
DiscoveryInterval: time.Second * 10,
MinPeers: testCfg.Network.MinPeers,
MaxPeers: testCfg.Network.MaxPeers,
},
},
{
Expand All @@ -500,6 +503,7 @@ func TestNetworkConfigFromFlags(t *testing.T) {
NoMDNS: testCfg.Network.NoMDNS,
DiscoveryInterval: time.Second * 10,
MinPeers: testCfg.Network.MinPeers,
MaxPeers: testCfg.Network.MaxPeers,
},
},
{
Expand All @@ -514,6 +518,7 @@ func TestNetworkConfigFromFlags(t *testing.T) {
NoMDNS: true,
DiscoveryInterval: time.Second * 10,
MinPeers: testCfg.Network.MinPeers,
MaxPeers: testCfg.Network.MaxPeers,
},
},
{
Expand All @@ -528,6 +533,7 @@ func TestNetworkConfigFromFlags(t *testing.T) {
NoMDNS: false,
DiscoveryInterval: time.Second * 10,
MinPeers: testCfg.Network.MinPeers,
MaxPeers: testCfg.Network.MaxPeers,
PublicIP: "10.0.5.2",
},
},
Expand Down Expand Up @@ -909,6 +915,7 @@ func TestUpdateConfigFromGenesisData(t *testing.T) {
NoMDNS: testCfg.Network.NoMDNS,
DiscoveryInterval: testCfg.Network.DiscoveryInterval,
MinPeers: testCfg.Network.MinPeers,
MaxPeers: testCfg.Network.MaxPeers,
},
RPC: testCfg.RPC,
System: testCfg.System,
Expand Down
3 changes: 3 additions & 0 deletions cmd/gossamer/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func TestExportCommand(t *testing.T) {
NoMDNS: testCfg.Network.NoMDNS,
DiscoveryInterval: testCfg.Network.DiscoveryInterval,
MinPeers: testCfg.Network.MinPeers,
MaxPeers: testCfg.Network.MaxPeers,
},
RPC: testCfg.RPC,
Pprof: testCfg.Pprof,
Expand Down Expand Up @@ -112,6 +113,7 @@ func TestExportCommand(t *testing.T) {
NoMDNS: testCfg.Network.NoMDNS,
DiscoveryInterval: testCfg.Network.DiscoveryInterval,
MinPeers: testCfg.Network.MinPeers,
MaxPeers: testCfg.Network.MaxPeers,
},
RPC: testCfg.RPC,
Pprof: testCfg.Pprof,
Expand Down Expand Up @@ -149,6 +151,7 @@ func TestExportCommand(t *testing.T) {
NoMDNS: testCfg.Network.NoMDNS,
DiscoveryInterval: testCfg.Network.DiscoveryInterval,
MinPeers: testCfg.Network.MinPeers,
MaxPeers: testCfg.Network.MaxPeers,
},
RPC: testCfg.RPC,
Pprof: testCfg.Pprof,
Expand Down
1 change: 1 addition & 0 deletions dot/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func GssmrConfig() *Config {
NoMDNS: gssmr.DefaultNoMDNS,
DiscoveryInterval: gssmr.DefaultDiscoveryInterval,
MinPeers: gssmr.DefaultMinPeers,
MaxPeers: gssmr.DefaultMaxPeers,
},
RPC: RPCConfig{
Port: gssmr.DefaultRPCHTTPPort,
Expand Down
11 changes: 8 additions & 3 deletions dot/network/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,15 @@ func TestMinPeers(t *testing.T) {
}

nodeB := createTestService(t, configB)
require.Equal(t, min, nodeB.host.peerCount())
require.GreaterOrEqual(t, nodeB.host.peerCount(), len(nodes))

nodeB.host.cm.peerSetHandler.DisconnectPeer(0, nodes[0].host.id())
require.GreaterOrEqual(t, min, nodeB.host.peerCount())
// check that peer count is at least greater than minimum number of peers,
// even after trying to disconnect from all peers
for _, node := range nodes {
nodeB.host.cm.peerSetHandler.DisconnectPeer(0, node.host.id())
}

require.GreaterOrEqual(t, nodeB.host.peerCount(), min)
}

func TestMaxPeers(t *testing.T) {
Expand Down
14 changes: 1 addition & 13 deletions dot/network/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (d *discovery) advertise() {

ttl, err = d.rd.Advertise(d.ctx, string(d.pid))
if err != nil {
logger.Debugf("failed to advertise in the DHT: %s", err)
logger.Warnf("failed to advertise in the DHT: %s", err)
ttl = tryAdvertiseTimeout
}
case <-d.ctx.Done():
Expand Down Expand Up @@ -199,21 +199,9 @@ func (d *discovery) findPeers(ctx context.Context) {

logger.Tracef("found new peer %s via DHT", peer.ID)

// TODO: this isn't working on the devnet (#2026)
// can remove the code block below which directly connects
// once that's fixed
d.h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL)
d.handler.AddPeer(0, peer.ID)

// found a peer, try to connect if we need more peers
if len(d.h.Network().Peers()) >= d.maxPeers {
d.h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL)
return
}

if err = d.h.Connect(d.ctx, peer); err != nil {
logger.Tracef("failed to connect to discovered peer %s: %s", peer.ID, err)
}
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,16 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
return nil, err
}

// We have tried to set maxInPeers and maxOutPeers such that number of peer
// connections remain between min peers and max peers
const reservedOnly = false
peerCfgSet := peerset.NewConfigSet(
uint32(cfg.MaxPeers-cfg.MinPeers),
uint32(cfg.MinPeers),
uint32(cfg.MaxPeers/2),
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
reservedOnly,
peerSetSlotAllocTime)
peerSetSlotAllocTime,
)

// create connection manager
cm, err := newConnManager(cfg.MinPeers, cfg.MaxPeers, peerCfgSet)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,21 +680,21 @@ func (s *Service) processMessage(msg peerset.Message) {
var err error
addrInfo, err = s.host.discovery.findPeer(peerID)
if err != nil {
logger.Debugf("failed to find peer id %s: %s", peerID, err)
logger.Warnf("failed to find peer id %s: %s", peerID, err)
return
}
}

err := s.host.connect(addrInfo)
if err != nil {
logger.Debugf("failed to open connection for peer %s: %s", peerID, err)
logger.Warnf("failed to open connection for peer %s: %s", peerID, err)
return
}
logger.Debugf("connection successful with peer %s", peerID)
case peerset.Drop, peerset.Reject:
err := s.host.closePeer(peerID)
if err != nil {
logger.Debugf("failed to close connection with peer %s: %s", peerID, err)
logger.Warnf("failed to close connection with peer %s: %s", peerID, err)
return
}
logger.Debugf("connection dropped successfully for peer %s", peerID)
Expand Down
59 changes: 48 additions & 11 deletions dot/peerset/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,33 @@ const (
disconnect
)

func (a ActionReceiver) String() string {
switch a {
case addReservedPeer:
return "addReservedPeer"
case removeReservedPeer:
return "removeReservedPeer"
case setReservedPeers:
return "setReservedPeers"
case setReservedOnly:
return "setReservedOnly"
case reportPeer:
return "reportPeer"
case addToPeerSet:
return "addToPeerSet"
case removeFromPeerSet:
return "removeFromPeerSet"
case incoming:
return "incoming"
case sortedPeers:
return "sortedPeers"
case disconnect:
return "disconnect"
default:
return "invalid action"
}
}

// action struct stores the action type and required parameters to perform action
type action struct {
actionCall ActionReceiver
Expand All @@ -67,8 +94,8 @@ func (a action) String() string {
for i := range a.peers {
peersStrings[i] = a.peers[i].String()
}
return fmt.Sprintf("{call=%d, set-id=%d, reputation change %v, peers=[%s]",
a.actionCall, a.setID, a.reputation, strings.Join(peersStrings, ", "))
return fmt.Sprintf("{call=%s, set-id=%d, reputation change %v, peers=[%s]",
a.actionCall.String(), a.setID, a.reputation, strings.Join(peersStrings, ", "))
}

// Status represents the enum value for Message
Expand Down Expand Up @@ -156,9 +183,9 @@ type PeerSet struct {
// config is configuration of a single set.
type config struct {
// maximum number of slot occupying nodes for incoming connections.
inPeers uint32
maxInPeers uint32
// maximum number of slot occupying nodes for outgoing connections.
outPeers uint32
maxOutPeers uint32

// TODO Use in future for reserved only peers
// if true, we only accept reservedNodes (#1888).
Expand All @@ -174,10 +201,10 @@ type ConfigSet struct {
}

// NewConfigSet creates a new config set for the peerSet
func NewConfigSet(in, out uint32, reservedOnly bool, allocTime time.Duration) *ConfigSet {
func NewConfigSet(maxInPeers, maxOutPeers uint32, reservedOnly bool, allocTime time.Duration) *ConfigSet {
set := &config{
inPeers: in,
outPeers: out,
maxInPeers: maxInPeers,
maxOutPeers: maxOutPeers,
reservedOnly: reservedOnly,
periodicAllocTime: allocTime,
}
Expand Down Expand Up @@ -351,6 +378,8 @@ func (ps *PeerSet) allocSlots(setIdx int) error {
}

if n.getReputation() < BannedThresholdValue {
logger.Warnf("reputation is lower than banned threshold value, reputation: %d, banned threshold value: %d",
n.getReputation(), BannedThresholdValue)
break
}

Expand All @@ -364,6 +393,7 @@ func (ps *PeerSet) allocSlots(setIdx int) error {
PeerID: reservePeer,
}
}

// nothing more to do if we're in reserved mode.
if ps.isReservedOnly {
return nil
Expand All @@ -382,6 +412,7 @@ func (ps *PeerSet) allocSlots(setIdx int) error {
}

if err = peerState.tryOutgoing(setIdx, peerID); err != nil {
logger.Errorf("could not set peer %s as outgoing connection: %s", peerID.Pretty(), err)
break
}

Expand All @@ -403,10 +434,14 @@ func (ps *PeerSet) addReservedPeers(setID int, peers ...peer.ID) error {
return nil
}

ps.peerState.discover(setID, peerID)

ps.reservedNode[peerID] = struct{}{}
ps.peerState.addNoSlotNode(setID, peerID)
if err := ps.peerState.addNoSlotNode(setID, peerID); err != nil {
return fmt.Errorf("could not add to list of no-slot nodes: %w", err)
}
if err := ps.allocSlots(setID); err != nil {
return err
return fmt.Errorf("could not allocate slots: %w", err)
}
}
return nil
Expand All @@ -420,7 +455,9 @@ func (ps *PeerSet) removeReservedPeers(setID int, peers ...peer.ID) error {
}

delete(ps.reservedNode, peerID)
ps.peerState.removeNoSlotNode(setID, peerID)
if err := ps.peerState.removeNoSlotNode(setID, peerID); err != nil {
return fmt.Errorf("could not remove from the list of no-slot nodes: %w", err)
}

// nothing more to do if not in reservedOnly mode.
if !ps.isReservedOnly {
Expand Down Expand Up @@ -645,7 +682,7 @@ func (ps *PeerSet) doWork() {
l := ps.peerState.getSetLength()
for i := 0; i < l; i++ {
if err := ps.allocSlots(i); err != nil {
logger.Debugf("failed to do action on peerSet: %s", err)
logger.Warnf("failed to do action on peerSet: %s", err)
}
}
case act, ok := <-ps.actionQueue:
Expand Down
Loading