/
registrations.go
287 lines (249 loc) · 10 KB
/
registrations.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
package log
import (
"fmt"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/GoPlugin/Plugin/core/internal/gethwrappers"
"github.com/GoPlugin/Plugin/core/internal/gethwrappers/generated"
"github.com/GoPlugin/Plugin/core/logger"
"github.com/GoPlugin/Plugin/core/store/models"
)
// How it works in general:
// 1. Each listener being registered can specify a custom NumConfirmations - number of block confirmations required for any log being sent to it.
// 2. Adding and removing listeners updates the highestNumConfirmations - a number tracking what's the current highest NumConfirmations globally
//
// 3. All received logs are kept in an array and deleted ONLY after they are outside the confirmation range for all subscribers
// (when given log height is lower than (latest height - max(highestNumConfirmations, ETH_FINALITY_DEPTH)) ) -> see: pool.go
//
// 4. The logs are attempted to be sent after every new head arrival:
// Each stored log is then checked against every matched listener and is sent unless:
// A) is too young for that listener
// B) the corresponding block height is known to be already processed for that listener
// In the normal case, each log will be only processed once, and then its corresponding head will be remembered, so it's not double-sent
//
// After processing the whole batch of logs considered for sending, the per-listener metadata is updated in applyListenerInfoUpdates.
// If a re-org happens, the stored lowestAllowedBlockNumber (per-listener) is reset,
// so the logs from that chain are then considered unprocessed, and will be sent again.
//
type (
	// registrations is the in-memory index of active log subscriptions,
	// keyed contractAddress => logTopic (event signature) => Listener.
	registrations struct {
		registrations map[common.Address]map[common.Hash]map[Listener]*listenerMetadata // contractAddress => logTopic => Listener
		// decoders maps each contract address to the function used to parse
		// its raw logs into typed (Abigen-generated) log structs.
		decoders map[common.Address]ParseLogFunc
		// highest 'NumConfirmations' per all listeners, used to decide about deleting older logs if it's higher than EthFinalityDepth
		// it's: max(listeners.map(l => l.num_confirmations))
		highestNumConfirmations uint64
	}
	// The Listener responds to log events through HandleLog.
	Listener interface {
		HandleLog(b Broadcast)
		JobID() models.JobID
		JobIDV2() int32
		IsV2Job() bool
	}
	// metadata structure maintained per listener, used to avoid double-sends of logs
	listenerMetadata struct {
		opts    ListenerOpts // the options the listener registered with (incl. NumConfirmations)
		filters [][]Topic    // per-index topic-value filters; an empty inner slice accepts any value
		// logs below this block number have already been delivered on the
		// currently remembered chain, so they are skipped on re-send attempts
		lowestAllowedBlockNumber uint64
		// head of the chain as of the last delivery; used to detect re-orgs
		lastSeenChain *models.Head
	}
	// an update to listener metadata structure
	listenerMetadataUpdate struct {
		toUpdate                    *listenerMetadata
		newLowestAllowedBlockNumber uint64
	}
)
func newRegistrations() *registrations {
return ®istrations{
registrations: make(map[common.Address]map[common.Hash]map[Listener]*listenerMetadata),
decoders: make(map[common.Address]ParseLogFunc),
}
}
// addSubscriber indexes a new registration under its contract address and
// every topic it listens for, and raises the cached global confirmation
// maximum if needed. It reports needsResubscribe = true when a previously
// unseen (address, topic) pair appeared, meaning the backend log filter
// subscription must be recreated to include it.
func (r *registrations) addSubscriber(reg registration) (needsResubscribe bool) {
	addr := reg.opts.Contract
	r.decoders[addr] = reg.opts.ParseLog

	// NumConfirmations is unsigned, so `<= 0` could only ever mean `== 0`;
	// default the zero value to 1 confirmation.
	if reg.opts.NumConfirmations == 0 {
		reg.opts.NumConfirmations = 1
	}

	if _, exists := r.registrations[addr]; !exists {
		r.registrations[addr] = make(map[common.Hash]map[Listener]*listenerMetadata)
	}

	for topic, topicValueFilters := range reg.opts.LogsWithTopics {
		if _, exists := r.registrations[addr][topic]; !exists {
			r.registrations[addr][topic] = make(map[Listener]*listenerMetadata)
			needsResubscribe = true
		}

		// lowestAllowedBlockNumber and lastSeenChain start at their zero
		// values: nothing has been delivered to this listener yet.
		r.registrations[addr][topic][reg.listener] = &listenerMetadata{
			opts:    reg.opts,
			filters: topicValueFilters,
		}
	}

	r.maybeIncreaseHighestNumConfirmations(reg.opts.NumConfirmations)
	return
}
// removeSubscriber deletes a listener from every (address, topic) slot it was
// registered under, pruning empty topic and address entries, and then
// recomputes the global confirmation maximum (the departing listener may have
// held it). It reports needsResubscribe = true when at least one topic lost
// its last listener, meaning the backend log filter should be narrowed.
func (r *registrations) removeSubscriber(reg registration) (needsResubscribe bool) {
	addr := reg.opts.Contract
	if _, exists := r.registrations[addr]; !exists {
		return
	}

	for topic := range reg.opts.LogsWithTopics {
		if _, exists := r.registrations[addr][topic]; !exists {
			continue
		}
		delete(r.registrations[addr][topic], reg.listener)
		if len(r.registrations[addr][topic]) == 0 {
			needsResubscribe = true
			delete(r.registrations[addr], topic)
		}
	}

	// Hoisted out of the loop: the original checked this per-iteration, so
	// after deleting the address entry, subsequent iterations silently
	// operated on nil maps (harmless only by Go's nil-map delete semantics).
	if len(r.registrations[addr]) == 0 {
		delete(r.registrations, addr)
	}

	r.resetHighestNumConfirmations()
	return
}
// maybeIncreaseHighestNumConfirmations raises the cached global confirmation
// maximum when a newly added listener requires more confirmations than any
// listener seen so far; otherwise it leaves the cache untouched.
func (r *registrations) maybeIncreaseHighestNumConfirmations(newNumConfirmations uint64) {
	if r.highestNumConfirmations < newNumConfirmations {
		r.highestNumConfirmations = newNumConfirmations
	}
}
// resetHighestNumConfirmations recomputes the global confirmation maximum by
// scanning every currently registered listener. It must run after removals,
// since the departing listener may have been the one holding the maximum.
func (r *registrations) resetHighestNumConfirmations() {
	var highest uint64
	for _, byTopic := range r.registrations {
		for _, byListener := range byTopic {
			for _, md := range byListener {
				if md.opts.NumConfirmations > highest {
					highest = md.opts.NumConfirmations
				}
			}
		}
	}
	r.highestNumConfirmations = highest
}
// addressesAndTopics flattens the registration index into the contract
// address and topic lists needed to (re)build the backend log filter
// subscription. Ordering is map-iteration order and thus nondeterministic.
func (r *registrations) addressesAndTopics() ([]common.Address, []common.Hash) {
	var addresses []common.Address
	var topics []common.Hash
	for addr, byTopic := range r.registrations {
		addresses = append(addresses, addr)
		for topic := range byTopic {
			topics = append(topics, topic)
		}
	}
	return addresses, topics
}
// isAddressRegistered reports whether at least one listener is currently
// subscribed to logs emitted by the given contract address.
func (r *registrations) isAddressRegistered(address common.Address) bool {
	_, found := r.registrations[address]
	return found
}
// sendLogs attempts delivery of every log in the batch to all matching
// listeners, collecting per-listener metadata updates along the way, and then
// commits those updates in a single pass via applyListenerInfoUpdates.
func (r *registrations) sendLogs(logs []types.Log, orm ORM, latestHead models.Head) {
	var pending []listenerMetadataUpdate
	for _, lg := range logs {
		r.sendLog(lg, orm, latestHead, &pending)
	}
	applyListenerInfoUpdates(pending, latestHead)
}
// filtersContainValues reports whether, for every topic index covered by both
// slices, the received value appears in that index's filter set. An empty
// filter set at an index imposes no constraint (all values allowed), and
// indices beyond the shorter slice are never checked.
func filtersContainValues(topicValues []common.Hash, filters [][]Topic) bool {
	limit := len(topicValues)
	if len(filters) < limit {
		limit = len(filters)
	}
	for i := 0; i < limit; i++ {
		allowed := filters[i]
		if len(allowed) == 0 {
			continue // no filter at this position: anything matches
		}
		matched := false
		for _, want := range allowed {
			if topicValues[i] == common.Hash(want) {
				matched = true
				break
			}
		}
		if !matched {
			return false
		}
	}
	return true
}
// sendLog delivers one log to every listener registered for its contract
// address and first topic (the event signature), one goroutine per receiving
// listener, blocking until all handlers return. For each listener that
// receives the log, an update raising its lowestAllowedBlockNumber is
// appended to *updates — applied only later, by applyListenerInfoUpdates,
// once the whole batch has been processed.
// NOTE(review): the orm parameter is unused in this body — presumably kept
// for interface symmetry with callers; confirm before removing.
func (r *registrations) sendLog(log types.Log, orm ORM, latestHead models.Head, updates *[]listenerMetadataUpdate) {
	latestBlockNumber := uint64(latestHead.Number)
	var wg sync.WaitGroup
	for listener, metadata := range r.registrations[log.Address][log.Topics[0]] {
		listener := listener // per-iteration copy: captured by the goroutine below (pre-Go 1.22 loop semantics)
		numConfirmations := metadata.opts.NumConfirmations
		if latestBlockNumber < numConfirmations {
			// Skipping send because not enough height to send
			continue
		}
		// We attempt the send multiple times per log (depending on distinct num of confirmations of listeners),
		// even if the logs are too young
		// so here we need to see if this particular listener actually should receive it at this depth
		isOldEnough := (log.BlockNumber + numConfirmations - 1) <= latestBlockNumber
		if !isOldEnough {
			continue
		}
		// All logs for blocks below lowestAllowedBlockNumber were already sent to this listener, so we skip them
		// (only when the remembered chain still contains this log's block — otherwise a re-org happened
		// and the log must be re-delivered)
		if log.BlockNumber < metadata.lowestAllowedBlockNumber && metadata.lastSeenChain != nil && metadata.lastSeenChain.IsInChain(log.BlockHash) {
			// Skipping send because the log height is below lowest unprocessed in the currently remembered chain
			continue
		}
		// Apply the listener's per-index topic-value filters to the indexed topics (Topics[1:]).
		if len(metadata.filters) > 0 && len(log.Topics) > 1 {
			topicValues := log.Topics[1:]
			if !filtersContainValues(topicValues, metadata.filters) {
				continue
			}
		}
		// Make sure that this log is not sent again on the next head by increasing the newLowestAllowedBlockNumber
		*updates = append(*updates, listenerMetadataUpdate{
			toUpdate:                    metadata,
			newLowestAllowedBlockNumber: log.BlockNumber + 1,
		})
		// Copy the log so the concurrent handler cannot observe mutations of the shared value.
		logCopy := gethwrappers.CopyLog(log)
		var decodedLog generated.AbigenLog
		var err error
		// Decode only if a parser was registered for this contract; a parse
		// failure skips delivery for this listener (logged, not fatal).
		if parseLog := r.decoders[log.Address]; parseLog != nil {
			decodedLog, err = parseLog(logCopy)
			if err != nil {
				logger.Errorw("Could not parse contract log", "error", err)
				continue
			}
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			listener.HandleLog(&broadcast{
				latestBlockNumber: latestBlockNumber,
				latestBlockHash:   latestHead.Hash,
				rawLog:            logCopy,
				decodedLog:        decodedLog,
				jobID:             listener.JobID(),
				jobIDV2:           listener.JobIDV2(),
				isV2:              listener.IsV2Job(),
			})
		}()
	}
	// Block until every handler goroutine has finished for this log.
	wg.Wait()
}
// applyListenerInfoUpdates commits the per-listener bookkeeping gathered
// while a batch of logs was being sent. When the listener's previously
// remembered chain is still an ancestor of latestHead, its
// lowestAllowedBlockNumber only ever moves forward (guarding against
// double-sends). When it is not — a re-org — the counter is reset to zero so
// logs on the new chain are considered unprocessed and get re-delivered.
// Every touched listener then records latestHead as its last seen chain.
func applyListenerInfoUpdates(updates []listenerMetadataUpdate, latestHead models.Head) {
	for _, u := range updates {
		md := u.toUpdate
		onSameChain := md.lastSeenChain == nil || latestHead.IsInChain(md.lastSeenChain.Hash)
		if !onSameChain {
			logger.Debugw("LogBroadcaster: Chain reorg - resetting lowestAllowedBlockNumber",
				"blockNumber", latestHead.Number,
				"blockHash", latestHead.Hash,
				"lastSeenChainNumber", md.lastSeenChain.Number,
				"lastSeenChainHash", md.lastSeenChain.Hash,
				"chainLength", fmt.Sprintf("%v", latestHead.ChainLength()),
				"chainHashes", fmt.Sprintf("%v", latestHead.ChainHashes()),
			)
			// The tracked height belonged to an abandoned chain; start over.
			md.lowestAllowedBlockNumber = 0
		} else if md.lowestAllowedBlockNumber < u.newLowestAllowedBlockNumber {
			md.lowestAllowedBlockNumber = u.newLowestAllowedBlockNumber
		}
		// Remember latestHead as this listener's canonical chain tip.
		md.lastSeenChain = &latestHead
	}
}