/
p2p_msg_handler.go
80 lines (75 loc) · 2.11 KB
/
p2p_msg_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package mempool
import (
"context"
"errors"
"fmt"
"github.com/dashpay/tenderdash/internal/p2p"
"github.com/dashpay/tenderdash/internal/p2p/client"
"github.com/dashpay/tenderdash/libs/log"
protomem "github.com/dashpay/tenderdash/proto/tendermint/mempool"
"github.com/dashpay/tenderdash/types"
)
type (
mempoolP2PMessageHandler struct {
logger log.Logger
checker TxChecker
ids *IDs
}
)
func consumerHandler(logger log.Logger, checker TxChecker, ids *IDs) client.ConsumerParams {
chanIDs := []p2p.ChannelID{p2p.MempoolChannel}
return client.ConsumerParams{
ReadChannels: chanIDs,
Handler: client.HandlerWithMiddlewares(
&mempoolP2PMessageHandler{
logger: logger,
checker: checker,
ids: ids,
},
client.WithValidateMessageHandler(chanIDs),
client.WithErrorLoggerMiddleware(logger),
client.WithRecoveryMiddleware(logger),
),
}
}
// Handle handles a message from a block-sync message set
func (h *mempoolP2PMessageHandler) Handle(ctx context.Context, _ *client.Client, envelope *p2p.Envelope) error {
logger := h.logger.With("peer", envelope.From)
msg, ok := envelope.Message.(*protomem.Txs)
if !ok {
return fmt.Errorf("received unknown message: %T", msg)
}
protoTxs := msg.GetTxs()
if len(protoTxs) == 0 {
return errors.New("empty txs received from peer")
}
txInfo := TxInfo{
SenderID: h.ids.GetForPeer(envelope.From),
SenderNodeID: envelope.From,
}
for _, tx := range protoTxs {
if err := h.checker.CheckTx(ctx, tx, nil, txInfo); err != nil {
if errors.Is(err, types.ErrTxInCache) {
// if the tx is in the cache,
// then we've been gossiped a
// Tx that we've already
// got. Gossip should be
// smarter, but it's not a
// problem.
continue
}
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
// Do not propagate context
// cancellation errors, but do
// not continue to check
// transactions from this
// message if we are shutting down.
return err
}
logger.Error("checktx failed for tx",
"tx", fmt.Sprintf("%X", types.Tx(tx).Hash()),
"error", err)
}
}
return nil
}