-
Notifications
You must be signed in to change notification settings - Fork 52
/
abci.go
207 lines (179 loc) · 7.67 KB
/
abci.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
package abci
import (
"github.com/Fairblock/fairyring/blockbuster"
"github.com/Fairblock/fairyring/blockbuster/lanes/terminator"
"github.com/Fairblock/fairyring/blockbuster/utils"
abci "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/libs/log"
sdk "github.com/cosmos/cosmos-sdk/types"
)
// ProposalHandler bundles the ABCI++ PrepareProposal and ProcessProposal
// handlers together with the dependencies they share.
type ProposalHandler struct {
	// logger receives the outcome of every prepare/process call, including
	// recovered panics.
	logger log.Logger
	// txDecoder turns the raw proposal bytes into sdk.Tx values before
	// verification.
	txDecoder sdk.TxDecoder
	// prepareLanesHandler is the chained preparation logic of all lanes.
	prepareLanesHandler blockbuster.PrepareLanesHandler
	// processLanesHandler is the chained verification logic of all lanes.
	processLanesHandler blockbuster.ProcessLanesHandler
}
// NewProposalHandler builds an ABCI++ proposal handler whose prepare and
// process chains are assembled from the lanes registered in mempool.
//
// The lane registry is read once per chain, so the two handlers are built
// from independent Registry() calls.
func NewProposalHandler(logger log.Logger, txDecoder sdk.TxDecoder, mempool blockbuster.Mempool) *ProposalHandler {
	handler := new(ProposalHandler)
	handler.logger = logger
	handler.txDecoder = txDecoder
	handler.prepareLanesHandler = ChainPrepareLanes(mempool.Registry()...)
	handler.processLanesHandler = ChainProcessLanes(mempool.Registry()...)

	return handler
}
// PrepareProposalHandler returns the handler that builds a block proposal by
// letting each lane greedily select transactions according to its own
// selection logic. Each lane has a boundary on the number of bytes it may
// contribute to the proposal; by default, the default lane has no such
// boundary and includes all valid transactions (up to MaxTxBytes).
//
// If the lane chain returns an error or panics, the failure is logged and an
// empty proposal is returned instead.
func (h *ProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler {
	return func(ctx sdk.Context, req abci.RequestPrepareProposal) (resp abci.ResponsePrepareProposal) {
		// Convert any panic raised below into an empty proposal by overwriting
		// the named result.
		defer func() {
			rec := recover()
			if rec == nil {
				return
			}

			h.logger.Error("failed to prepare proposal", "err", rec)
			resp = abci.ResponsePrepareProposal{Txs: make([][]byte, 0)}
		}()

		// Seed the chain with a fresh proposal bounded by the request's byte limit.
		seed := blockbuster.NewProposal(req.MaxTxBytes)

		proposal, prepareErr := h.prepareLanesHandler(ctx, seed)
		if prepareErr != nil {
			h.logger.Error("failed to prepare proposal", "err", prepareErr)
			return abci.ResponsePrepareProposal{Txs: make([][]byte, 0)}
		}

		h.logger.Info("prepared proposal", "num_txs", proposal.GetNumTxs(), "total_tx_bytes", proposal.GetTotalTxBytes())

		resp = abci.ResponsePrepareProposal{Txs: proposal.GetProposal()}
		return resp
	}
}
// ProcessProposalHandler returns the handler that verifies a proposed block
// by running every lane's verification logic over it in chain order. Each
// lane verifies its portion of the proposal and hands the remaining
// transactions to the next lane; if any portion is invalid the whole
// proposal is rejected.
//
// An empty proposal is accepted immediately. Decode failures, verification
// errors and panics all result in a REJECT status.
func (h *ProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler {
	return func(ctx sdk.Context, req abci.RequestProcessProposal) (resp abci.ResponseProcessProposal) {
		accepted := abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}
		rejected := abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}

		// A panic in any lane is turned into a rejection through the named result.
		defer func() {
			rec := recover()
			if rec == nil {
				return
			}

			h.logger.Error("failed to process proposal", "err", rec)
			resp = rejected
		}()

		if len(req.Txs) == 0 {
			h.logger.Info("accepted empty proposal")
			return accepted
		}

		// The lanes operate on decoded transactions, not raw bytes.
		decodedTxs, decodeErr := utils.GetDecodedTxs(h.txDecoder, req.Txs)
		if decodeErr != nil {
			h.logger.Error("failed to decode transactions", "err", decodeErr)
			return rejected
		}

		// Run the chained per-lane verification; the returned context is not needed here.
		_, verifyErr := h.processLanesHandler(ctx, decodedTxs)
		if verifyErr != nil {
			h.logger.Error("failed to validate the proposal", "err", verifyErr)
			return rejected
		}

		h.logger.Info("validated proposal", "num_txs", len(req.Txs))
		return accepted
	}
}
// ChainPrepareLanes chains together the proposal preparation logic from each lane
// into a single function. The first lane in the chain is the first lane to be prepared and
// the last lane in the chain is the last lane to be prepared.
//
// In the case where any of the lanes fail to prepare the partial proposal (by returning an
// error or by panicking), the lane that failed will be skipped and the next lane in the
// chain will be called to prepare the proposal.
//
// A terminator lane is appended when the chain does not already end with one. An empty
// chain yields a nil handler.
func ChainPrepareLanes(chain ...blockbuster.Lane) blockbuster.PrepareLanesHandler {
// Nothing to chain: callers receive a nil handler.
if len(chain) == 0 {
return nil
}
// Handle non-terminated decorators chain: make sure the last element is the terminator lane.
if (chain[len(chain)-1] != terminator.Terminator{}) {
chain = append(chain, terminator.Terminator{})
}
// The results are named so that the deferred recovery logic below can overwrite them.
return func(ctx sdk.Context, partialProposal blockbuster.BlockProposal) (finalProposal blockbuster.BlockProposal, err error) {
lane := chain[0]
lane.Logger().Info("preparing lane", "lane", lane.Name())
// Cache the context in the case where any of the lanes fail to prepare the proposal.
// The cached writes are only committed (via write) when this lane succeeds.
cacheCtx, write := ctx.CacheContext()
defer func() {
// Both a panic and a returned error count as a failure of this lane.
if rec := recover(); rec != nil || err != nil {
lane.Logger().Error("failed to prepare lane", "lane", lane.Name(), "err", err, "recover_error", rec)
// chain always ends with the terminator, so its length includes that extra entry.
lanesRemaining := len(chain)
switch {
case lanesRemaining <= 2:
// If there are only two lanes remaining, then the first lane in the chain
// is the lane that failed to prepare the partial proposal and the second lane in the
// chain is the terminator lane. We return the proposal as is and clear the error.
finalProposal, err = partialProposal, nil
default:
// If there are more than two lanes remaining, then the first lane in the chain
// is the lane that failed to prepare the proposal but the second lane in the
// chain is not the terminator lane so there could potentially be more transactions
// added to the proposal
maxTxBytesForLane := utils.GetMaxTxBytesForLane(
partialProposal.GetMaxTxBytes(),
partialProposal.GetTotalTxBytes(),
chain[1].GetMaxBlockSpace(),
)
// The fallback lane receives the original ctx, not cacheCtx, so writes made by the
// failed lane to the cached context are not visible to it.
// NOTE(review): a panic raised by this call is not recovered here; it propagates to
// the caller — confirm that the outer handler's recover is the intended safety net.
finalProposal, err = chain[1].PrepareLane(
ctx,
partialProposal,
maxTxBytesForLane,
ChainPrepareLanes(chain[2:]...),
)
}
} else {
// Write the cache to the context since we know that the lane successfully prepared
// the partial proposal.
write()
}
}()
// Get the maximum number of bytes that can be included in the proposal for this lane.
maxTxBytesForLane := utils.GetMaxTxBytesForLane(
partialProposal.GetMaxTxBytes(),
partialProposal.GetTotalTxBytes(),
lane.GetMaxBlockSpace(),
)
// Prepare this lane against the cached context and hand it the rest of the chain as
// the next handler.
return lane.PrepareLane(
cacheCtx,
partialProposal,
maxTxBytesForLane,
ChainPrepareLanes(chain[1:]...),
)
}
}
// ChainProcessLanes folds the verification logic of every lane into a single
// handler. Lanes are verified in the order given: the first lane in the chain
// is verified first and the last lane is verified last.
//
// A terminator lane is appended when the chain does not already end with one.
// An empty chain yields a nil handler.
func ChainProcessLanes(chain ...blockbuster.Lane) blockbuster.ProcessLanesHandler {
	numLanes := len(chain)
	if numLanes == 0 {
		return nil
	}

	// Guarantee that the chain is closed by the terminator lane.
	sentinel := terminator.Terminator{}
	if chain[numLanes-1] != sentinel {
		chain = append(chain, sentinel)
	}

	return func(ctx sdk.Context, proposalTxs []sdk.Tx) (sdk.Context, error) {
		// Nothing left to verify, so the remaining lanes are not consulted.
		if len(proposalTxs) == 0 {
			return ctx, nil
		}

		current, rest := chain[0], chain[1:]
		current.Logger().Info("processing lane", "lane", current.Name())

		// Run the lane's basic check before its full verification.
		if basicErr := current.ProcessLaneBasic(proposalTxs); basicErr != nil {
			return ctx, basicErr
		}

		// The lane receives the rest of the chain as its next handler.
		next := ChainProcessLanes(rest...)
		return current.ProcessLane(ctx, proposalTxs, next)
	}
}