Skip to content

Commit

Permalink
feature(dot/sync): implement *tipSyncer.hasCurrentWorker, add minPe…
Browse files Browse the repository at this point in the history
…ers, slotDuration to `chainSync`, check if syncing in `*core.Service.HandleTransactionMessage` (#1881)
  • Loading branch information
noot committed Oct 13, 2021
1 parent a095d95 commit bf903f2
Show file tree
Hide file tree
Showing 11 changed files with 164 additions and 44 deletions.
1 change: 1 addition & 0 deletions dot/core/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type TransactionState interface {
// Network is the interface for the network service
type Network interface {
GossipMessage(network.NotificationsMessage)
IsSynced() bool
}

// EpochState is the interface for state.EpochState
Expand Down
5 changes: 5 additions & 0 deletions dot/core/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ import (
func (s *Service) HandleTransactionMessage(msg *network.TransactionMessage) (bool, error) {
logger.Debug("received TransactionMessage")

if !s.net.IsSynced() {
logger.Debug("ignoring TransactionMessage, not yet synced")
return false, nil
}

// get transactions from message extrinsics
txs := msg.Extrinsics
var toPropagate []types.Extrinsic
Expand Down
16 changes: 15 additions & 1 deletion dot/core/mocks/network.go → dot/core/mocks/Network.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 4 additions & 32 deletions dot/core/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,10 @@ func NewTestService(t *testing.T, cfg *Config) *Service {
cfg.BlockState.StoreRuntime(cfg.BlockState.BestBlockHash(), cfg.Runtime)

if cfg.Network == nil {
config := &network.Config{
BasePath: testDatadirPath,
Port: 7001,
NoBootstrap: true,
NoMDNS: true,
BlockState: stateSrvc.Block,
TransactionHandler: network.NewMockTransactionHandler(),
}
cfg.Network = createTestNetworkService(t, config)
net := new(coremocks.MockNetwork)
net.On("GossipMessage", mock.AnythingOfType("*network.TransactionMessage"))
net.On("IsSynced").Return(true)
cfg.Network = net
}

if cfg.CodeSubstitutes == nil {
Expand Down Expand Up @@ -160,26 +155,3 @@ func NewTestService(t *testing.T, cfg *Config) *Service {

return s
}

// helper method to create and start a new network service
func createTestNetworkService(t *testing.T, cfg *network.Config) (srvc *network.Service) {
if cfg.LogLvl == 0 {
cfg.LogLvl = 3
}

if cfg.Syncer == nil {
cfg.Syncer = network.NewMockSyncer()
}

srvc, err := network.NewService(cfg)
require.NoError(t, err)

err = srvc.Start()
require.NoError(t, err)

t.Cleanup(func() {
err := srvc.Stop()
require.NoError(t, err)
})
return srvc
}
5 changes: 5 additions & 0 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,3 +738,8 @@ func (*Service) StartingBlock() int64 {
// TODO: refactor this to get the data from the sync service
return 0
}

// IsSynced returns whether we are synced (no longer in bootstrap mode) or not
func (s *Service) IsSynced() bool {
return s.syncer.IsSynced()
}
7 changes: 7 additions & 0 deletions dot/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,11 @@ func createBlockVerifier(st *state.Service) (*babe.VerificationManager, error) {
}

func newSyncService(cfg *Config, st *state.Service, fg sync.FinalityGadget, verifier *babe.VerificationManager, cs *core.Service, net *network.Service) (*sync.Service, error) {
slotDuration, err := st.Epoch.GetSlotDuration()
if err != nil {
return nil, err
}

syncCfg := &sync.Config{
LogLvl: cfg.Log.SyncLvl,
Network: net,
Expand All @@ -416,6 +421,8 @@ func newSyncService(cfg *Config, st *state.Service, fg sync.FinalityGadget, veri
FinalityGadget: fg,
BabeVerifier: verifier,
BlockImportHandler: cs,
MinPeers: cfg.Network.MinPeers,
SlotDuration: slotDuration,
}

return sync.NewService(syncCfg)
Expand Down
15 changes: 9 additions & 6 deletions dot/sync/chain_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,12 @@ type chainSync struct {
benchmarker *syncBenchmarker

finalisedCh <-chan *types.FinalisationInfo

minPeers int
slotDuration time.Duration
}

func newChainSync(bs BlockState, net Network, readyBlocks *blockQueue, pendingBlocks DisjointBlockSet) *chainSync {
func newChainSync(bs BlockState, net Network, readyBlocks *blockQueue, pendingBlocks DisjointBlockSet, minPeers int, slotDuration time.Duration) *chainSync {
ctx, cancel := context.WithCancel(context.Background())
return &chainSync{
ctx: ctx,
Expand All @@ -175,17 +178,18 @@ func newChainSync(bs BlockState, net Network, readyBlocks *blockQueue, pendingBl
handler: newBootstrapSyncer(bs),
benchmarker: newSyncBenchmarker(),
finalisedCh: bs.GetFinalisedNotifierChannel(),
minPeers: minPeers,
slotDuration: slotDuration,
}
}

func (cs *chainSync) start() {
// wait until we have received 1+ peer heads
// TODO: this should be based off our min/max peers
// wait until we have received at least `minPeers` peer heads
for {
cs.RLock()
n := len(cs.peerState)
cs.RUnlock()
if n >= 1 {
if n >= cs.minPeers {
break
}
time.Sleep(time.Millisecond * 100)
Expand Down Expand Up @@ -369,8 +373,7 @@ func (cs *chainSync) ignorePeer(who peer.ID) {

func (cs *chainSync) sync() {
// set to slot time
// TODO: make configurable
ticker := time.NewTicker(time.Second * 6)
ticker := time.NewTicker(cs.slotDuration)

for {
select {
Expand Down
8 changes: 6 additions & 2 deletions dot/sync/chain_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ import (
"github.com/stretchr/testify/require"
)

var testTimeout = time.Second * 5
const (
defaultMinPeers = 1
testTimeout = time.Second * 5
defaultSlotDuration = time.Second * 6
)

func newTestChainSync(t *testing.T) (*chainSync, *blockQueue) {
header, err := types.NewHeader(common.NewHash([]byte{0}), trie.EmptyHash, trie.EmptyHash, big.NewInt(0), types.NewDigest())
Expand All @@ -51,7 +55,7 @@ func newTestChainSync(t *testing.T) (*chainSync, *blockQueue) {
net.On("DoBlockRequest", mock.AnythingOfType("peer.ID"), mock.AnythingOfType("*network.BlockRequestMessage")).Return(nil, nil)

readyBlocks := newBlockQueue(maxResponseSize)
cs := newChainSync(bs, net, readyBlocks, newDisjointBlockSet(pendingBlocksLimit))
cs := newChainSync(bs, net, readyBlocks, newDisjointBlockSet(pendingBlocksLimit), defaultMinPeers, defaultSlotDuration)
return cs, readyBlocks
}

Expand Down
5 changes: 4 additions & 1 deletion dot/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package sync
import (
"math/big"
"os"
"time"

"github.com/ChainSafe/gossamer/dot/network"
"github.com/ChainSafe/gossamer/dot/types"
Expand Down Expand Up @@ -46,6 +47,8 @@ type Config struct {
TransactionState TransactionState
BlockImportHandler BlockImportHandler
BabeVerifier BabeVerifier
MinPeers int
SlotDuration time.Duration
}

// NewService returns a new *sync.Service
Expand Down Expand Up @@ -84,7 +87,7 @@ func NewService(cfg *Config) (*Service, error) {

readyBlocks := newBlockQueue(maxResponseSize * 30)
pendingBlocks := newDisjointBlockSet(pendingBlocksLimit)
chainSync := newChainSync(cfg.BlockState, cfg.Network, readyBlocks, pendingBlocks)
chainSync := newChainSync(cfg.BlockState, cfg.Network, readyBlocks, pendingBlocks, cfg.MinPeers, cfg.SlotDuration)
chainProcessor := newChainProcessor(readyBlocks, pendingBlocks, cfg.BlockState, cfg.StorageState, cfg.TransactionState, cfg.BabeVerifier, cfg.FinalityGadget, cfg.BlockImportHandler)

return &Service{
Expand Down
45 changes: 43 additions & 2 deletions dot/sync/tip_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,49 @@ func (s *tipSyncer) handleWorkerResult(res *worker) (*worker, error) {
}, nil
}

func (*tipSyncer) hasCurrentWorker(_ *worker, _ map[uint64]*worker) bool {
// TODO
func (*tipSyncer) hasCurrentWorker(w *worker, workers map[uint64]*worker) bool {
if w == nil || w.startNumber == nil || w.targetNumber == nil {
return true
}

for _, curr := range workers {
if w.direction != curr.direction || w.requestData != curr.requestData {
continue
}

targetDiff := w.targetNumber.Cmp(curr.targetNumber)
startDiff := w.startNumber.Cmp(curr.startNumber)

switch w.direction {
case network.Ascending:
// worker target is greater than existing worker's target
if targetDiff > 0 {
continue
}

// worker start is less than existing worker's start
if startDiff < 0 {
continue
}
case network.Descending:
// worker target is less than existing worker's target
if targetDiff < 0 {
continue
}

// worker start is greater than existing worker's start
if startDiff > 0 {
continue
}
}

// worker (start, end) is within curr (start, end), if hashes are equal then the request is either
// for the same data or some subset of data that is covered by curr
if w.startHash.Equal(curr.startHash) || w.targetHash.Equal(curr.targetHash) {
return true
}
}

return false
}

Expand Down
65 changes: 65 additions & 0 deletions dot/sync/tip_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,3 +279,68 @@ func TestTipSyncer_handleTick_case3(t *testing.T) {
s.readyBlocks.pop() // first pop will remove parent
require.Equal(t, block.ToBlockData(), s.readyBlocks.pop())
}

func TestTipSyncer_hasCurrentWorker(t *testing.T) {
s := newTestTipSyncer(t)
require.False(t, s.hasCurrentWorker(&worker{
startNumber: big.NewInt(0),
targetNumber: big.NewInt(0),
}, nil))

workers := make(map[uint64]*worker)
workers[0] = &worker{
startNumber: big.NewInt(1),
targetNumber: big.NewInt(128),
}
require.False(t, s.hasCurrentWorker(&worker{
startNumber: big.NewInt(1),
targetNumber: big.NewInt(129),
}, workers))
require.True(t, s.hasCurrentWorker(&worker{
startNumber: big.NewInt(1),
targetNumber: big.NewInt(128),
}, workers))
require.True(t, s.hasCurrentWorker(&worker{
startNumber: big.NewInt(1),
targetNumber: big.NewInt(127),
}, workers))

workers[0] = &worker{
startNumber: big.NewInt(128),
targetNumber: big.NewInt(255),
}
require.False(t, s.hasCurrentWorker(&worker{
startNumber: big.NewInt(127),
targetNumber: big.NewInt(255),
}, workers))
require.True(t, s.hasCurrentWorker(&worker{
startNumber: big.NewInt(128),
targetNumber: big.NewInt(255),
}, workers))

workers[0] = &worker{
startNumber: big.NewInt(128),
targetNumber: big.NewInt(1),
direction: network.Descending,
}
require.False(t, s.hasCurrentWorker(&worker{
startNumber: big.NewInt(129),
targetNumber: big.NewInt(1),
direction: network.Descending,
}, workers))
require.True(t, s.hasCurrentWorker(&worker{
startNumber: big.NewInt(128),
targetNumber: big.NewInt(1),
direction: network.Descending,
}, workers))
require.True(t, s.hasCurrentWorker(&worker{
startNumber: big.NewInt(128),
targetNumber: big.NewInt(2),
direction: network.Descending,
}, workers))
require.True(t, s.hasCurrentWorker(&worker{
startNumber: big.NewInt(127),
targetNumber: big.NewInt(1),
direction: network.Descending,
}, workers))
}

0 comments on commit bf903f2

Please sign in to comment.