/
msg_handlers.go
224 lines (206 loc) · 6.68 KB
/
msg_handlers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
package consensus
import (
"context"
"fmt"
cstypes "github.com/dashpay/tenderdash/internal/consensus/types"
"github.com/dashpay/tenderdash/libs/log"
)
// msgInfoDispatcher routes consensus messages to type-specific handlers.
// It keeps exactly one handler for each message type that match recognizes.
type msgInfoDispatcher struct {
	// proposalHandler processes *ProposalMessage payloads.
	proposalHandler msgHandlerFunc
	// blockPartHandler processes *BlockPartMessage payloads.
	blockPartHandler msgHandlerFunc
	// voteHandler processes *VoteMessage payloads.
	voteHandler msgHandlerFunc
	// commitHandler processes *CommitMessage payloads.
	commitHandler msgHandlerFunc
}
// match picks the handler registered for the concrete type of m.
//
// Supported types are *ProposalMessage, *BlockPartMessage, *VoteMessage and
// *CommitMessage. For anything else (including a nil Message) it returns a
// nil handler and an error naming the offending type.
func (c *msgInfoDispatcher) match(m Message) (msgHandlerFunc, error) {
	var handler msgHandlerFunc
	switch m.(type) {
	case *ProposalMessage:
		handler = c.proposalHandler
	case *BlockPartMessage:
		handler = c.blockPartHandler
	case *VoteMessage:
		handler = c.voteHandler
	case *CommitMessage:
		handler = c.commitHandler
	default:
		return nil, fmt.Errorf("got unknown %T type", m)
	}
	return handler, nil
}
// dispatch wraps msg into a msgEnvelope, lets every opt adjust the envelope,
// and then invokes the handler that matches the type of the wrapped payload.
//
// msg is expected to hold a msgInfo value; any other dynamic type produces an
// error instead of a panic. A msgInfo whose Msg is nil is silently ignored and
// nil is returned. An error is also returned when no handler matches the
// payload type or when the matching handler is nil. Otherwise the result of
// the handler is returned as is.
func (c *msgInfoDispatcher) dispatch(ctx context.Context, stateData *StateData, msg Message, opts ...func(envelope *msgEnvelope)) error {
	// Assert via `any` so the check compiles whether or not msgInfo satisfies
	// the Message interface, and use the comma-ok form to avoid a panic.
	mi, ok := any(msg).(msgInfo)
	if !ok {
		return fmt.Errorf("unexpected message type %T, expected msgInfo", msg)
	}
	if mi.Msg == nil {
		// Nothing to process.
		return nil
	}
	envelope := msgEnvelope{
		msgInfo:    mi,
		fromReplay: false,
	}
	for _, opt := range opts {
		opt(&envelope)
	}
	handler, err := c.match(mi.Msg)
	if err != nil {
		return fmt.Errorf("message handler not found: %w", err)
	}
	if handler == nil {
		// Calling a nil func would panic; report the misconfiguration instead.
		return fmt.Errorf("message handler for %T is not set", mi.Msg)
	}
	return handler(ctx, stateData, envelope)
}
// newMsgInfoDispatcher builds a dispatcher with a handler for each supported
// message type.
//
// Every handler is passed through withMiddleware together with the same
// middleware list: msgInfoWithCtxMiddleware, loggingMiddleware and
// walMiddleware (in that slice order; how withMiddleware composes them is
// defined elsewhere). The proposal handler uses proposaler, the remaining
// handlers forward their work to ctrl.
func newMsgInfoDispatcher(
	ctrl *Controller,
	proposaler cstypes.Proposaler,
	wal WALWriteFlusher,
	logger log.Logger,
) *msgInfoDispatcher {
	middlewares := []msgMiddlewareFunc{
		msgInfoWithCtxMiddleware(),
		loggingMiddleware(logger),
		walMiddleware(wal, logger),
	}
	return &msgInfoDispatcher{
		proposalHandler:  withMiddleware(proposalMessageHandler(proposaler), middlewares...),
		blockPartHandler: withMiddleware(blockPartMessageHandler(ctrl), middlewares...),
		voteHandler:      withMiddleware(voteMessageHandler(ctrl), middlewares...),
		commitHandler:    withMiddleware(commitMessageHandler(ctrl), middlewares...),
	}
}
// proposalMessageHandler returns a handler for *ProposalMessage envelopes.
//
// The handler passes the proposal, the envelope's receive time and the
// current round state to propSetter.Set and returns its result. It panics if
// the envelope payload is not a *ProposalMessage.
func proposalMessageHandler(propSetter cstypes.ProposalSetter) msgHandlerFunc {
	return func(_ context.Context, stateData *StateData, envelope msgEnvelope) error {
		proposalMsg := envelope.Msg.(*ProposalMessage)
		return propSetter.Set(
			proposalMsg.Proposal,
			envelope.ReceiveTime,
			&stateData.RoundState,
		)
	}
}
// blockPartMessageHandler returns a handler for *BlockPartMessage envelopes.
//
// The handler dispatches an AddProposalBlockPartEvent through ctrl. A failure
// for a part whose round differs from the current round is only traced and
// reported as success; any other failure is returned to the caller. It panics
// if the envelope payload is not a *BlockPartMessage.
func blockPartMessageHandler(ctrl *Controller) msgHandlerFunc {
	return func(ctx context.Context, stateData *StateData, envelope msgEnvelope) error {
		logger := log.FromCtxOrNop(ctx)
		partMsg := envelope.Msg.(*BlockPartMessage)
		event := &AddProposalBlockPartEvent{
			Msg:        partMsg,
			PeerID:     envelope.PeerID,
			FromReplay: envelope.fromReplay,
		}
		// Once the proposal is complete, processing this event is expected to
		// continue with enterPrevote or tryFinalizeCommit.
		err := ctrl.Dispatch(ctx, event, stateData)
		switch {
		case err == nil:
			return nil
		case partMsg.Round != stateData.Round:
			// Parts from another round are not treated as an error.
			logger.Trace("received block part from wrong round")
			return nil
		default:
			return err
		}
	}
}
// voteMessageHandler returns a handler for *VoteMessage envelopes.
//
// The handler dispatches an AddVoteEvent carrying the vote and the sending
// peer through ctrl and returns the dispatch result unchanged. It panics if
// the envelope payload is not a *VoteMessage.
func voteMessageHandler(ctrl *Controller) msgHandlerFunc {
	return func(ctx context.Context, stateData *StateData, envelope msgEnvelope) error {
		voteMsg := envelope.Msg.(*VoteMessage)
		event := &AddVoteEvent{Vote: voteMsg.Vote, PeerID: envelope.PeerID}

		// Try to add the vote; a duplicate signature should get the validator
		// flagged. If the vote yields a 2/3-any or 2/3-one, a transition follows.
		//
		// TODO: punish peer.
		// Stopping the peer here is probably not desirable: the vote does not
		// necessarily come from a malicious peer, it may simply have been
		// relayed by an ordinary one.
		// https://github.com/tendermint/tendermint/issues/1281
		//
		// NOTE: the reactor listening for vote events is what broadcasts the
		// vote to peers.
		//
		// TODO: if rs.Height == vote.Height && rs.Round < vote.Round, the peer
		// is sending us CatchupCommit precommits. We could record that and use
		// it to filter in broadcastHasVoteMessage().
		return ctrl.Dispatch(ctx, event, stateData)
	}
}
// commitMessageHandler returns a handler for *CommitMessage envelopes.
//
// The handler dispatches a TryAddCommitEvent carrying the commit and the
// sending peer through ctrl and returns the dispatch result unchanged. It
// panics if the envelope payload is not a *CommitMessage.
func commitMessageHandler(ctrl *Controller) msgHandlerFunc {
	return func(ctx context.Context, stateData *StateData, envelope msgEnvelope) error {
		commitMsg := envelope.Msg.(*CommitMessage)
		event := &TryAddCommitEvent{
			Commit: commitMsg.Commit,
			PeerID: envelope.PeerID,
		}
		return ctrl.Dispatch(ctx, event, stateData)
	}
}
// walMiddleware returns a middleware that records the message in the WAL
// before the wrapped handler runs.
//
//   - Envelopes marked fromReplay are not written at all.
//   - Messages with an empty PeerID (presumably our own messages) are written
//     with WriteSync; a failure there panics.
//   - Messages with a non-empty PeerID are written with Write; a failure is
//     only logged.
//
// In every non-panicking case the wrapped handler is called afterwards and
// its result is returned.
func walMiddleware(wal WALWriteFlusher, logger log.Logger) msgMiddlewareFunc {
	return func(next msgHandlerFunc) msgHandlerFunc {
		return func(ctx context.Context, stateData *StateData, envelope msgEnvelope) error {
			info := envelope.msgInfo
			switch {
			case envelope.fromReplay:
				// Replayed messages are skipped.
			case info.PeerID == "":
				// NOTE: fsync
				if err := wal.WriteSync(info); err != nil {
					panic(fmt.Errorf(
						"failed to write %v msg to consensus WAL due to %w; check your file system and restart the node",
						info, err,
					))
				}
			default:
				if err := wal.Write(info); err != nil {
					logger.Error("failed writing to WAL", "error", err)
				}
			}
			return next(ctx, stateData, envelope)
		}
	}
}
// loggingMiddleware returns a middleware that attaches a message-scoped
// logger to the context and logs the outcome of the wrapped handler.
//
// The logger is enriched with the current height and round, the peer ID, the
// message type and the fields produced by makeLogArgsFromMessage. Note that
// the returned handler always yields nil: an error from the wrapped handler is
// logged at error level and not propagated.
func loggingMiddleware(logger log.Logger) msgMiddlewareFunc {
	return func(next msgHandlerFunc) msgHandlerFunc {
		return func(ctx context.Context, stateData *StateData, envelope msgEnvelope) error {
			fields := []any{
				"height", stateData.Height,
				"round", stateData.Round,
				"peer", envelope.PeerID,
				"msg_type", fmt.Sprintf("%T", envelope.Msg),
			}
			fields = append(fields, makeLogArgsFromMessage(envelope.Msg)...)
			msgLogger := logger.With(fields...)

			// Downstream handlers retrieve this logger from the context.
			ctx = log.CtxWithLogger(ctx, msgLogger)
			if err := next(ctx, stateData, envelope); err != nil {
				msgLogger.Error("failed to process message", "error", err)
			} else {
				msgLogger.Trace("message processed successfully")
			}
			return nil
		}
	}
}
// msgInfoWithCtxMiddleware returns a middleware that stores the envelope's
// msgInfo in the context (via msgInfoWithCtx) before invoking the wrapped
// handler with that derived context.
func msgInfoWithCtxMiddleware() msgMiddlewareFunc {
	return func(next msgHandlerFunc) msgHandlerFunc {
		return func(ctx context.Context, stateData *StateData, envelope msgEnvelope) error {
			ctxWithInfo := msgInfoWithCtx(ctx, envelope.msgInfo)
			return next(ctxWithInfo, stateData, envelope)
		}
	}
}
// logKeyValsWithError returns keyVals extended with an "error" key followed
// by err. When err is nil, keyVals is returned unchanged.
//
// The result never shares appended elements with keyVals: the capacity of the
// input is capped before appending, so append always copies into a fresh
// backing array. Without that, two calls on the same slice with spare
// capacity would overwrite each other's "error" entry.
func logKeyValsWithError(keyVals []any, err error) []any {
	if err == nil {
		return keyVals
	}
	// Full slice expression: len == cap forces append to allocate.
	return append(keyVals[:len(keyVals):len(keyVals)], "error", err)
}
// makeLogArgsFromMessage builds the key/value pairs used to describe msg in
// log output.
//
// The keys depend on the concrete type: proposal_* for *ProposalMessage,
// block_*/part_index for *BlockPartMessage, vote_*/val_* for *VoteMessage and
// commit_* for *CommitMessage. It panics for any other type.
func makeLogArgsFromMessage(msg Message) []any {
	var args []any
	switch typed := msg.(type) {
	case *ProposalMessage:
		args = []any{
			"proposal_height", typed.Proposal.Height,
			"proposal_round", typed.Proposal.Round,
			"proposal_polRound", typed.Proposal.POLRound,
		}
	case *BlockPartMessage:
		args = []any{
			"block_height", typed.Height,
			"block_round", typed.Round,
			"part_index", typed.Part.Index,
		}
	case *VoteMessage:
		args = []any{
			"vote_type", typed.Vote.Type.String(),
			"vote_height", typed.Vote.Height,
			"vote_round", typed.Vote.Round,
			"val_proTxHash", typed.Vote.ValidatorProTxHash.ShortString(),
			"val_index", typed.Vote.ValidatorIndex,
		}
	case *CommitMessage:
		args = []any{
			"commit_height", typed.Commit.Height,
			"commit_round", typed.Commit.Round,
		}
	default:
		panic("unsupported message type")
	}
	return args
}