Commit

Merge branch 'dev' into dposapi2
erickyan86 committed Aug 28, 2019
2 parents 7876752 + 0ac0682 commit e2cee0a
Showing 15 changed files with 275 additions and 62 deletions.
143 changes: 92 additions & 51 deletions blockchain/downloader.go
@@ -50,6 +50,7 @@ const (
ioClose
notFind
sizeNotEqual
insertError
)
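
A note for readers following the error handling below: the downloader wraps a plain Go error together with one of these iota ids, and callers branch on the id to decide whether a failure is the remote peer's fault. A minimal, self-contained sketch of that pattern, where the Error type and the int id are stand-ins rather than the repository's exact definitions:

package main

import (
    "errors"
    "fmt"
)

// Stand-in for the downloader's error ids; insertError is the id added by this commit.
const (
    ioClose = iota
    notFind
    sizeNotEqual
    insertError
)

// Error pairs an underlying error with an id, mirroring &Error{err, insertError} in the diff.
type Error struct {
    err error
    eid int
}

func main() {
    e := &Error{errors.New("bad block body"), insertError}
    // A caller can blame the remote peer only for certain ids.
    if e.eid == insertError || e.eid == sizeNotEqual {
        fmt.Println("remote sent unusable data:", e.err)
    }
}
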

// Error represent error by downloader
@@ -389,6 +390,30 @@ func (dl *Downloader) findAncestor(from router.Station, to router.Station, headN
return 0, &Error{fmt.Errorf("can not find ancestor after irreversibleNumber:%d", irreversibleNumber), notFind}
}

func (dl *Downloader) shortcutDownload(status *stationStatus, startNumber uint64, startHash common.Hash, endNumber uint64, endHash common.Hash) (uint64, *Error) {
resultCh := make(chan *downloadTask)
go (&downloadTask{
worker: status,
startNumber: startNumber,
startHash: startHash,
endNumber: endNumber,
endHash: endHash,
result: resultCh,
}).Do()

task := <-resultCh

if len(task.blocks) == 0 {
return startNumber, task.err
}

if index, err := dl.blockchain.InsertChain(task.blocks); err != nil {
return task.blocks[index].NumberU64() - 1, &Error{err, insertError}
}
return endNumber, nil
}
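
shortcutDownload follows a simple fire-and-wait pattern: it launches a single downloadTask in a goroutine, blocks on a result channel, and then returns either the end height (success) or the height it got stuck at together with a typed error. A stripped-down illustration of the same pattern, using hypothetical names (fetchRange and rangeResult are not from this repository):

package main

import "fmt"

// rangeResult carries back whatever a worker managed to fetch, plus an error.
type rangeResult struct {
    blocks []int // placeholder for fetched blocks
    err    error
}

// fetchRange pretends to download blocks (start, end] and reports on ch.
func fetchRange(start, end uint64, ch chan<- *rangeResult) {
    res := &rangeResult{}
    for n := start + 1; n <= end; n++ {
        res.blocks = append(res.blocks, int(n))
    }
    ch <- res
}

func main() {
    ch := make(chan *rangeResult)
    go fetchRange(100, 105, ch) // fire the task...
    res := <-ch                 // ...and wait for its single result
    fmt.Println("fetched", len(res.blocks), "blocks, err:", res.err)
}
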

// return true means need call again
func (dl *Downloader) multiplexDownload(status *stationStatus) bool {
log.Debug("multiplexDownload start")
defer log.Debug("multiplexDownload end")
@@ -406,53 +431,71 @@ func (dl *Downloader) multiplexDownload(status *stationStatus) bool {

log.Debug("downloader station:", "node", adaptor.GetFnode(status.station))
log.Debug("downloader statusTD x ", "Local", dl.blockchain.GetTd(head.Hash(), head.NumberU64()), "Number", head.NumberU64(), "R", statusTD, "Number", statusNumber)
rand.Seed(time.Now().UnixNano())
stationSearch := router.NewLocalStation(fmt.Sprintf("downloaderSearch%d", rand.Int()), nil)
router.StationRegister(stationSearch)
defer router.StationUnregister(stationSearch)

headNumber := head.NumberU64()
if headNumber < statusNumber && statusNumber < headNumber+6 {
_, err := dl.shortcutDownload(status, headNumber, head.Hash(), statusNumber, statusHash)
if err == nil { // download and insert completed
return false
}
if err.eid == insertError || err.eid == sizeNotEqual { // download failed because of the remote's error
log.Warn("Disconnect because some error:", "node:", adaptor.GetFnode(status.station), "err", err)
router.SendTo(nil, nil, router.OneMinuteLimited, status.station) // disconnect and put into blacklist
return true
}
// download failed, continue download from other peers.
}
if headNumber > statusNumber {
headNumber = statusNumber
}

rand.Seed(time.Now().UnixNano())
stationSearch := router.NewLocalStation(fmt.Sprintf("downloaderSearch%d", rand.Int()), nil)
router.StationRegister(stationSearch)
defer router.StationUnregister(stationSearch)

ancestor, err := dl.findAncestor(stationSearch, status.station, headNumber, status.ancestor, status.errCh)
if err != nil {
log.Warn("ancestor err", "err", err, "errid:", err.eid)
if err.eid == notFind {
log.Warn("Disconnect because ancestor not find:", "station:", fmt.Sprintf("%x", status.station.Name()))
log.Warn("Disconnect because ancestor not find:", "node:", adaptor.GetFnode(status.station))
router.SendTo(nil, nil, router.OneMinuteLimited, status.station) // disconnect and put into blacklist
}
return false
}
log.Debug("downloader ancestro:", "ancestor", ancestor)
downloadStart := ancestor + 1
downloadStart := ancestor
downloadAmount := statusNumber - ancestor
if downloadAmount == 0 { // maybe the status of remote has changed
log.Debug(fmt.Sprintf("Why-1?:number: head:%d headNumber:%d statusNumber: %d", head.NumberU64(), headNumber, statusNumber))
log.Debug(fmt.Sprintf("Why-2?:hash: head %x status %x", head.Hash(), statusHash))
log.Debug(fmt.Sprintf("Why-3?:td: head:%d status: %d", dl.blockchain.GetTd(head.Hash(), head.NumberU64()).Uint64(), statusTD.Uint64()))
if downloadAmount == 0 { // maybe the status of remote was changed
return false
}
if downloadAmount > 1024 {
downloadAmount = 1024
}
downloadEnd := ancestor + downloadAmount
downloadEnd := downloadStart + downloadAmount
downloadBulk := uint64(64)
var numbers []uint64
var hashes []common.Hash
downloadSkip := downloadBulk
numbers := make([]uint64, 0, (downloadAmount+downloadBulk-1)/downloadBulk+1)
hashes := make([]common.Hash, 0, (downloadAmount+downloadBulk-1)/downloadBulk+1)
downloadSkip := downloadBulk - 1 // f(n+1) = f(n) + 1 + skip

for i := downloadStart; i <= downloadEnd; i += downloadSkip + 1 {
numbers = append(numbers, i)
}
hashes, err = getBlockHashes(stationSearch, status.station, &getBlcokHashByNumber{
Number: downloadStart,
Amount: uint64(len(numbers)),
Skip: downloadSkip,
Reverse: false}, status.errCh)
if err != nil || len(hashes) != len(numbers) {
log.Debug("getBlockHashes 1 err", "err", err, "len(hashes)", len(hashes), "len(numbers)", len(numbers))
return false
hashes = append(hashes, dl.blockchain.GetHeaderByNumber(numbers[0]).Hash())

if len(numbers[1:]) > 0 {
hash, err := getBlockHashes(stationSearch, status.station, &getBlcokHashByNumber{
Number: numbers[1],
Amount: uint64(len(numbers[1:])),
Skip: downloadSkip,
Reverse: false}, status.errCh)
if err != nil || len(hash) != len(numbers[1:]) {
log.Debug("getBlockHashes 1 err", "err", err, "len(hash)", len(hash), "len(numbers)", len(numbers[1:]))
return false
}
hashes = append(hashes, hash...)
}

if numbers[len(numbers)-1] != downloadEnd {
numbers = append(numbers, downloadEnd)
hash, err := getBlockHashes(stationSearch, status.station, &getBlcokHashByNumber{
@@ -466,26 +509,15 @@ func (dl *Downloader) multiplexDownload(status *stationStatus) bool {
}
hashes = append(hashes, hash...)
}
if len(numbers) == 1 {
numbers = append(numbers, numbers[0])
hashes = append(hashes, hashes[0])
}
// info1 := fmt.Sprintf("1 head:%d headNumber:%d statusNumber:%d ancestor:%d\n", head.NumberU64(), headNumber, statusNumber, ancestor)
// log.Debug(info1)
// info2 := fmt.Sprintf("2 head diff:%d status diff:%d\n", dl.blockchain.GetTd(head.Hash(), head.NumberU64()).Uint64(), statusTD.Uint64())
// log.Debug(info2)
// info3 := fmt.Sprintf("3 download start:%d end:%d amount:%d bluk:%d\n", downloadStart, downloadEnd, downloadAmount, downloadBulk)
// log.Debug(info3)
// info4 := fmt.Sprintf("4 numbers:%d hashes:%d\n", len(numbers), len(hashes))
// log.Debug(info4)

n, err := dl.assignDownloadTask(hashes, numbers)
status.ancestor = n
if err != nil {
log.Warn("Insert error:", "number:", n, "error", err)
failedNum := numbers[len(numbers)-1] - n
router.AddErr(status.station, failedNum)
if failedNum > 32 {
log.Warn("Disconnect because Insert error:", "station:", fmt.Sprintf("%x", status.station.Name()), "failedNum", failedNum)
log.Warn("Disconnect because Insert error:", "node:", adaptor.GetFnode(status.station), "failedNum", failedNum)
router.SendTo(nil, nil, router.OneMinuteLimited, status.station) // disconnect and put into blacklist
}
}
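
The checkpoint arithmetic in this hunk is easy to lose track of: with downloadBulk = 64 the node asks the peer for every 64th block hash starting at the common ancestor (skip = 63, so consecutive requested numbers differ by skip + 1 = 64), and downloadEnd is appended as a final checkpoint when it does not fall on a 64-block boundary. A standalone calculation, reusing the hunk's variable names with hypothetical inputs (ancestor 1000, remote head 1150), shows what ends up in the checkpoint list:

package main

import "fmt"

func main() {
    const downloadBulk = uint64(64)

    ancestor := uint64(1000)     // common block already held locally (hypothetical)
    statusNumber := uint64(1150) // remote peer's head (hypothetical)

    downloadStart := ancestor
    downloadAmount := statusNumber - ancestor
    if downloadAmount > 1024 {
        downloadAmount = 1024
    }
    downloadEnd := downloadStart + downloadAmount
    downloadSkip := downloadBulk - 1 // f(n+1) = f(n) + 1 + skip

    numbers := make([]uint64, 0, (downloadAmount+downloadBulk-1)/downloadBulk+1)
    for i := downloadStart; i <= downloadEnd; i += downloadSkip + 1 {
        numbers = append(numbers, i)
    }
    if numbers[len(numbers)-1] != downloadEnd {
        numbers = append(numbers, downloadEnd)
    }
    // Prints [1000 1064 1128 1150]: each 64-block segment boundary, plus the tail.
    fmt.Println(numbers)
}
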
@@ -533,6 +565,7 @@ func (dl *Downloader) loop() {
}
}

// Return the height of the last successfully inserted block and error
func (dl *Downloader) assignDownloadTask(hashes []common.Hash, numbers []uint64) (uint64, *Error) {
log.Debug("assingDownloadTask:", "hashesLen", len(hashes), "numbersLen", len(numbers), "numbers", numbers)
workers := &simpleHeap{cmp: dl.remotes.cmp}
@@ -601,7 +634,7 @@ func (dl *Downloader) assignDownloadTask(hashes []common.Hash, numbers []uint64)
for _, start := range numbers[:len(numbers)-1] {
blocks := insertList[start]
if blocks == nil {
return start - 1, nil
return start, nil
}
if index, err := dl.blockchain.InsertChain(blocks); err != nil {
return blocks[index].NumberU64() - 1, &Error{err, other}
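
For readers puzzling over the changed return value (start instead of start - 1): the segment keyed by start appears to cover the blocks after start, so when that segment is missing the last block known to be in the chain is start itself. A toy restatement of the bookkeeping under that assumption, with invented names (insertSegments is not the repository's function):

package main

import (
    "errors"
    "fmt"
)

// insertSegments walks ordered segments of blocks, stops at the first gap or failure,
// and reports the height of the last block that made it into the chain.
func insertSegments(segments map[uint64][]uint64, checkpoints []uint64, failAt uint64) (uint64, error) {
    for _, start := range checkpoints[:len(checkpoints)-1] {
        blocks := segments[start]
        if blocks == nil {
            return start, nil // nothing downloaded past this checkpoint yet
        }
        for _, b := range blocks {
            if b == failAt {
                return b - 1, errors.New("insert failed") // resume from the failing block next round
            }
        }
    }
    return checkpoints[len(checkpoints)-1], nil
}

func main() {
    checkpoints := []uint64{1000, 1064, 1128, 1150}
    segments := map[uint64][]uint64{
        1000: {1001, 1002 /* ... */, 1064},
        1064: {1065 /* ... */, 1128},
        // the segment starting at 1128 was never downloaded
    }
    n, err := insertSegments(segments, checkpoints, 0)
    fmt.Println(n, err) // 1128 <nil>: the next round searches for a new ancestor from 1128
}
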
@@ -619,11 +652,17 @@ type downloadTask struct {
blocks []*types.Block // result blocks, length == 0 means failed
errorTotal int // total error amount
result chan *downloadTask // result channel
err *Error
}

func (task *downloadTask) Do() {
var err *Error
var headers []*types.Header
var bodies []*types.Body

latestStatus := task.worker.getStatus()
defer func() {
task.err = err
task.result <- task
diff := latestStatus.Number - task.endNumber
if latestStatus.Number < task.endNumber {
@@ -648,26 +687,28 @@
reqHash.Skip = 0
reqHash.Amount = 1
}
hashes, err := getBlockHashes(station, remote, reqHash, task.worker.errCh)
if err != nil || len(hashes) != int(reqHash.Amount) ||
hashes[0] != task.startHash || hashes[len(hashes)-1] != task.endHash {
log.Debug(fmt.Sprint("err-1:", err, task.startNumber, task.endNumber, len(hashes)))
if len(hashes) > 0 {
log.Debug(fmt.Sprintf("0:%x\n0e:%x\ns:%x\nse:%x", hashes[0], hashes[len(hashes)-1], task.startHash, task.endHash))
/*
hashes, err := getBlockHashes(station, remote, reqHash, task.worker.errCh)
if err != nil || len(hashes) != int(reqHash.Amount) ||
hashes[0] != task.startHash || hashes[len(hashes)-1] != task.endHash {
log.Debug(fmt.Sprint("err-1:", err, task.startNumber, task.endNumber, len(hashes)))
if len(hashes) > 0 {
log.Debug(fmt.Sprintf("0:%x\n0e:%x\ns:%x\nse:%x", hashes[0], hashes[len(hashes)-1], task.startHash, task.endHash))
}
return
}
return
}
downloadAmount := task.endNumber - task.startNumber + 1
headers, err := getHeaders(station, remote, &getBlockHeadersData{
*/
downloadAmount := task.endNumber - task.startNumber
headers, err = getHeaders(station, remote, &getBlockHeadersData{
hashOrNumber{
Number: task.startNumber,
Number: task.startNumber + 1,
}, downloadAmount, 0, false,
}, task.worker.errCh)
if err != nil || len(headers) != int(downloadAmount) {
log.Debug(fmt.Sprint("err-2:", err, len(headers), downloadAmount))
return
}
if headers[0].Number.Uint64() != task.startNumber || headers[0].Hash() != task.startHash ||
if headers[0].Number.Uint64() != task.startNumber+1 || headers[0].ParentHash != task.startHash ||
headers[len(headers)-1].Number.Uint64() != task.endNumber || headers[len(headers)-1].Hash() != task.endHash {
log.Debug(fmt.Sprintf("e2-1 0d:%d\n0ed:%d\nsd:%d\nsed:%d", headers[0].Number.Uint64(), headers[len(headers)-1].Number.Uint64(), task.startNumber, task.endNumber))
log.Debug(fmt.Sprintf("e2-2 0:%x\n0e:%x\ns:%x\nse:%x", headers[0].Hash(), headers[len(headers)-1].Hash(), task.startHash, task.endHash))
@@ -682,12 +723,12 @@

reqHashes := make([]common.Hash, 0, len(headers))
for _, header := range headers {
if header.Hash() != emptyHash {
if header.TxsRoot != emptyHash {
reqHashes = append(reqHashes, header.Hash())
}
}

bodies, err := getBlocks(station, remote, reqHashes, task.worker.errCh)
bodies, err = getBlocks(station, remote, reqHashes, task.worker.errCh)
if err != nil || len(bodies) != len(reqHashes) {
log.Debug(fmt.Sprint("err-4:", err, len(bodies), len(reqHashes)))
return
Expand All @@ -696,7 +737,7 @@ func (task *downloadTask) Do() {
blocks := make([]*types.Block, len(headers))
bodyIndex := 0
for i, header := range headers {
if header.Hash() == emptyHash {
if header.TxsRoot == emptyHash {
blocks[i] = types.NewBlockWithHeader(header)
} else {
blocks[i] = types.NewBlockWithHeader(header).WithBody(bodies[bodyIndex].Transactions)
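One subtlety in the reworked Do(): because the header request now begins at startNumber + 1, the first returned header can no longer be compared by its own hash against startHash; its ParentHash must match instead. A minimal sketch of that kind of linkage check, using a toy header type instead of types.Header, and also walking the intermediate links (the committed code only checks the two endpoints):

package main

import "fmt"

// toyHeader is a stand-in with just enough fields to show the check.
type toyHeader struct {
    Number     uint64
    Hash       string
    ParentHash string
}

// linksTo verifies that headers form a chain whose first element builds on startHash
// and whose last element is exactly (endNumber, endHash).
func linksTo(headers []*toyHeader, startNumber uint64, startHash string, endNumber uint64, endHash string) bool {
    if len(headers) == 0 {
        return false
    }
    if headers[0].Number != startNumber+1 || headers[0].ParentHash != startHash {
        return false
    }
    last := headers[len(headers)-1]
    if last.Number != endNumber || last.Hash != endHash {
        return false
    }
    for i := 1; i < len(headers); i++ { // every header must extend the one before it
        if headers[i].ParentHash != headers[i-1].Hash {
            return false
        }
    }
    return true
}

func main() {
    hs := []*toyHeader{
        {Number: 101, Hash: "h101", ParentHash: "h100"},
        {Number: 102, Hash: "h102", ParentHash: "h101"},
    }
    fmt.Println(linksTo(hs, 100, "h100", 102, "h102")) // true
}
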
2 changes: 2 additions & 0 deletions blockchain/handler.go
@@ -24,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/fractalplatform/fractal/common"
router "github.com/fractalplatform/fractal/event"
adaptor "github.com/fractalplatform/fractal/p2p/protoadaptor"
"github.com/fractalplatform/fractal/types"
)

@@ -140,6 +141,7 @@ func (bs *BlockchainStation) loop() {
}()
default:
if router.Thread(e.From) > 3 {
log.Warn("Disconnect because request too frequently:", "node:", adaptor.GetFnode(e.From), "thread", router.Thread(e.From))
router.SendTo(nil, nil, router.OneMinuteLimited, e.From)
continue
}
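The one-line addition to handler.go turns a silent throttle into an explicit disconnect: a peer with more than three requests in flight is now logged and put on the one-minute blacklist. A rough, generic sketch of that throttle-then-ban idea, with invented names (requestTracker and its methods are not this codebase's API):

package main

import (
    "fmt"
    "sync"
    "time"
)

// requestTracker counts in-flight requests per peer (hypothetical helper).
type requestTracker struct {
    mu       sync.Mutex
    inFlight map[string]int
}

func newRequestTracker() *requestTracker {
    return &requestTracker{inFlight: make(map[string]int)}
}

// begin registers a request and reports whether the peer exceeded the limit.
func (t *requestTracker) begin(peer string, limit int) bool {
    t.mu.Lock()
    defer t.mu.Unlock()
    t.inFlight[peer]++
    return t.inFlight[peer] > limit
}

// end would be deferred by a real handler once a request finishes.
func (t *requestTracker) end(peer string) {
    t.mu.Lock()
    defer t.mu.Unlock()
    t.inFlight[peer]--
}

func main() {
    tracker := newRequestTracker()
    const limit = 3
    for i := 0; i < 5; i++ {
        if tracker.begin("peer-A", limit) {
            // Mirrors the new log.Warn plus router.SendTo(..., OneMinuteLimited, ...) pair:
            // complain loudly, then ban the peer for a minute instead of serving it.
            fmt.Println("peer-A too chatty, banned until", time.Now().Add(time.Minute).Format(time.Kitchen))
            break
        }
        fmt.Println("serving request", i, "for peer-A")
    }
}
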
53 changes: 53 additions & 0 deletions blockchain/handler_test.go
@@ -0,0 +1,53 @@
package blockchain

import (
"testing"
)

/*
type simuAdaptor struct{}
var client = router.NewRemoteStation("testClient", nil)
var server = router.NewRemoteStation("testServer", nil)
func (simuAdaptor) SendOut(e *router.Event) error {
if e.To == server {
e.To = nil
} else {
e.To = router.NewLocalStation(e.To.Name(), nil)
}
//e.From = router.NewLocalStation(e.From.Name(), nil)
router.SendEvent(e)
return nil
}
*/
func TestHandler(t *testing.T) {
// printLog(log.LvlDebug)
genesis := DefaultGenesis()
genesis.AllocAccounts = append(genesis.AllocAccounts, getDefaultGenesisAccounts()...)
chain := newCanonical(t, genesis)
defer chain.Stop()

allCandidates, allHeaderTimes := genCanonicalCandidatesAndTimes(genesis)
makeNewChain(t, genesis, chain, allCandidates, allHeaderTimes)

//router.AdaptorRegister(simuAdaptor{})
errCh := make(chan struct{})
hash, err := getBlockHashes(nil, nil, &getBlcokHashByNumber{0, 1, 0, true}, errCh)
if err != nil || len(hash) != 1 || hash[0] != chain.GetHeaderByNumber(0).Hash() {
t.Fatal("genesis block not match")
}
if err != nil || len(hash) != 1 || hash[0] != chain.GetHeaderByNumber(0).Hash() {
t.Fatal("genesis block hash not match")
}

headers, err := getHeaders(nil, nil, &getBlockHeadersData{
hashOrNumber{
Number: 0,
}, 1, 0, false,
}, errCh)
if err != nil || len(headers) != 1 || headers[0].Number.Uint64() != 0 || headers[0].Hash() != chain.GetHeaderByNumber(headers[0].Number.Uint64()).Hash() {
t.Fatal("genesis block header not match")
}

}
1 change: 1 addition & 0 deletions cmd/ft/config.go
@@ -66,6 +66,7 @@ func defaultNodeConfig() *node.Config {
WSPort: 8546,
WSModules: []string{"ft"},
Logger: log.New(),
P2PNodeDatabase: "nodedb",
P2PConfig: defaultP2pConfig(),
}
}
4 changes: 2 additions & 2 deletions cmd/ft/flags.go
@@ -555,9 +555,9 @@ func addFlags(flags *flag.FlagSet) {
viper.BindPFlag("ftservice.p2p.listenaddr", flags.Lookup("p2p_listenaddr"))

flags.StringVar(
&ftCfgInstance.NodeCfg.P2PConfig.NodeDatabase,
&ftCfgInstance.NodeCfg.P2PNodeDatabase,
"p2p_nodedb",
ftCfgInstance.NodeCfg.P2PConfig.NodeDatabase,
ftCfgInstance.NodeCfg.P2PNodeDatabase,
"The path to the database containing the previously seen live nodes in the network",
)
viper.BindPFlag("ftservice.p2p.nodedb", flags.Lookup("p2p_nodedb"))
5 changes: 3 additions & 2 deletions cmd/ftfinder/root.go
@@ -48,6 +48,7 @@ var RootCmd = &cobra.Command{
nodeConfig.P2PConfig.BootstrapNodes = nodeConfig.BootNodes()
nodeConfig.P2PConfig.GenesisHash = common.HexToHash(hexStr)
nodeConfig.P2PConfig.Logger = log.New()
nodeConfig.P2PConfig.NodeDatabase = nodeConfig.NodeDB()
srv := p2p.Server{
Config: nodeConfig.P2PConfig,
}
@@ -83,9 +84,9 @@ func init() {
)

flags.StringVar(
&nodeConfig.P2PConfig.NodeDatabase,
&nodeConfig.P2PNodeDatabase,
"p2p_nodedb",
nodeConfig.P2PConfig.NodeDatabase,
nodeConfig.P2PNodeDatabase,
"The path to the database containing the previously seen live nodes in the network",
)

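Across cmd/ft/config.go, cmd/ft/flags.go and cmd/ftfinder/root.go the same refactor shows up: the p2p_nodedb flag now binds to a top-level P2PNodeDatabase field (defaulting to "nodedb"), and the resolved path is copied into P2PConfig.NodeDatabase before the p2p server starts. A compressed sketch of that wiring with simplified stand-in types; the path resolution inside nodeDB is an assumption, since the real NodeDB() is not shown in this diff:

package main

import (
    "fmt"
    "path/filepath"
)

// Simplified stand-ins for node.Config and p2p.Config.
type p2pConfig struct {
    NodeDatabase string // final path handed to the p2p server
}

type nodeConfig struct {
    DataDir         string
    P2PNodeDatabase string // what the p2p_nodedb flag now binds to
    P2PConfig       *p2pConfig
}

// nodeDB mimics the idea of nodeConfig.NodeDB(): resolve the flag value against
// the data directory unless it is already absolute (assumed behavior, may differ).
func (c *nodeConfig) nodeDB() string {
    if filepath.IsAbs(c.P2PNodeDatabase) {
        return c.P2PNodeDatabase
    }
    return filepath.Join(c.DataDir, c.P2PNodeDatabase)
}

func main() {
    cfg := &nodeConfig{
        DataDir:         "/var/lib/ft",
        P2PNodeDatabase: "nodedb", // new default from cmd/ft/config.go
        P2PConfig:       &p2pConfig{},
    }
    // Done once at startup, as in ftfinder/root.go: push the resolved path down.
    cfg.P2PConfig.NodeDatabase = cfg.nodeDB()
    fmt.Println(cfg.P2PConfig.NodeDatabase) // /var/lib/ft/nodedb
}
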
