-
Notifications
You must be signed in to change notification settings - Fork 199
/
miniblockInterceptorProcessor.go
123 lines (105 loc) · 4.15 KB
/
miniblockInterceptorProcessor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package processor
import (
"sync"
"github.com/ElrondNetwork/elrond-go-core/core"
"github.com/ElrondNetwork/elrond-go-core/core/check"
"github.com/ElrondNetwork/elrond-go-core/data/block"
"github.com/ElrondNetwork/elrond-go-core/hashing"
"github.com/ElrondNetwork/elrond-go-core/marshal"
logger "github.com/ElrondNetwork/elrond-go-logger"
"github.com/ElrondNetwork/elrond-go/process"
"github.com/ElrondNetwork/elrond-go/process/block/interceptedBlocks"
"github.com/ElrondNetwork/elrond-go/sharding"
"github.com/ElrondNetwork/elrond-go/storage"
)
var (
	// Compile-time check that *MiniblockInterceptorProcessor implements process.InterceptorProcessor.
	_ process.InterceptorProcessor = (*MiniblockInterceptorProcessor)(nil)

	// log is the package-level logger, obtained under the "process/interceptors/processor" name.
	log = logger.GetOrCreate("process/interceptors/processor")
)
// MiniblockInterceptorProcessor is the processor used when intercepting miniblocks.
// It stores whitelisted miniblocks in a cache and forwards every intercepted
// miniblock to the callbacks registered through RegisterHandler.
type MiniblockInterceptorProcessor struct {
	// miniblockCache receives the whitelisted miniblocks, keyed by their hash.
	miniblockCache storage.Cacher
	// marshalizer, hasher and shardCoordinator are taken from the constructor
	// argument; they are not read by the methods defined in this file.
	marshalizer      marshal.Marshalizer
	hasher           hashing.Hasher
	shardCoordinator sharding.Coordinator
	// whiteListHandler decides whether an intercepted miniblock may enter the cache.
	whiteListHandler process.WhiteListHandler

	// mutHandlers guards registeredHandlers.
	mutHandlers sync.RWMutex
	// registeredHandlers holds the callbacks invoked for each intercepted miniblock.
	registeredHandlers []func(topic string, hash []byte, data interface{})
}
// NewMiniblockInterceptorProcessor creates a new MiniblockInterceptorProcessor instance.
//
// It returns process.ErrNilArgumentStruct when argument is nil, or the matching
// process.ErrNil* error for the first nil component found, checked in this order:
// MiniblockCache, Marshalizer, Hasher, ShardCoordinator, WhiteListHandler.
// The returned processor starts with an empty list of registered handlers.
func NewMiniblockInterceptorProcessor(argument *ArgMiniblockInterceptorProcessor) (*MiniblockInterceptorProcessor, error) {
	if argument == nil {
		return nil, process.ErrNilArgumentStruct
	}

	// Cases are evaluated top to bottom, so the first nil component decides the error.
	switch {
	case check.IfNil(argument.MiniblockCache):
		return nil, process.ErrNilMiniBlockPool
	case check.IfNil(argument.Marshalizer):
		return nil, process.ErrNilMarshalizer
	case check.IfNil(argument.Hasher):
		return nil, process.ErrNilHasher
	case check.IfNil(argument.ShardCoordinator):
		return nil, process.ErrNilShardCoordinator
	case check.IfNil(argument.WhiteListHandler):
		return nil, process.ErrNilWhiteListHandler
	}

	processor := &MiniblockInterceptorProcessor{
		miniblockCache:     argument.MiniblockCache,
		marshalizer:        argument.Marshalizer,
		hasher:             argument.Hasher,
		shardCoordinator:   argument.ShardCoordinator,
		whiteListHandler:   argument.WhiteListHandler,
		registeredHandlers: make([]func(topic string, hash []byte, data interface{}), 0),
	}

	return processor, nil
}
// Validate is the validation hook for intercepted data.
// It always returns nil and ignores both the data and the originating peer:
// a body might consist of multiple miniblocks, some valid and some not, so the
// checking is left to the step that iterates over them for processing.
func (mip *MiniblockInterceptorProcessor) Validate(_ process.InterceptedData, _ core.PeerID) error {
	return nil
}
// Save handles a received intercepted miniblock.
//
// The data must be a *interceptedBlocks.InterceptedMiniblock, otherwise
// process.ErrWrongTypeAssertion is returned. The registered handlers are then
// notified on a separate goroutine, whether or not the miniblock is whitelisted.
// Only whitelisted miniblocks are added to the miniblock cache; for the others
// a trace message is logged and nil is returned.
func (mip *MiniblockInterceptorProcessor) Save(data process.InterceptedData, _ core.PeerID, topic string) error {
	intercepted, isMiniblock := data.(*interceptedBlocks.InterceptedMiniblock)
	if !isMiniblock {
		return process.ErrWrongTypeAssertion
	}

	mb := intercepted.Miniblock()
	mbHash := intercepted.Hash()

	// The notification is started before the whitelist check and does not depend on it.
	go mip.notify(mb, mbHash, topic)

	if mip.whiteListHandler.IsWhiteListed(data) {
		mip.miniblockCache.HasOrAdd(mbHash, mb, mb.Size())
		return nil
	}

	log.Trace(
		"MiniblockInterceptorProcessor.Save: not whitelisted miniblocks will not be added in pool",
		"type", mb.Type,
		"sender shard", mb.SenderShardID,
		"receiver shard", mb.ReceiverShardID,
		"hash", mbHash,
	)

	return nil
}
// RegisterHandler adds a callback that will be invoked with the topic, hash and
// miniblock of every incoming miniblock. A nil handler is ignored.
func (mip *MiniblockInterceptorProcessor) RegisterHandler(handler func(topic string, hash []byte, data interface{})) {
	if handler == nil {
		return
	}

	mip.mutHandlers.Lock()
	defer mip.mutHandlers.Unlock()

	mip.registeredHandlers = append(mip.registeredHandlers, handler)
}
// IsInterfaceNil reports whether the receiver pointer is nil, i.e. whether
// there is no value under the interface holding it.
func (mip *MiniblockInterceptorProcessor) IsInterfaceNil() bool {
	return mip == nil
}
// notify calls every registered handler, in registration order, with the given
// topic, hash and miniblock. Save runs it on its own goroutine.
//
// The read lock is held for the whole iteration and released through defer, so
// it is freed even if a handler panics; otherwise a recovered panic would leave
// the mutex read-locked and block every later RegisterHandler call.
func (mip *MiniblockInterceptorProcessor) notify(miniBlock *block.MiniBlock, hash []byte, topic string) {
	mip.mutHandlers.RLock()
	defer mip.mutHandlers.RUnlock()

	for _, handler := range mip.registeredHandlers {
		handler(topic, hash, miniBlock)
	}
}