Skip to content

Commit

Permalink
Merge 8bfe9d3 into 000fedd
Browse files Browse the repository at this point in the history
  • Loading branch information
peekpi committed Jun 4, 2019
2 parents 000fedd + 8bfe9d3 commit 4cc7618
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 32 deletions.
28 changes: 21 additions & 7 deletions blockchain/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type Downloader struct {
subs []router.Subscription
}

// NewDownloader .
// NewDownloader create a new downloader
func NewDownloader(chain *BlockChain) *Downloader {
dl := &Downloader{
station: router.NewLocalStation("downloader", nil),
Expand All @@ -95,11 +95,16 @@ func NewDownloader(chain *BlockChain) *Downloader {
return dl
}

// Stop stop the downloader
func (dl *Downloader) Stop() {
close(dl.quit)
for _, sub := range dl.subs {
sub.Unsubscribe()
}
for _, v := range dl.remotes.data {
status := v.(*stationStatus)
close(status.errCh)
}
dl.loopWG.Wait()
}

Expand Down Expand Up @@ -403,7 +408,7 @@ func (dl *Downloader) multiplexDownload(status *stationStatus) bool {
}
ancestor, err := dl.findAncestor(stationSearch, status.station, headNumber, status.ancestor+1, status.errCh)
if err != nil {
log.Debug(fmt.Sprint("ancestor err", err))
log.Warn("ancestor err", "err", err)
return false
}
downloadStart := ancestor + 1
Expand Down Expand Up @@ -463,7 +468,12 @@ func (dl *Downloader) multiplexDownload(status *stationStatus) bool {
status.ancestor = n
if err != nil {
log.Warn("Insert error:", "number:", n, "error", err)
router.AddErr(status.station, uint64(numbers[len(numbers)-1]-n))
failedNum := numbers[len(numbers)-1] - n
router.AddErr(status.station, failedNum)
if failedNum > 32 {
log.Warn("Disconnect because Insert error:", "station:", fmt.Sprintf("%x", status.station.Name()), "failedNum", failedNum)
router.SendTo(nil, nil, router.OneMinuteLimited, status.station) // disconnect and put into blacklist
}
}

head = dl.blockchain.CurrentBlock()
Expand Down Expand Up @@ -510,7 +520,7 @@ func (dl *Downloader) loop() {
}

func (dl *Downloader) assignDownloadTask(hashes []common.Hash, numbers []uint64) (uint64, error) {
log.Debug(fmt.Sprint("assingDownloadTask:", len(hashes), len(numbers), numbers))
log.Debug("assingDownloadTask:", "hashesLen", len(hashes), "numbersLen", len(numbers), "numbers", numbers)
workers := &simpleHeap{cmp: dl.remotes.cmp}
dl.remotesMutex.RLock()
workers.data = append(workers.data, dl.remotes.data...)
Expand Down Expand Up @@ -598,14 +608,18 @@ type downloadTask struct {
}

func (task *downloadTask) Do() {
latestStatus := task.worker.getStatus()
defer func() {
task.errorTotal++
task.result <- task
if len(task.blocks) == 0 {
diff := latestStatus.Number - task.endNumber
if latestStatus.Number < task.endNumber {
diff = task.endNumber - latestStatus.Number
}
if len(task.blocks) == 0 && diff > 16 {
task.errorTotal++
router.AddErr(task.worker.station, 1)
}
}()
latestStatus := task.worker.getStatus()
if latestStatus.Number < task.endNumber {
return
}
Expand Down
6 changes: 3 additions & 3 deletions p2p/protoadaptor/ProtoAdaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,9 @@ func (adaptor *ProtoAdaptor) adaptorLoop(peer *p2p.Peer, ws p2p.MsgReadWriter) e
}
router.AddNetIn(station, 1)
if checkDDOS(monitor, e) {
router.SendTo(nil, nil, router.OneMinuteLimited, e.From)
log.Warn("DDos defense", "peer", remote.peer.String(), "typecode", e.Typecode, "count", monitor[e.Typecode][1])
return p2p.DiscDDOS
//router.SendTo(nil, nil, router.OneMinuteLimited, e.From)
log.Warn("DDos defense -", "peer", remote.peer.String(), "typecode", e.Typecode, "count", monitor[e.Typecode][1])
//return p2p.DiscDDOS
}
router.SendEvent(e)
}
Expand Down
54 changes: 32 additions & 22 deletions txpool/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
maxKonwnTxs = 32
txsSendDelay = 50 * time.Millisecond
txsSendThreshold = 32
cacheBits = 1
cacheBits = 12
cacheSize = 1 << cacheBits
cacheMask = cacheSize - 1
)
Expand Down Expand Up @@ -203,9 +203,9 @@ func NewTxpoolStation(txpool *TxPool) *TxpoolStation {
func (s *TxpoolStation) addTxs(txs []*TransactionWithPath, from string) []*types.Transaction {
rtxs := make([]*types.Transaction, 0, len(txs))
for _, tx := range txs {
if !s.cache.hadTx(tx.Tx) {
rtxs = append(rtxs, tx.Tx)
}
// if !s.cache.hadTx(tx.Tx) {
rtxs = append(rtxs, tx.Tx)
// }
s.cache.addTx(tx.Tx, tx.Bloom, from)
}
return rtxs
Expand All @@ -216,37 +216,47 @@ func (s *TxpoolStation) broadcast(txs []*types.Transaction) {
return
}
sendTask := make(map[*peerInfo][]*TransactionWithPath)
addToTask := func(txObj *TransactionWithPath) bool {
addToTask := func(name string, peerInfo *peerInfo, txObj *TransactionWithPath) bool {
tx := txObj.Tx
if _, ok := sendTask[peerInfo]; ok {
s.cache.addTx(tx, txObj.Bloom, name)
sendTask[peerInfo] = append(sendTask[peerInfo], txObj)
return true
}
if !peerInfo.trySetBusy() {
return false
}
s.cache.addTx(tx, txObj.Bloom, name)
sendTask[peerInfo] = []*TransactionWithPath{txObj}
return true
}
addToTaskAtLeast3 := func(txObj *TransactionWithPath) bool {
txSend := 0
retransmit := true // retransmit = true, if the tx don't send because of all peers were busy
tx := txObj.Tx
s.cache.ttlCheck(tx)
skipedPeers := make(map[string]*peerInfo, len(s.peers))
for name, peerInfo := range s.peers {
if txSend > 3 {
break
}
if s.cache.txHadPath(tx, name) {
retransmit = false
skipedPeers[name] = peerInfo
continue
}
if _, ok := sendTask[peerInfo]; ok {
s.cache.addTx(tx, nil, name)
sendTask[peerInfo] = append(sendTask[peerInfo], txObj)
if addToTask(name, peerInfo, txObj) {
txSend++
continue
}
if !peerInfo.trySetBusy() {
continue
}
s.cache.addTx(tx, nil, name)
sendTask[peerInfo] = []*TransactionWithPath{txObj}
txSend++
}
if txSend > 0 {
s.cache.copyTxBloom(tx, txObj.Bloom)
return false
for name, peerInfo := range skipedPeers {
if txSend >= 3 {
break
}
if addToTask(name, peerInfo, txObj) {
txSend++
}
}
return retransmit
s.cache.copyTxBloom(tx, txObj.Bloom)
return txSend == 0
}

oldTxs := s.delayedTxs[:]
Expand All @@ -255,7 +265,7 @@ func (s *TxpoolStation) broadcast(txs []*types.Transaction) {

for _, tx := range oldTxs {
txObj := &TransactionWithPath{Tx: tx, Bloom: &types.Bloom{}}
retransmit := addToTask(txObj)
retransmit := addToTaskAtLeast3(txObj)
if retransmit {
s.delayedTxs = append(s.delayedTxs, tx)
}
Expand Down

0 comments on commit 4cc7618

Please sign in to comment.