Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix beacon sync channel blocking issue #1586

Merged
merged 2 commits into from Sep 13, 2019
Merged
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -32,6 +32,7 @@ const (
inSyncThreshold = 0 // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus
BatchSize uint32 = 1000 //maximum size for one query of block hashes
SyncLoopFrequency = 1 // unit in second
LastMileBlocksSize = 10
)

// SyncPeerConfig is peer config to sync.
@@ -105,6 +106,7 @@ type StateSync struct {
syncConfig *SyncConfig
stateSyncTaskQueue *queue.Queue
syncMux sync.Mutex
lastMileMux sync.Mutex
}

func (ss *StateSync) purgeAllBlocksFromCache() {
@@ -130,9 +132,13 @@ func (ss *StateSync) purgeOldBlocksFromCache() {
}

// AddLastMileBlock add the lastest a few block into queue for syncing
// only keep the latest blocks with size capped by LastMileBlocksSize
func (ss *StateSync) AddLastMileBlock(block *types.Block) {
ss.syncMux.Lock()
defer ss.syncMux.Unlock()
ss.lastMileMux.Lock()
defer ss.lastMileMux.Unlock()
if len(ss.lastMileBlocks) >= LastMileBlocksSize {
ss.lastMileBlocks = ss.lastMileBlocks[1:]
}
ss.lastMileBlocks = append(ss.lastMileBlocks, block)
This conversation was marked as resolved by chaosma

This comment has been minimized.

Copy link
@LeoHChen

LeoHChen Sep 12, 2019

Contributor

where did you clear the lastMileBlocks? Is there a better cache package you can use?

This conversation was marked as resolved by chaosma

This comment has been minimized.

Copy link
@harmony-ek

harmony-ek Sep 12, 2019

Contributor

Broadcast/gossip messages may be reordered. Please keep the list sorted after appending. Otherwise the node may end up dropping newer blocks.

}

@@ -23,10 +23,11 @@ import (

// Constants related to doing syncing.
const (
lastMileThreshold = 4
inSyncThreshold = 1 // unit in number of block
SyncFrequency = 10 // unit in second
MinConnectedPeers = 10 // minimum number of peers connected to in node syncing
lastMileThreshold = 4
inSyncThreshold = 1 // unit in number of block
SyncFrequency = 10 // unit in second
BeaconSyncFrequency = 5 // unit in second
MinConnectedPeers = 10 // minimum number of peers connected to in node syncing
)

// getNeighborPeers is a helper function to return list of peers
@@ -160,84 +161,86 @@ func (p *LocalSyncingPeerProvider) SyncingPeers(shardID uint32) (peers []p2p.Pee

// DoBeaconSyncing update received beaconchain blocks and downloads missing beacon chain blocks
func (node *Node) DoBeaconSyncing() {
go func(node *Node) {
for {
select {
case beaconBlock := <-node.BeaconBlockChannel:
This conversation was marked as resolved by chaosma

This comment has been minimized.

Copy link
@LeoHChen

LeoHChen Sep 12, 2019

Contributor

this could block, you need to have some timeout mechanism.

node.beaconSync.AddLastMileBlock(beaconBlock)
}
}
}(node)

for {
select {
case beaconBlock := <-node.BeaconBlockChannel:
if node.beaconSync == nil {
utils.Logger().Info().Msg("initializing beacon sync")
node.beaconSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.GetSyncID())
if node.beaconSync == nil {
utils.Logger().Info().Msg("initializing beacon sync")
node.beaconSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.GetSyncID())
}
if node.beaconSync.GetActivePeerNumber() == 0 {
utils.Logger().Info().Msg("no peers; bootstrapping beacon sync config")
peers, err := node.SyncingPeerProvider.SyncingPeers(0)
if err != nil {
utils.Logger().Warn().
Err(err).
Msg("cannot retrieve beacon syncing peers")
continue
}
if node.beaconSync.GetActivePeerNumber() == 0 {
utils.Logger().Info().Msg("no peers; bootstrapping beacon sync config")
peers, err := node.SyncingPeerProvider.SyncingPeers(0)
if err != nil {
utils.Logger().Warn().
Err(err).
Msg("cannot retrieve beacon syncing peers")
continue
}
if err := node.beaconSync.CreateSyncConfig(peers, true); err != nil {
utils.Logger().Warn().Err(err).Msg("cannot create beacon sync config")
continue
}
if err := node.beaconSync.CreateSyncConfig(peers, true); err != nil {
utils.Logger().Warn().Err(err).Msg("cannot create beacon sync config")
continue
}
node.beaconSync.AddLastMileBlock(beaconBlock)
node.beaconSync.SyncLoop(node.Beaconchain(), node.BeaconWorker, false, true)
}
node.beaconSync.SyncLoop(node.Beaconchain(), node.BeaconWorker, false, true)
time.Sleep(BeaconSyncFrequency * time.Second)
}
}

// DoSyncing keep the node in sync with other peers, willJoinConsensus means the node will try to join consensus after catch up
func (node *Node) DoSyncing(bc *core.BlockChain, worker *worker.Worker, willJoinConsensus bool) {
ticker := time.NewTicker(SyncFrequency * time.Second)

SyncingLoop:
for {
select {
case <-ticker.C:
if node.stateSync == nil {
node.stateSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.GetSyncID())

utils.Logger().Debug().Msg("[SYNC] initialized state sync")
if node.stateSync == nil {
node.stateSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.GetSyncID())
utils.Logger().Debug().Msg("[SYNC] initialized state sync")
}
if node.stateSync.GetActivePeerNumber() < MinConnectedPeers {
shardID := bc.ShardID()
peers, err := node.SyncingPeerProvider.SyncingPeers(shardID)
if err != nil {
utils.Logger().Warn().
Err(err).
Uint32("shard_id", shardID).
Msg("cannot retrieve syncing peers")
continue SyncingLoop
}
if err := node.stateSync.CreateSyncConfig(peers, false); err != nil {
utils.Logger().Warn().
Err(err).
Interface("peers", peers).
Msg("[SYNC] create peers error")
continue SyncingLoop
}
if node.stateSync.GetActivePeerNumber() < MinConnectedPeers {
shardID := bc.ShardID()
peers, err := node.SyncingPeerProvider.SyncingPeers(shardID)
if err != nil {
utils.Logger().Warn().
Err(err).
Uint32("shard_id", shardID).
Msg("cannot retrieve syncing peers")
continue SyncingLoop
}
if err := node.stateSync.CreateSyncConfig(peers, false); err != nil {
utils.Logger().Warn().
Err(err).
Interface("peers", peers).
Msg("[SYNC] create peers error")
continue SyncingLoop
}
utils.Logger().Debug().Int("len", node.stateSync.GetActivePeerNumber()).Msg("[SYNC] Get Active Peers")
utils.Logger().Debug().Int("len", node.stateSync.GetActivePeerNumber()).Msg("[SYNC] Get Active Peers")
}
if node.stateSync.IsOutOfSync(bc) {
node.stateMutex.Lock()
node.State = NodeNotInSync
node.stateMutex.Unlock()
if willJoinConsensus {
node.Consensus.BlocksNotSynchronized()
}
if node.stateSync.IsOutOfSync(bc) {
node.stateSync.SyncLoop(bc, worker, willJoinConsensus, false)
if willJoinConsensus {
node.stateMutex.Lock()
node.State = NodeNotInSync
node.State = NodeReadyForConsensus
node.stateMutex.Unlock()
if willJoinConsensus {
node.Consensus.BlocksNotSynchronized()
}
node.stateSync.SyncLoop(bc, worker, willJoinConsensus, false)
if willJoinConsensus {
node.stateMutex.Lock()
node.State = NodeReadyForConsensus
node.stateMutex.Unlock()
node.Consensus.BlocksSynchronized()
}
node.Consensus.BlocksSynchronized()
}
node.stateMutex.Lock()
node.State = NodeReadyForConsensus
node.stateMutex.Unlock()
}
node.stateMutex.Lock()
node.State = NodeReadyForConsensus
node.stateMutex.Unlock()
time.Sleep(SyncFrequency * time.Second)
}
}

ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.