Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 60 additions & 18 deletions ctxc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ type ProtocolManager struct {

chainSync *chainSyncer
wg sync.WaitGroup
peerWG sync.WaitGroup
handlerStartCh chan struct{}
handlerDoneCh chan struct{}
broadcastTxAnnouncesOnly bool // Testing field, disable transaction propagation
}

Expand All @@ -147,16 +148,18 @@ type ProtocolManager struct {
func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ctxcdb.Database, cacheLimit int, whitelist map[uint64]common.Hash) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
networkID: networkID,
forkFilter: forkid.NewFilter(blockchain),
eventMux: mux,
txpool: txpool,
blockchain: blockchain,
chaindb: chaindb,
peers: newPeerSet(),
whitelist: whitelist,
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
networkID: networkID,
forkFilter: forkid.NewFilter(blockchain),
eventMux: mux,
txpool: txpool,
blockchain: blockchain,
chaindb: chaindb,
peers: newPeerSet(),
whitelist: whitelist,
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
handlerDoneCh: make(chan struct{}),
handlerStartCh: make(chan struct{}),
}
// Figure out whether to allow fast sync or not
//if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() == 0 {
Expand Down Expand Up @@ -253,6 +256,42 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
return manager, nil
}

// protoTracker tracks the number of active protocol handlers.
func (h *ProtocolManager) protoTracker() {
defer h.wg.Done()
var active int
for {
select {
case <-h.handlerStartCh:
active++
case <-h.handlerDoneCh:
active--
case <-h.quitSync:
// Wait for all active handlers to finish.
for ; active > 0; active-- {
<-h.handlerDoneCh
}
return
}
}
}

// incHandlers signals to increment the number of active handlers if not
// quitting.
func (h *ProtocolManager) incHandlers() bool {
select {
case h.handlerStartCh <- struct{}{}:
return true
case <-h.quitSync:
return false
}
}

// decHandlers signals to decrement the number of active handlers.
func (h *ProtocolManager) decHandlers() {
h.handlerDoneCh <- struct{}{}
}

func (pm *ProtocolManager) makeProtocol(version uint) p2p.Protocol {
length, ok := protocolLengths[version]
if !ok {
Expand Down Expand Up @@ -325,6 +364,10 @@ func (pm *ProtocolManager) Start(maxPeers int) {
pm.wg.Add(2)
go pm.chainSync.loop()
go pm.txsyncLoop64()

// start peer handler tracker
pm.wg.Add(1)
go pm.protoTracker()
}

func (pm *ProtocolManager) Stop() {
Expand All @@ -335,15 +378,14 @@ func (pm *ProtocolManager) Stop() {
// After this is done, no new peers will be accepted.
close(pm.quitSync)
log.Warn("Cortex peers stopping ... ...")
pm.wg.Wait()

// Disconnect existing sessions.
// This also closes the gate for any new registrations on the peer set.
// sessions which are already established but not added to pm.peers yet
// will exit when they try to register.
pm.peers.Close()
log.Info("Cortex protocol stopping ... ...")
pm.peerWG.Wait()
pm.wg.Wait()

log.Info("Cortex protocol stopped")
}
Expand All @@ -353,11 +395,11 @@ func (pm *ProtocolManager) newPeer(pv uint, p *p2p.Peer, rw p2p.MsgReadWriter, g
}

func (pm *ProtocolManager) runPeer(p *peer) error {
if !pm.chainSync.handlePeerEvent(p) {
if !pm.incHandlers() {
return p2p.DiscQuitting
}
pm.peerWG.Add(1)
defer pm.peerWG.Done()
defer pm.decHandlers()

return pm.handle(p)
}

Expand Down Expand Up @@ -398,7 +440,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
p.Log().Error("Failed to register peer in syncer", "err", err)
return err
}
pm.chainSync.handlePeerEvent(p)
pm.chainSync.handlePeerEvent()

// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
Expand Down Expand Up @@ -807,7 +849,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Update the peers total difficulty if better than the previous
if _, td := p.Head(); trueTD.Cmp(td) > 0 {
p.SetHead(trueHead, trueTD)
pm.chainSync.handlePeerEvent(p)
pm.chainSync.handlePeerEvent()
}

case msg.Code == ctxc.NewPooledTransactionHashesMsg && p.version >= ctxc65:
Expand Down
2 changes: 1 addition & 1 deletion ctxc/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func newChainSyncer(pm *ProtocolManager) *chainSyncer {
// handlePeerEvent notifies the syncer about a change in the peer set.
// This is called for new peers and every time a peer announces a new
// chain head.
func (cs *chainSyncer) handlePeerEvent(p *peer) bool {
func (cs *chainSyncer) handlePeerEvent() bool {
select {
case cs.peerEventCh <- struct{}{}:
return true
Expand Down