Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dot/network): fix dht connection on discovery on devnet #2059

Merged
merged 18 commits into from
Dec 3, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions chain/gssmr/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ var (
DefaultNoMDNS = false
// DefaultMinPeers is the default minimum desired peer count
DefaultMinPeers = 1
// DefaultMaxPeers is the default maximum desired peer count
DefaultMaxPeers = 50

// DefaultDiscoveryInterval is the default interval for searching for DHT peers
DefaultDiscoveryInterval = time.Second * 10
Expand Down
4 changes: 3 additions & 1 deletion cmd/gossamer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ func createDotConfig(ctx *cli.Context) (*dot.Config, error) {
return nil, err
}

logger.Infof("loaded package log configuration: %v", cfg.Log)
// TODO: log this better.
// See https://github.com/ChainSafe/gossamer/issues/1945
logger.Infof("loaded package log configuration: %#v", cfg.Log)

// set global configuration values
if err := setDotGlobalConfig(ctx, tomlCfg, &cfg.Global); err != nil {
Expand Down
1 change: 1 addition & 0 deletions dot/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func GssmrConfig() *Config {
NoMDNS: gssmr.DefaultNoMDNS,
DiscoveryInterval: gssmr.DefaultDiscoveryInterval,
MinPeers: gssmr.DefaultMinPeers,
MaxPeers: gssmr.DefaultMaxPeers,
},
RPC: RPCConfig{
Port: gssmr.DefaultRPCHTTPPort,
Expand Down
15 changes: 2 additions & 13 deletions dot/network/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ func (d *discovery) advertise() {

ttl, err = d.rd.Advertise(d.ctx, string(d.pid))
if err != nil {
logger.Debugf("failed to advertise in the DHT: %s", err)
// TODO: This fails consistently.
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
logger.Warnf("failed to advertise in the DHT: %s", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we can probably remove the error log or change it do debug, as when you start the first node and it has no peers, this will always error

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will always error

For some time until another node is up right? Maybe we can have it as info with something less dramatic than failed such as

Suggested change
// TODO: This fails consistently.
logger.Warnf("failed to advertise in the DHT: %s", err)
logger.Infof("cannot advertise in the DHT: %s", err)

Copy link
Contributor Author

@kishansagathiya kishansagathiya Nov 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we can probably remove the error log or change it do debug, as when you start the first node and it has no peers, this will always error

I just tried a dev network with 3 nodes. I saw it only in the beginning.

I will remove my comment This fails consistently.

We advertise in the DHT at regular intervals, so we shouldn't remove this log. During the first time it says that there are no peers in the table (so anyone looking at the logs, would know to avoid it). This warning does not show after that.

I would rather have this warning in case this fails for any other reason, than have that error hidden in the debug mode.

@noot should i still change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine to leave it as-is for now

ttl = tryAdvertiseTimeout
}
case <-d.ctx.Done():
Expand Down Expand Up @@ -199,21 +200,9 @@ func (d *discovery) findPeers(ctx context.Context) {

logger.Tracef("found new peer %s via DHT", peer.ID)

// TODO: this isn't working on the devnet (#2026)
// can remove the code block below which directly connects
// once that's fixed
d.h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL)
d.handler.AddPeer(0, peer.ID)

// found a peer, try to connect if we need more peers
if len(d.h.Network().Peers()) >= d.maxPeers {
d.h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL)
return
}

if err = d.h.Connect(d.ctx, peer); err != nil {
logger.Tracef("failed to connect to discovered peer %s: %s", peer.ID, err)
}
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,15 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
return nil, err
}

// TODO: What should be the right value of max in and max out?
const reservedOnly = false
peerCfgSet := peerset.NewConfigSet(
uint32(cfg.MaxPeers-cfg.MinPeers),
uint32(cfg.MinPeers),
uint32(cfg.MaxPeers/2),
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
reservedOnly,
peerSetSlotAllocTime)
peerSetSlotAllocTime,
)

// create connection manager
cm, err := newConnManager(cfg.MinPeers, cfg.MaxPeers, peerCfgSet)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,21 +676,21 @@ func (s *Service) processMessage(msg peerset.Message) {
var err error
addrInfo, err = s.host.discovery.findPeer(peerID)
if err != nil {
logger.Debugf("failed to find peer id %s: %s", peerID, err)
logger.Warnf("failed to find peer id %s: %s", peerID, err)
return
}
}

err := s.host.connect(addrInfo)
if err != nil {
logger.Debugf("failed to open connection for peer %s: %s", peerID, err)
logger.Warnf("failed to open connection for peer %s: %s", peerID, err)
return
}
logger.Debugf("connection successful with peer %s", peerID)
case peerset.Drop, peerset.Reject:
err := s.host.closePeer(peerID)
if err != nil {
logger.Debugf("failed to close connection with peer %s: %s", peerID, err)
logger.Warnf("failed to close connection with peer %s: %s", peerID, err)
return
}
logger.Debugf("connection dropped successfully for peer %s", peerID)
Expand Down
4 changes: 3 additions & 1 deletion dot/peerset/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

package peerset

import "github.com/libp2p/go-libp2p-core/peer"
import (
"github.com/libp2p/go-libp2p-core/peer"
)

// Handler manages peerSet.
type Handler struct {
Expand Down
25 changes: 17 additions & 8 deletions dot/peerset/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,9 @@ type PeerSet struct {
// config is configuration of a single set.
type config struct {
// maximum number of slot occupying nodes for incoming connections.
inPeers uint32
maxInPeers uint32
// maximum number of slot occupying nodes for outgoing connections.
outPeers uint32
maxOutPeers uint32

// TODO Use in future for reserved only peers
// if true, we only accept reservedNodes (#1888).
Expand All @@ -174,10 +174,10 @@ type ConfigSet struct {
}

// NewConfigSet creates a new config set for the peerSet
func NewConfigSet(in, out uint32, reservedOnly bool, allocTime time.Duration) *ConfigSet {
func NewConfigSet(maxInPeers, maxOutPeers uint32, reservedOnly bool, allocTime time.Duration) *ConfigSet {
set := &config{
inPeers: in,
outPeers: out,
maxInPeers: maxInPeers,
maxOutPeers: maxOutPeers,
reservedOnly: reservedOnly,
periodicAllocTime: allocTime,
}
Expand Down Expand Up @@ -349,6 +349,8 @@ func (ps *PeerSet) allocSlots(setIdx int) error {
}

if n.getReputation() < BannedThresholdValue {
logger.Warnf("reputation is lower than banned threshold value, reputation: %d, banned threshold value: %d",
n.getReputation(), BannedThresholdValue)
break
}

Expand All @@ -362,12 +364,14 @@ func (ps *PeerSet) allocSlots(setIdx int) error {
PeerID: reservePeer,
}
}

// nothing more to do if we're in reserved mode.
if ps.isReservedOnly {
return nil
}

for peerState.hasFreeOutgoingSlot(setIdx) {

kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
peerID := peerState.highestNotConnectedPeer(setIdx)
if peerID == "" {
break
Expand All @@ -380,6 +384,7 @@ func (ps *PeerSet) allocSlots(setIdx int) error {
}

if err = peerState.tryOutgoing(setIdx, peerID); err != nil {
logger.Errorf("could not set peer as outgoing connection, peer: %s, error: %s", peerID.Pretty(), err)
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
break
}

Expand All @@ -402,7 +407,9 @@ func (ps *PeerSet) addReservedPeers(setID int, peers ...peer.ID) error {
}

ps.reservedNode[peerID] = struct{}{}
ps.peerState.addNoSlotNode(setID, peerID)
if err := ps.peerState.addNoSlotNode(setID, peerID); err != nil {
return err
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
}
if err := ps.allocSlots(setID); err != nil {
return err
}
Expand All @@ -418,7 +425,9 @@ func (ps *PeerSet) removeReservedPeers(setID int, peers ...peer.ID) error {
}

delete(ps.reservedNode, peerID)
ps.peerState.removeNoSlotNode(setID, peerID)
if err := ps.peerState.removeNoSlotNode(setID, peerID); err != nil {
return err
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
}

// nothing more to do if not in reservedOnly mode.
if !ps.isReservedOnly {
Expand Down Expand Up @@ -624,7 +633,7 @@ func (ps *PeerSet) doWork() {
l := ps.peerState.getSetLength()
for i := 0; i < l; i++ {
if err := ps.allocSlots(i); err != nil {
logger.Debugf("failed to do action on peerSet: %s", err)
logger.Warnf("failed to do action on peerSet: %s", err)
}
}
case act, ok := <-ps.actionQueue:
Expand Down
39 changes: 20 additions & 19 deletions dot/peerset/peerstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package peerset

import (
"fmt"
"math"
"sort"
"time"
Expand Down Expand Up @@ -39,7 +40,7 @@ type Info struct {
// number of slot occupying nodes for which the MembershipState is ingoing.
numIn uint32

// number of slot occupying nodes for which the MembershipState is ingoing.
// number of slot occupying nodes for which the MembershipState is outgoing.
numOut uint32

// maximum allowed number of slot occupying nodes for which the MembershipState is ingoing.
Expand All @@ -57,8 +58,8 @@ type Info struct {

// node represents state of a single node that we know about
type node struct {
// list of Set the node belongs to.
// always has a fixed size equal to the one of PeersState Set. The various possible Set
// list of Set, the node belongs to.
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
// always has a fixed size, equal to the one of PeersState Set. The various possible Set
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
// are indices into this Set.
state []MembershipState

Expand Down Expand Up @@ -127,8 +128,8 @@ func NewPeerState(cfgs []*config) (*PeersState, error) {
info := Info{
numIn: 0,
numOut: 0,
maxIn: cfg.inPeers,
maxOut: cfg.outPeers,
maxIn: cfg.maxInPeers,
maxOut: cfg.maxOutPeers,
noSlotNodes: make(map[peer.ID]struct{}),
}

Expand Down Expand Up @@ -237,17 +238,17 @@ func (ps *PeersState) hasFreeIncomingSlot(set int) bool {

// addNoSlotNode adds a node to the list of nodes that don't occupy slots.
// has no effect if the node was already in the group.
func (ps *PeersState) addNoSlotNode(idx int, peerID peer.ID) {
func (ps *PeersState) addNoSlotNode(idx int, peerID peer.ID) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we add a condition if idx >= len(ps.sets) then return an error? it could not happen but just to adopt a kind of defensive programming.

if _, ok := ps.sets[idx].noSlotNodes[peerID]; ok {
logger.Debugf("peer %s already exists in no slot node", peerID)
return
return nil
}

// Insert peerStatus
ps.sets[idx].noSlotNodes[peerID] = struct{}{}
n, err := ps.getNode(peerID)
if err != nil {
return
return fmt.Errorf("could not get node: %w", err)
}

switch n.state[idx] {
Expand All @@ -258,17 +259,19 @@ func (ps *PeersState) addNoSlotNode(idx int, peerID peer.ID) {
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a question: numIn and numOut can achieve negative values? @arijitAD

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

numIn: number of connections where other nodes connected to us
numOut: number of connections where we connected to other nodes

don't think these values could be negative

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, should we add some sort of check to avoid these values being negatives?


ps.nodes[peerID] = n
return nil
}

func (ps *PeersState) removeNoSlotNode(idx int, peerID peer.ID) {
func (ps *PeersState) removeNoSlotNode(idx int, peerID peer.ID) error {
if _, ok := ps.sets[idx].noSlotNodes[peerID]; !ok {
return
logger.Debugf("peer %s is not in \"no slot node\" map", peerID)
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

delete(ps.sets[idx].noSlotNodes, peerID)
n, err := ps.getNode(peerID)
if err != nil {
return
return fmt.Errorf("could not get node: %w", err)
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
}

switch n.state[idx] {
Expand All @@ -277,6 +280,7 @@ func (ps *PeersState) removeNoSlotNode(idx int, peerID peer.ID) {
case outgoing:
ps.sets[idx].numOut++
}
return nil
}

// disconnect updates the node status to the notConnected state.
Expand Down Expand Up @@ -356,16 +360,13 @@ func (ps *PeersState) forgetPeer(set int, peerID peer.ID) error {
}

// tryOutgoing tries to set the peer as connected as an outgoing connection.
// If there are enough slots available, switches the node to Connected and returns nil error. If
// the slots are full, the node stays "not connected" and we return error.
// If there are enough slots available, switches the node to Connected and returns
// nil error. If the slots are full, the node stays "not connected" and we return error.
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
// non slot occupying nodes don't count towards the number of slots.
func (ps *PeersState) tryOutgoing(setID int, peerID peer.ID) error {
var isNoSlotOccupied bool
if _, ok := ps.sets[setID].noSlotNodes[peerID]; ok {
isNoSlotOccupied = true
}
_, isNoSlotNode := ps.sets[setID].noSlotNodes[peerID]

if !ps.hasFreeOutgoingSlot(setID) && !isNoSlotOccupied {
if !ps.hasFreeOutgoingSlot(setID) && !isNoSlotNode {
return ErrOutgoingSlotsUnavailable
}

Expand All @@ -375,7 +376,7 @@ func (ps *PeersState) tryOutgoing(setID int, peerID peer.ID) error {
}

n.state[setID] = outgoing
if !isNoSlotOccupied {
if !isNoSlotNode {
ps.sets[setID].numOut++
}

Expand Down
16 changes: 10 additions & 6 deletions dot/peerset/peerstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,15 @@ func TestNoSlotNodeDoesntOccupySlot(t *testing.T) {
state := newTestPeerState(t, 1, 1)

// peer1 will not occupy any slot.
state.addNoSlotNode(0, peer1)
err := state.addNoSlotNode(0, peer1)
require.NoError(t, err)

// initially peer1 state will be unknownPeer.
require.Equal(t, unknownPeer, state.peerStatus(0, peer1))
// discover peer1
state.discover(0, peer1)
// peer1 will become an incoming connection.
err := state.tryAcceptIncoming(0, peer1)
err = state.tryAcceptIncoming(0, peer1)
require.NoError(t, err)
// peer1 is connected
require.Equal(t, connectedPeer, state.peerStatus(0, peer1))
Expand Down Expand Up @@ -116,12 +118,13 @@ func TestDisconnectNoSlotDoesntPanic(t *testing.T) {

state := newTestPeerState(t, 1, 1)

state.addNoSlotNode(0, peer1)
err := state.addNoSlotNode(0, peer1)
require.NoError(t, err)

require.Equal(t, unknownPeer, state.peerStatus(0, peer1))

state.discover(0, peer1)
err := state.tryOutgoing(0, peer1)
err = state.tryOutgoing(0, peer1)
require.NoError(t, err)

require.Equal(t, connectedPeer, state.peerStatus(0, peer1))
Expand Down Expand Up @@ -198,10 +201,11 @@ func TestSortedPeers(t *testing.T) {
const msgChanSize = 1
state := newTestPeerState(t, 2, 1)

state.addNoSlotNode(0, peer1)
err := state.addNoSlotNode(0, peer1)
require.NoError(t, err)

state.discover(0, peer1)
err := state.tryAcceptIncoming(0, peer1)
err = state.tryAcceptIncoming(0, peer1)
require.NoError(t, err)

require.Equal(t, connectedPeer, state.peerStatus(0, peer1))
Expand Down
8 changes: 4 additions & 4 deletions dot/peerset/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func newTestPeerSet(t *testing.T, in, out uint32, bootNodes, reservedPeers []pee
con := &ConfigSet{
Set: []*config{
{
inPeers: in,
outPeers: out,
maxInPeers: in,
maxOutPeers: out,
reservedOnly: reservedOnly,
periodicAllocTime: time.Second * 2,
},
Expand All @@ -53,8 +53,8 @@ func newTestPeerState(t *testing.T, maxIn, maxOut uint32) *PeersState {
t.Helper()
state, err := NewPeerState([]*config{
{
inPeers: maxIn,
outPeers: maxOut,
maxInPeers: maxIn,
maxOutPeers: maxOut,
},
})
require.NoError(t, err)
Expand Down