forked from aergoio/aergo
/
hashbynoreceiver.go
71 lines (61 loc) · 2.29 KB
/
hashbynoreceiver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/*
* @file
* @copyright defined in aergo/LICENSE.txt
*/
package p2p
import (
"time"
"github.com/aergoio/aergo/message"
"github.com/aergoio/aergo/p2p/p2pcommon"
"github.com/aergoio/aergo/p2p/subproto"
"github.com/aergoio/aergo/types"
"github.com/golang/protobuf/proto"
)
// BlocksChunkReceiver is send p2p getBlocksRequest to target peer and receive p2p responses till all requestes blocks are received
// It will send response actor message if all blocks are received or failed to receive, but not send response if timeout expired.
type BlockHashByNoReceiver struct {
syncerSeq uint64
requestID p2pcommon.MsgID
peer p2pcommon.RemotePeer
actor p2pcommon.ActorService
blockNo types.BlockNo
timeout time.Time
finished bool
got message.BlockHash
}
func NewBlockHashByNoReceiver(actor p2pcommon.ActorService, peer p2pcommon.RemotePeer, seq uint64, blockNo types.BlockNo, ttl time.Duration) *BlockHashByNoReceiver {
timeout := time.Now().Add(ttl)
return &BlockHashByNoReceiver{syncerSeq: seq, actor: actor, peer: peer, blockNo: blockNo, timeout: timeout}
}
func (br *BlockHashByNoReceiver) StartGet() {
// create message data
req := &types.GetHashByNo{BlockNo: br.blockNo}
mo := br.peer.MF().NewMsgBlockRequestOrder(br.ReceiveResp, subproto.GetHashByNoRequest, req)
br.requestID = mo.GetMsgID()
br.peer.SendMessage(mo)
}
// ReceiveResp must be called just in read go routine
func (br *BlockHashByNoReceiver) ReceiveResp(msg p2pcommon.Message, msgBody proto.Message) (ret bool) {
ret = true
// timeout
if br.finished || br.timeout.Before(time.Now()) {
// silently ignore already finished job
//br.actor.TellRequest(message.SyncerSvc,&message.GetBlockChunksRsp{ToWhom:br.peer.ID(), Err:message.RemotePeerFailError})
br.finished = true
br.peer.ConsumeRequest(br.requestID)
return
}
// remote peer response failure
body := msgBody.(*types.GetHashByNoResponse)
if body.Status != types.ResultStatus_OK {
br.actor.TellRequest(message.SyncerSvc, &message.GetHashByNoRsp{Seq:br.syncerSeq, BlockHash: nil, Err: message.RemotePeerFailError})
br.finished = true
br.peer.ConsumeRequest(br.requestID)
return
}
br.got = body.BlockHash
br.actor.TellRequest(message.SyncerSvc, &message.GetHashByNoRsp{Seq:br.syncerSeq, BlockHash: br.got})
br.finished = true
br.peer.ConsumeRequest(br.requestID)
return
}