-
Notifications
You must be signed in to change notification settings - Fork 199
/
multiDataInterceptor.go
130 lines (111 loc) · 3.34 KB
/
multiDataInterceptor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package interceptors
import (
"sync"
"github.com/ElrondNetwork/elrond-go/core/check"
"github.com/ElrondNetwork/elrond-go/logger"
"github.com/ElrondNetwork/elrond-go/marshal"
"github.com/ElrondNetwork/elrond-go/p2p"
"github.com/ElrondNetwork/elrond-go/process"
)
var log = logger.GetOrCreate("process/interceptors")
// MultiDataInterceptor is used for intercepting packed multi data
type MultiDataInterceptor struct {
marshalizer marshal.Marshalizer
factory process.InterceptedDataFactory
processor process.InterceptorProcessor
throttler process.InterceptorThrottler
}
// NewMultiDataInterceptor hooks a new interceptor for packed multi data
func NewMultiDataInterceptor(
marshalizer marshal.Marshalizer,
factory process.InterceptedDataFactory,
processor process.InterceptorProcessor,
throttler process.InterceptorThrottler,
) (*MultiDataInterceptor, error) {
if check.IfNil(marshalizer) {
return nil, process.ErrNilMarshalizer
}
if check.IfNil(factory) {
return nil, process.ErrNilInterceptedDataFactory
}
if check.IfNil(processor) {
return nil, process.ErrNilInterceptedDataProcessor
}
if check.IfNil(throttler) {
return nil, process.ErrNilInterceptorThrottler
}
multiDataIntercept := &MultiDataInterceptor{
marshalizer: marshalizer,
factory: factory,
processor: processor,
throttler: throttler,
}
return multiDataIntercept, nil
}
// ProcessReceivedMessage is the callback func from the p2p.Messenger and will be called each time a new message was received
// (for the topic this validator was registered to)
func (mdi *MultiDataInterceptor) ProcessReceivedMessage(message p2p.MessageP2P, broadcastHandler func(buffToSend []byte)) error {
err := preProcessMesage(mdi.throttler, message)
if err != nil {
return err
}
multiDataBuff := make([][]byte, 0)
err = mdi.marshalizer.Unmarshal(&multiDataBuff, message.Data())
if err != nil {
mdi.throttler.EndProcessing()
return err
}
if len(multiDataBuff) == 0 {
mdi.throttler.EndProcessing()
return process.ErrNoDataInMessage
}
filteredMultiDataBuff := make([][]byte, 0)
lastErrEncountered := error(nil)
wgProcess := &sync.WaitGroup{}
wgProcess.Add(len(multiDataBuff))
go func() {
wgProcess.Wait()
mdi.throttler.EndProcessing()
}()
for _, dataBuff := range multiDataBuff {
interceptedData, err := mdi.factory.Create(dataBuff)
if err != nil {
lastErrEncountered = err
wgProcess.Done()
continue
}
err = interceptedData.CheckValidity()
if err != nil {
lastErrEncountered = err
wgProcess.Done()
continue
}
//data is validated, add it to filtered out buff
filteredMultiDataBuff = append(filteredMultiDataBuff, dataBuff)
if !interceptedData.IsForCurrentShard() {
log.Trace("intercepted data is for other shards")
wgProcess.Done()
continue
}
go processInterceptedData(mdi.processor, interceptedData, wgProcess)
}
var buffToSend []byte
haveDataForBroadcast := len(filteredMultiDataBuff) > 0 && lastErrEncountered != nil
if haveDataForBroadcast {
buffToSend, err = mdi.marshalizer.Marshal(filteredMultiDataBuff)
if err != nil {
return err
}
if broadcastHandler != nil {
broadcastHandler(buffToSend)
}
}
return lastErrEncountered
}
// IsInterfaceNil returns true if there is no value under the interface
func (mdi *MultiDataInterceptor) IsInterfaceNil() bool {
if mdi == nil {
return true
}
return false
}