forked from kaspanet/kaspad
-
Notifications
You must be signed in to change notification settings - Fork 2
/
handle_request_pruning_point_utxo_set.go
140 lines (116 loc) · 4.9 KB
/
handle_request_pruning_point_utxo_set.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package blockrelay
import (
"errors"
"github.com/Kash-Protocol/kashd/app/appmessage"
"github.com/Kash-Protocol/kashd/app/protocol/common"
"github.com/Kash-Protocol/kashd/app/protocol/protocolerrors"
"github.com/Kash-Protocol/kashd/domain"
"github.com/Kash-Protocol/kashd/domain/consensus/model/externalapi"
"github.com/Kash-Protocol/kashd/domain/consensus/ruleerrors"
"github.com/Kash-Protocol/kashd/infrastructure/logger"
"github.com/Kash-Protocol/kashd/infrastructure/network/netadapter/router"
)
// HandleRequestPruningPointUTXOSetContext is the interface for the context needed for the HandleRequestPruningPointUTXOSet flow.
type HandleRequestPruningPointUTXOSetContext interface {
Domain() domain.Domain
}
type handleRequestPruningPointUTXOSetFlow struct {
HandleRequestPruningPointUTXOSetContext
incomingRoute, outgoingRoute *router.Route
}
// HandleRequestPruningPointUTXOSet listens to appmessage.MsgRequestPruningPointUTXOSet messages and sends
// the pruning point UTXO set and block body.
func HandleRequestPruningPointUTXOSet(context HandleRequestPruningPointUTXOSetContext, incomingRoute,
outgoingRoute *router.Route) error {
flow := &handleRequestPruningPointUTXOSetFlow{
HandleRequestPruningPointUTXOSetContext: context,
incomingRoute: incomingRoute,
outgoingRoute: outgoingRoute,
}
return flow.start()
}
func (flow *handleRequestPruningPointUTXOSetFlow) start() error {
for {
msgRequestPruningPointUTXOSet, err := flow.waitForRequestPruningPointUTXOSetMessages()
if err != nil {
return err
}
err = flow.handleRequestPruningPointUTXOSetMessage(msgRequestPruningPointUTXOSet)
if err != nil {
return err
}
}
}
func (flow *handleRequestPruningPointUTXOSetFlow) handleRequestPruningPointUTXOSetMessage(
msgRequestPruningPointUTXOSet *appmessage.MsgRequestPruningPointUTXOSet) error {
onEnd := logger.LogAndMeasureExecutionTime(log, "handleRequestPruningPointUTXOSetFlow")
defer onEnd()
log.Debugf("Got request for pruning point UTXO set")
return flow.sendPruningPointUTXOSet(msgRequestPruningPointUTXOSet)
}
func (flow *handleRequestPruningPointUTXOSetFlow) waitForRequestPruningPointUTXOSetMessages() (
*appmessage.MsgRequestPruningPointUTXOSet, error) {
message, err := flow.incomingRoute.Dequeue()
if err != nil {
return nil, err
}
msgRequestPruningPointUTXOSet, ok := message.(*appmessage.MsgRequestPruningPointUTXOSet)
if !ok {
// TODO: Change to shouldBan: true once we fix the bug of getting redundant messages
return nil, protocolerrors.Errorf(false, "received unexpected message type. "+
"expected: %s, got: %s", appmessage.CmdRequestPruningPointUTXOSet, message.Command())
}
return msgRequestPruningPointUTXOSet, nil
}
func (flow *handleRequestPruningPointUTXOSetFlow) sendPruningPointUTXOSet(
msgRequestPruningPointUTXOSet *appmessage.MsgRequestPruningPointUTXOSet) error {
// Send the UTXO set in `step`-sized chunks
const step = 1000
var fromOutpoint *externalapi.DomainOutpoint
chunksSent := 0
for {
pruningPointUTXOs, err := flow.Domain().Consensus().GetPruningPointUTXOs(
msgRequestPruningPointUTXOSet.PruningPointHash, fromOutpoint, step)
if err != nil {
if errors.Is(err, ruleerrors.ErrWrongPruningPointHash) {
return flow.outgoingRoute.Enqueue(appmessage.NewMsgUnexpectedPruningPoint())
}
}
log.Debugf("Retrieved %d UTXOs for pruning block %s",
len(pruningPointUTXOs), msgRequestPruningPointUTXOSet.PruningPointHash)
outpointAndUTXOEntryPairs :=
appmessage.DomainOutpointAndUTXOEntryPairsToOutpointAndUTXOEntryPairs(pruningPointUTXOs)
err = flow.outgoingRoute.Enqueue(appmessage.NewMsgPruningPointUTXOSetChunk(outpointAndUTXOEntryPairs))
if err != nil {
return err
}
finished := len(pruningPointUTXOs) < step
if finished && chunksSent%ibdBatchSize != 0 {
log.Debugf("Finished sending UTXOs for pruning block %s",
msgRequestPruningPointUTXOSet.PruningPointHash)
return flow.outgoingRoute.Enqueue(appmessage.NewMsgDonePruningPointUTXOSetChunks())
}
if len(pruningPointUTXOs) > 0 {
fromOutpoint = pruningPointUTXOs[len(pruningPointUTXOs)-1].Outpoint
}
chunksSent++
// Wait for the peer to request more chunks every `ibdBatchSize` chunks
if chunksSent%ibdBatchSize == 0 {
message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
if err != nil {
return err
}
_, ok := message.(*appmessage.MsgRequestNextPruningPointUTXOSetChunk)
if !ok {
// TODO: Change to shouldBan: true once we fix the bug of getting redundant messages
return protocolerrors.Errorf(false, "received unexpected message type. "+
"expected: %s, got: %s", appmessage.CmdRequestNextPruningPointUTXOSetChunk, message.Command())
}
if finished {
log.Debugf("Finished sending UTXOs for pruning block %s",
msgRequestPruningPointUTXOSet.PruningPointHash)
return flow.outgoingRoute.Enqueue(appmessage.NewMsgDonePruningPointUTXOSetChunks())
}
}
}
}