forked from aergoio/aergo
/
getblock.go
154 lines (133 loc) · 5.76 KB
/
getblock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
/*
* @file
* @copyright defined in aergo/LICENSE.txt
*/
package subproto
import (
"fmt"
"time"
"github.com/aergoio/aergo-lib/log"
"github.com/aergoio/aergo/internal/enc"
"github.com/aergoio/aergo/p2p/p2pcommon"
"github.com/aergoio/aergo/p2p/p2putil"
"github.com/aergoio/aergo/types"
"github.com/golang/protobuf/proto"
)
// blockRequestHandler is the message handler built by NewBlockReqHandler for
// the GetBlocksRequest protocol. It embeds BaseMsgHandler, which holds the
// protocol, peer manager, remote peer, actor service and logger it works with.
type blockRequestHandler struct {
BaseMsgHandler
}
// Compile-time assertion that *blockRequestHandler satisfies p2pcommon.MessageHandler.
var _ p2pcommon.MessageHandler = (*blockRequestHandler)(nil)
// blockResponseHandler is the message handler built by NewBlockRespHandler for
// the GetBlocksResponse protocol. It embeds BaseMsgHandler, which holds the
// protocol, peer manager, sync manager, remote peer, actor service and logger
// it works with.
type blockResponseHandler struct {
BaseMsgHandler
}
// Compile-time assertion that *blockResponseHandler satisfies p2pcommon.MessageHandler.
var _ p2pcommon.MessageHandler = (*blockResponseHandler)(nil)
// NewBlockReqHandler creates the handler for GetBlocksRequest messages.
//
// The returned handler is bound to the given peer manager, remote peer, logger
// and actor service; its protocol is fixed to GetBlocksRequest.
func NewBlockReqHandler(pm p2pcommon.PeerManager, peer p2pcommon.RemotePeer, logger *log.Logger, actor p2pcommon.ActorService) *blockRequestHandler {
	return &blockRequestHandler{
		BaseMsgHandler: BaseMsgHandler{
			protocol: GetBlocksRequest,
			pm:       pm,
			peer:     peer,
			actor:    actor,
			logger:   logger,
		},
	}
}
// ParsePayload decodes rawbytes into a fresh types.GetBlockRequest via
// p2putil.UnmarshalAndReturn and returns that helper's result unchanged.
func (bh *blockRequestHandler) ParsePayload(rawbytes []byte) (proto.Message, error) {
	request := new(types.GetBlockRequest)
	return p2putil.UnmarshalAndReturn(rawbytes, request)
}
const (
// EmptyGetBlockResponseSize is the payload size the block request handler
// starts from for every GetBlockResponse, before the size of any block is
// added to it.
EmptyGetBlockResponseSize = 12 // roughly estimated maximum size if element is full
)
// Handle serves a GetBlockRequest received from the remote peer.
//
// Every requested hash is looked up through the chain accessor, in request
// order. Found blocks are accumulated and sent back as GetBlockResponse
// messages: whenever adding the next block would exceed either the per-message
// block count (at most p2pcommon.MaxBlockResponseCount) or
// p2pcommon.MaxPayloadLength, the blocks collected so far are sent as a
// partial response with HasNext set to true. A final response with HasNext set
// to false always closes the exchange, unless sending a partial response
// failed, in which case the handler gives up.
//
// The lookup stops at the first hash that fails:
//   - a lookup error ends the exchange with ResultStatus_INTERNAL,
//   - a missing block ends it with ResultStatus_NOT_FOUND.
//
// In both cases the final response carries no blocks. A request that yields no
// block at all without any lookup failure (i.e. an empty hash list) is
// answered with ResultStatus_NOT_FOUND.
//
// msgBody must be the *types.GetBlockRequest produced by ParsePayload.
func (bh *blockRequestHandler) Handle(msg p2pcommon.Message, msgBody proto.Message) {
	remotePeer := bh.peer
	data := msgBody.(*types.GetBlockRequest)
	p2putil.DebugLogReceiveMsg(bh.logger, bh.protocol, msg.ID().String(), remotePeer, len(data.Hashes))

	requestID := msg.ID()
	// never allocate more slots than the request can possibly fill
	sliceCap := p2pcommon.MaxBlockResponseCount
	if len(data.Hashes) < sliceCap {
		sliceCap = len(data.Hashes)
	}

	defaultMsgTimeout := time.Second * 30
	// find block info from chainservice
	foundCount := 0
	status := types.ResultStatus_OK
	blockInfos := make([]*types.Block, 0, sliceCap)
	payloadSize := EmptyGetBlockResponseSize
	for _, hash := range data.Hashes {
		foundBlock, err := bh.actor.GetChainAccessor().GetBlock(hash)
		if err != nil {
			// the block hash from request must exists. this error is fatal.
			bh.logger.Warn().Err(err).Str(p2putil.LogBlkHash, enc.ToString(hash)).Str(p2putil.LogOrgReqID, requestID.String()).Msg("failed to get block while processing getBlock")
			status = types.ResultStatus_INTERNAL
			break
		}
		if foundBlock == nil {
			// the remote peer request not existing block
			bh.logger.Debug().Str(p2putil.LogBlkHash, enc.ToString(hash)).Str(p2putil.LogOrgReqID, requestID.String()).Msg("requested block hash is missing")
			status = types.ResultStatus_NOT_FOUND
			break
		}

		blockSize := proto.Size(foundBlock)
		fieldSize := blockSize + p2putil.CalculateFieldDescSize(blockSize)
		chunkFull := len(blockInfos) >= sliceCap || (payloadSize+fieldSize) > p2pcommon.MaxPayloadLength
		// Flush only when there is something to flush. A single block that is
		// larger than the payload limit on its own would otherwise reach this
		// branch with an empty chunk and panic on blockInfos[0].
		if chunkFull && len(blockInfos) > 0 {
			// send partial list; HasNext is always true here because foundBlock
			// is still waiting to be sent. No limit on the number of split
			// messages is enforced.
			resp := &types.GetBlockResponse{
				Status:  status,
				Blocks:  blockInfos,
				HasNext: true,
			}
			bh.logger.Debug().Uint64("first_blk_number", blockInfos[0].Header.GetBlockNo()).Int(p2putil.LogBlkCount, len(blockInfos)).Str(p2putil.LogOrgReqID, requestID.String()).Msg("Sending partial getBlock response")
			sendErr := remotePeer.SendAndWaitMessage(remotePeer.MF().NewMsgResponseOrder(requestID, GetBlocksResponse, resp), defaultMsgTimeout)
			if sendErr != nil {
				bh.logger.Info().Uint64("first_blk_number", blockInfos[0].Header.GetBlockNo()).Err(sendErr).Int(p2putil.LogBlkCount, len(blockInfos)).Str(p2putil.LogOrgReqID, requestID.String()).Msg("Sending failed")
				return
			}
			// start a new chunk; the previous slice is owned by the sent message
			blockInfos = make([]*types.Block, 0, sliceCap)
			payloadSize = EmptyGetBlockResponseSize
		}
		blockInfos = append(blockInfos, foundBlock)
		payloadSize += fieldSize
		foundCount++
	}

	// Nothing was found and nothing failed: report NOT_FOUND. A failure status
	// set inside the loop (e.g. INTERNAL) must not be overwritten here.
	if foundCount == 0 && status == types.ResultStatus_OK {
		status = types.ResultStatus_NOT_FOUND
	}
	// Failed response does not need incomplete blocks information
	if status != types.ResultStatus_OK {
		blockInfos = blockInfos[:0]
	}

	// generate response message
	resp := &types.GetBlockResponse{
		Status:  status,
		Blocks:  blockInfos,
		HasNext: false,
	}

	// ???: have to check arguments
	bh.logger.Debug().Int(p2putil.LogBlkCount, len(blockInfos)).Str(p2putil.LogOrgReqID, requestID.String()).Msg("Sending last part of getBlock response")
	if err := remotePeer.SendAndWaitMessage(remotePeer.MF().NewMsgResponseOrder(requestID, GetBlocksResponse, resp), defaultMsgTimeout); err != nil {
		bh.logger.Info().Int(p2putil.LogBlkCount, len(data.Hashes)).Err(err).Str(p2putil.LogOrgReqID, requestID.String()).Msg("Sending failed")
	}
}
// NewBlockRespHandler creates the handler for GetBlocksResponse messages.
//
// The returned handler is bound to the given peer manager, remote peer, logger,
// actor service and sync manager; its protocol is fixed to GetBlocksResponse.
func NewBlockRespHandler(pm p2pcommon.PeerManager, peer p2pcommon.RemotePeer, logger *log.Logger, actor p2pcommon.ActorService, sm p2pcommon.SyncManager) *blockResponseHandler {
	return &blockResponseHandler{
		BaseMsgHandler: BaseMsgHandler{
			protocol: GetBlocksResponse,
			pm:       pm,
			sm:       sm,
			peer:     peer,
			actor:    actor,
			logger:   logger,
		},
	}
}
// ParsePayload decodes rawbytes into a fresh types.GetBlockResponse via
// p2putil.UnmarshalAndReturn and returns that helper's result unchanged.
func (bh *blockResponseHandler) ParsePayload(rawbytes []byte) (proto.Message, error) {
	response := new(types.GetBlockResponse)
	return p2putil.UnmarshalAndReturn(rawbytes, response)
}
// Handle processes a GetBlockResponse received from the remote peer.
//
// The response is first offered to the receiver the peer returns for the
// original request ID. If that receiver returns true, nothing more is done
// here. Otherwise the request is consumed on the peer and, provided the
// response has status OK and carries at least one block, it is forwarded to
// the sync manager.
//
// msgBody must be the *types.GetBlockResponse produced by ParsePayload.
func (bh *blockResponseHandler) Handle(msg p2pcommon.Message, msgBody proto.Message) {
	sender := bh.peer
	resp := msgBody.(*types.GetBlockResponse)
	if bh.logger.IsDebugEnabled() {
		detail := fmt.Sprintf("hashNext=%t,%s", resp.HasNext, p2putil.PrintHashList(resp.Blocks))
		p2putil.DebugLogReceiveResponseMsg(bh.logger, bh.protocol, msg.ID().String(), msg.OriginalID().String(), sender, detail)
	}

	// locate the receiver of the original request and let it take the response
	receive := sender.GetReceiver(msg.OriginalID())
	if receive(msg, resp) {
		return
	}

	// the receiver returned false: remove the request and fall back to the sync manager
	sender.ConsumeRequest(msg.OriginalID())
	// TODO temporary code and will be deleted after newer syncer is made.
	if resp.Status != types.ResultStatus_OK || len(resp.Blocks) == 0 {
		return
	}
	bh.sm.HandleGetBlockResponse(sender, msg, resp)
}