forked from aergoio/aergo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
hashreceiver.go
94 lines (83 loc) · 3.22 KB
/
hashreceiver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/*
* @file
* @copyright defined in aergo/LICENSE.txt
*/
package p2p
import (
"time"
"github.com/aergoio/aergo/message"
"github.com/aergoio/aergo/p2p/p2pcommon"
"github.com/aergoio/aergo/p2p/subproto"
"github.com/aergoio/aergo/types"
"github.com/golang/protobuf/proto"
)
// BlockHashesReceiver sends a p2p GetHashesRequest to the target peer and receives p2p responses until all requested hashes are received.
// It sends a response actor message when all hashes are received or receiving failed, but sends no response if the timeout expired.
type BlockHashesReceiver struct {
// syncerSeq is echoed back as Seq in every GetHashesRsp told to the syncer.
syncerSeq uint64
// requestID is the ID of the outgoing request message; it is set by StartGet
// and passed to peer.ConsumeRequest once the job ends.
requestID p2pcommon.MsgID
// peer is the remote peer the request is sent to.
peer p2pcommon.RemotePeer
// actor is used to tell the result to the syncer service.
actor p2pcommon.ActorService
// prevBlock supplies PrevHash/PrevNumber of the request and is echoed back as PrevInfo.
prevBlock *types.BlockInfo
// count is the number of hashes requested.
count int
// timeout is the deadline after which responses are silently dropped.
timeout time.Time
// finished is set once the job ended (success, failure or timeout);
// any later response is ignored.
finished bool
// got accumulates the hashes received so far.
got []message.BlockHash
// offset counts the hashes appended to got.
offset int
}
// NewBlockHashesReceiver creates a receiver for the hash request described by
// req (PrevInfo and Count), to be sent to peer on behalf of syncer sequence
// seq. The deadline is fixed at construction time as now + ttl, and the result
// buffer is pre-sized to hold req.Count hashes.
func NewBlockHashesReceiver(actor p2pcommon.ActorService, peer p2pcommon.RemotePeer, seq uint64, req *message.GetHashes, ttl time.Duration) *BlockHashesReceiver {
	deadline := time.Now().Add(ttl)
	want := int(req.Count)
	return &BlockHashesReceiver{
		syncerSeq: seq,
		actor:     actor,
		peer:      peer,
		prevBlock: req.PrevInfo,
		count:     want,
		timeout:   deadline,
		got:       make([]message.BlockHash, 0, want),
	}
}
// StartGet builds a GetHashesRequest from the stored previous block info and
// count, registers ReceiveResp as the response handler, and sends the request
// to the peer.
func (br *BlockHashesReceiver) StartGet() {
	// create message data
	req := &types.GetHashesRequest{PrevHash: br.prevBlock.Hash, PrevNumber: br.prevBlock.No, Size: uint64(br.count)}
	mo := br.peer.MF().NewMsgBlockRequestOrder(br.ReceiveResp, subproto.GetHashesRequest, req)
	// Record the request ID before sending: ReceiveResp is invoked from the
	// read goroutine and uses br.requestID to consume the request, so the ID
	// must be in place before any response can possibly arrive.
	br.requestID = mo.GetMsgID()
	br.peer.SendMessage(mo)
}
// ReceiveResp handles one GetHashesResponse for the request sent by StartGet.
// It must be called only from the read goroutine.
//
// Hashes are accumulated across responses. The syncer is told the result when
//   - count hashes have been collected (success),
//   - the peer reports a non-OK status or sends an empty, nil or unexpected
//     body (RemotePeerFailError), or
//   - the peer signals the end of the stream before count hashes arrived
//     (MissingHashError).
//
// Responses arriving after the job finished or after the deadline are dropped
// without notifying the syncer. In every terminal case the pending request is
// consumed from the peer. The method always returns true.
func (br *BlockHashesReceiver) ReceiveResp(msg p2pcommon.Message, msgBody proto.Message) (ret bool) {
	// Already finished or timed out: silently ignore the response.
	if br.finished || br.timeout.Before(time.Now()) {
		br.markFinished()
		return true
	}

	// Remote peer response failure. The assertion is checked so that an
	// unexpected or nil body is reported as a peer failure instead of panicking.
	body, ok := msgBody.(*types.GetHashesResponse)
	if !ok || body == nil || body.Status != types.ResultStatus_OK || len(body.Hashes) == 0 {
		br.actor.TellRequest(message.SyncerSvc, &message.GetHashesRsp{Seq: br.syncerSeq, Hashes: nil, PrevInfo: br.prevBlock, Count: 0, Err: message.RemotePeerFailError})
		br.markFinished()
		return true
	}

	// Collect hashes one by one so that anything beyond the requested count
	// is dropped and the job completes as soon as enough hashes arrived.
	for _, hash := range body.Hashes {
		br.got = append(br.got, hash)
		br.offset++
		if br.offset >= br.count {
			br.actor.TellRequest(message.SyncerSvc, &message.GetHashesRsp{Seq: br.syncerSeq, Hashes: br.got, PrevInfo: br.prevBlock, Count: uint64(br.offset)})
			br.markFinished()
			return true
		}
	}

	// Reaching this point means offset < count (the loop returns otherwise),
	// so a response without a successor leaves the request only partly
	// filled: this is an error.
	if !body.HasNext {
		br.actor.TellRequest(message.SyncerSvc, &message.GetHashesRsp{Seq: br.syncerSeq, Hashes: br.got, PrevInfo: br.prevBlock, Count: 0, Err: message.MissingHashError})
		br.markFinished()
	}
	return true
}

// markFinished flags the job as done and consumes the pending request from the peer.
func (br *BlockHashesReceiver) markFinished() {
	br.finished = true
	br.peer.ConsumeRequest(br.requestID)
}