Skip to content

Commit

Permalink
FIx test failures.
Browse files Browse the repository at this point in the history
  • Loading branch information
arijitAD committed Sep 28, 2021
1 parent 8acf420 commit 8c7f3e5
Show file tree
Hide file tree
Showing 29 changed files with 1,163 additions and 594 deletions.
3 changes: 3 additions & 0 deletions dot/core/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import (
"math/big"

"github.com/ChainSafe/gossamer/dot/network"
"github.com/ChainSafe/gossamer/dot/peerset"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/runtime"
rtstorage "github.com/ChainSafe/gossamer/lib/runtime/storage"
"github.com/ChainSafe/gossamer/lib/transaction"
"github.com/libp2p/go-libp2p-core/peer"
)

// BlockState interface for block state methods
Expand Down Expand Up @@ -75,6 +77,7 @@ type TransactionState interface {
// Network is the interface for the network service
type Network interface {
GossipMessage(network.NotificationsMessage)
ReportPeer(p peer.ID, change peerset.ReputationChange)
}

// EpochState is the interface for state.EpochState
Expand Down
13 changes: 12 additions & 1 deletion dot/core/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ package core

import (
"github.com/ChainSafe/gossamer/dot/network"
"github.com/ChainSafe/gossamer/dot/peerset"
"github.com/ChainSafe/gossamer/dot/state"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/transaction"
"github.com/libp2p/go-libp2p-core/peer"
)

// HandleTransactionMessage validates each transaction in the message and
// adds valid transactions to the transaction queue of the BABE session
// returns boolean for transaction propagation, true - transactions should be propagated
func (s *Service) HandleTransactionMessage(msg *network.TransactionMessage) (bool, error) {
func (s *Service) HandleTransactionMessage(peerID peer.ID, msg *network.TransactionMessage) (bool, error) {
logger.Debug("received TransactionMessage")

// get transactions from message extrinsics
Expand All @@ -49,6 +52,10 @@ func (s *Service) HandleTransactionMessage(msg *network.TransactionMessage) (boo
externalExt := types.Extrinsic(append([]byte{byte(types.TxnExternal)}, tx...))
val, err := rt.ValidateTransaction(externalExt)
if err != nil {
s.net.ReportPeer(peerID, peerset.ReputationChange{
Value: state.BadTransactionValue,
Reason: state.BadTransactionReason,
})
logger.Debug("failed to validate transaction", "err", err)
continue
}
Expand All @@ -58,6 +65,10 @@ func (s *Service) HandleTransactionMessage(msg *network.TransactionMessage) (boo

// push to the transaction queue of BABE session
hash := s.transactionState.AddToPool(vtx)
s.net.ReportPeer(peerID, peerset.ReputationChange{
Value: state.GoodTransactionValue,
Reason: state.GoodTransactionReason,
})
logger.Trace("Added transaction to queue", "hash", hash)

// find tx(s) that should propagate
Expand Down
7 changes: 4 additions & 3 deletions dot/core/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/ChainSafe/gossamer/lib/keystore"
"github.com/ChainSafe/gossamer/lib/runtime"
"github.com/ChainSafe/gossamer/pkg/scale"

"github.com/centrifuge/go-substrate-rpc-client/v3/signature"
ctypes "github.com/centrifuge/go-substrate-rpc-client/v3/types"

Expand Down Expand Up @@ -124,6 +123,8 @@ func TestService_ProcessBlockAnnounceMessage(t *testing.T) {
}

func TestService_HandleTransactionMessage(t *testing.T) {
const peer1 = "testPeer1"

kp, err := sr25519.GenerateKeypair()
require.NoError(t, err)

Expand Down Expand Up @@ -158,7 +159,7 @@ func TestService_HandleTransactionMessage(t *testing.T) {

extBytes := createExtrinsic(t, rt, genHash, 0)
msg := &network.TransactionMessage{Extrinsics: []types.Extrinsic{extBytes}}
b, err := s.HandleTransactionMessage(msg)
b, err := s.HandleTransactionMessage(peer1, msg)
require.NoError(t, err)
require.True(t, b)

Expand All @@ -168,7 +169,7 @@ func TestService_HandleTransactionMessage(t *testing.T) {

extBytes = []byte(`bogus extrinsic`)
msg = &network.TransactionMessage{Extrinsics: []types.Extrinsic{extBytes}}
b, err = s.HandleTransactionMessage(msg)
b, err = s.HandleTransactionMessage(peer1, msg)
require.NoError(t, err)
require.False(t, b)
}
6 changes: 6 additions & 0 deletions dot/core/mocks/network.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 12 additions & 51 deletions dot/network/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,14 @@ import (
"crypto/rand"
"math/big"
"sync"
"time"

"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"

ma "github.com/multiformats/go-multiaddr"
)

var (
maxRetries = 12
"github.com/ChainSafe/gossamer/dot/peerset"
)

// ConnManager implements connmgr.ConnManager
Expand All @@ -51,15 +47,23 @@ type ConnManager struct {

// persistentPeers contains peers we should remain connected to.
persistentPeers *sync.Map // map[peer.ID]struct{}

peerSetHandler PeerSetHandler
}

func newConnManager(min, max int) *ConnManager {
func newConnManager(min, max int, peerSetCfg *peerset.ConfigSet) *ConnManager {
psh, err := peerset.NewPeerSetHandler(peerSetCfg)
if err != nil {
return nil
}

return &ConnManager{
min: min,
max: max,
closeHandlerMap: make(map[protocol.ID]func(peerID peer.ID)),
protectedPeers: new(sync.Map),
persistentPeers: new(sync.Map),
peerSetHandler: psh,
}
}

Expand Down Expand Up @@ -191,51 +195,8 @@ func (cm *ConnManager) Disconnected(n network.Network, c network.Conn) {
cm.disconnectHandler(c.RemotePeer())
}

if !cm.isPersistent(c.RemotePeer()) {
return
}

addrs := cm.host.h.Peerstore().Addrs(c.RemotePeer())
info := peer.AddrInfo{
ID: c.RemotePeer(),
Addrs: addrs,
}

count := 0
retry := func() bool {
err := cm.host.connect(info)
if err != nil {
logger.Warn("failed to reconnect to persistent peer", "peer", c.RemotePeer(), "error", err)
return false
}

count++
if count > maxRetries {
return true
}
return true
}

go func() {
if retry() {
return
}

retryTimer := time.NewTicker(time.Minute)
defer retryTimer.Stop()
for {
select {
case <-cm.host.ctx.Done():
return
case <-retryTimer.C:
if retry() {
return
}
}
}
}()

// TODO: if number of peers falls below the min desired peer count, we should try to connect to previously discovered peers
// TODO Analyse if this is at correct place along with reason and find usage of RefusedDrop.
_ = cm.peerSetHandler.Dropped(0, c.RemotePeer(), peerset.UnknownDrop)
}

func (cm *ConnManager) registerDisconnectHandler(cb func(peer.ID)) {
Expand Down
5 changes: 4 additions & 1 deletion dot/network/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"
"time"

"github.com/ChainSafe/gossamer/dot/peerset"
"github.com/ChainSafe/gossamer/lib/utils"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -63,7 +64,9 @@ func TestMaxPeers(t *testing.T) {
}

func TestProtectUnprotectPeer(t *testing.T) {
cm := newConnManager(1, 4)
min, max := 1, 4
peerCfgSet := peerset.NewConfigSet(uint32(max-min), uint32(max), nil, nil, false)
cm := newConnManager(min, max, peerCfgSet)

p1 := peer.ID("a")
p2 := peer.ID("b")
Expand Down
16 changes: 5 additions & 11 deletions dot/network/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ type discovery struct {
ds *badger.Datastore
pid protocol.ID
minPeers, maxPeers int
handler PeerSetHandler
}

func newDiscovery(ctx context.Context, h libp2phost.Host, bootnodes []peer.AddrInfo, ds *badger.Datastore, pid protocol.ID, min, max int) *discovery {
func newDiscovery(ctx context.Context, h libp2phost.Host, bootnodes []peer.AddrInfo, ds *badger.Datastore, pid protocol.ID, min int, max int, handler PeerSetHandler) *discovery {
return &discovery{
ctx: ctx,
h: h,
Expand All @@ -66,6 +67,7 @@ func newDiscovery(ctx context.Context, h libp2phost.Host, bootnodes []peer.AddrI
pid: pid,
minPeers: min,
maxPeers: max,
handler: handler,
}
}

Expand Down Expand Up @@ -208,16 +210,8 @@ func (d *discovery) findPeers(ctx context.Context) {

logger.Trace("found new peer via DHT", "peer", peer.ID)

// found a peer, try to connect if we need more peers
if len(d.h.Network().Peers()) < d.maxPeers {
err = d.h.Connect(d.ctx, peer)
if err != nil {
logger.Trace("failed to connect to discovered peer", "peer", peer.ID, "err", err)
}
} else {
d.h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL)
return
}
d.handler.AddToPeerSet(0, peer.ID)
d.h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL)
}
}
}
43 changes: 24 additions & 19 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"
"time"

"github.com/chyeh/pubip"
"github.com/dgraph-io/ristretto"
badger "github.com/ipfs/go-ds-badger2"
"github.com/libp2p/go-libp2p"
Expand All @@ -37,7 +38,7 @@ import (
secio "github.com/libp2p/go-libp2p-secio"
ma "github.com/multiformats/go-multiaddr"

"github.com/chyeh/pubip"
"github.com/ChainSafe/gossamer/dot/peerset"
)

var privateCIDRs = []string{
Expand Down Expand Up @@ -86,21 +87,32 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
}
}

// create connection manager
cm := newConnManager(cfg.MinPeers, cfg.MaxPeers)

// format bootnodes
bns, err := stringsToAddrInfos(cfg.Bootnodes)
if err != nil {
return nil, err
}

bNodePeers := make([]peer.ID, len(bns))
for idx, n := range bns {
bNodePeers[idx] = n.ID
}

// format persistent peers
pps, err := stringsToAddrInfos(cfg.PersistentPeers)
if err != nil {
return nil, err
}

rNodePeers := make([]peer.ID, len(pps))
for idx, n := range pps {
rNodePeers[idx] = n.ID
}

peerCfgSet := peerset.NewConfigSet(uint32(cfg.MaxPeers-cfg.MinPeers), uint32(cfg.MinPeers), bNodePeers, rNodePeers, false)
// create connection manager
cm := newConnManager(cfg.MinPeers, cfg.MaxPeers, peerCfgSet)

for _, pp := range pps {
cm.persistentPeers.Store(pp.ID, struct{}{})
}
Expand Down Expand Up @@ -173,7 +185,7 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
}

bwc := metrics.NewBandwidthCounter()
discovery := newDiscovery(ctx, h, bns, ds, pid, cfg.MinPeers, cfg.MaxPeers)
discovery := newDiscovery(ctx, h, bns, ds, pid, cfg.MinPeers, cfg.MaxPeers, cm.peerSetHandler)

host := &host{
ctx: ctx,
Expand Down Expand Up @@ -250,18 +262,10 @@ func (h *host) connect(p peer.AddrInfo) (err error) {

// bootstrap connects the host to the configured bootnodes
func (h *host) bootstrap() {
failed := 0
all := append(h.bootnodes, h.persistentPeers...)
for _, addrInfo := range all {
logger.Debug("bootstrapping to peer", "peer", addrInfo.ID)
err := h.connect(addrInfo)
if err != nil {
logger.Debug("failed to bootstrap to peer", "error", err)
failed++
}
}
if failed == len(all) && len(all) != 0 {
logger.Error("failed to bootstrap to any bootnode")
h.cm.peerSetHandler.AddToPeerSet(0, addrInfo.ID)
}
}

Expand Down Expand Up @@ -366,18 +370,19 @@ func (h *host) peers() []peer.ID {
// addReservedPeers adds the peers `addrs` to the protected peers list and connects to them
func (h *host) addReservedPeers(addrs ...string) error {
for _, addr := range addrs {
maddr, err := ma.NewMultiaddr(addr)
mAddr, err := ma.NewMultiaddr(addr)
if err != nil {
return err
}

addinfo, err := peer.AddrInfoFromP2pAddr(maddr)
addrInfo, err := peer.AddrInfoFromP2pAddr(mAddr)
if err != nil {
return err
}
h.cm.peerSetHandler.AddReservedPeer(0, addrInfo.ID)

h.h.ConnManager().Protect(addinfo.ID, "")
if err := h.connect(*addinfo); err != nil {
h.h.ConnManager().Protect(addrInfo.ID, "")
if err := h.connect(*addrInfo); err != nil {
return err
}
}
Expand All @@ -392,7 +397,7 @@ func (h *host) removeReservedPeers(ids ...string) error {
if err != nil {
return err
}

h.cm.peerSetHandler.RemoveReservedPeer(0, peerID)
h.h.ConnManager().Unprotect(peerID, "")
}

Expand Down
Loading

0 comments on commit 8c7f3e5

Please sign in to comment.