-
Notifications
You must be signed in to change notification settings - Fork 199
/
messageProcessor.go
58 lines (50 loc) · 2.01 KB
/
messageProcessor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package resolvers
import (
"fmt"
"github.com/ElrondNetwork/elrond-go-core/core"
"github.com/ElrondNetwork/elrond-go-core/core/check"
"github.com/ElrondNetwork/elrond-go-core/marshal"
"github.com/ElrondNetwork/elrond-go/common"
"github.com/ElrondNetwork/elrond-go/dataRetriever"
"github.com/ElrondNetwork/elrond-go/p2p"
)
// messageProcessor is used for basic message validity and parsing
type messageProcessor struct {
marshalizer marshal.Marshalizer
antifloodHandler dataRetriever.P2PAntifloodHandler
throttler dataRetriever.ResolverThrottler
topic string
}
func (mp *messageProcessor) canProcessMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
if check.IfNil(message) {
return dataRetriever.ErrNilMessage
}
err := mp.antifloodHandler.CanProcessMessage(message, fromConnectedPeer)
if err != nil {
return fmt.Errorf("%w on resolver topic %s", err, mp.topic)
}
err = mp.antifloodHandler.CanProcessMessagesOnTopic(fromConnectedPeer, mp.topic, 1, uint64(len(message.Data())), message.SeqNo())
if err != nil {
return fmt.Errorf("%w on resolver topic %s", err, mp.topic)
}
if !mp.throttler.CanProcess() {
return fmt.Errorf("%w on resolver topic %s", dataRetriever.ErrSystemBusy, mp.topic)
}
return nil
}
// parseReceivedMessage will transform the received p2p.Message in a RequestData object.
func (mp *messageProcessor) parseReceivedMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID) (*dataRetriever.RequestData, error) {
rd := &dataRetriever.RequestData{}
err := rd.UnmarshalWith(mp.marshalizer, message)
if err != nil {
//this situation is so severe that we need to black list the peers
reason := "unmarshalable data got on request topic " + mp.topic
mp.antifloodHandler.BlacklistPeer(message.Peer(), reason, common.InvalidMessageBlacklistDuration)
mp.antifloodHandler.BlacklistPeer(fromConnectedPeer, reason, common.InvalidMessageBlacklistDuration)
return nil, err
}
if rd.Value == nil {
return nil, dataRetriever.ErrNilValue
}
return rd, nil
}