block_watcher.go

package blockwatch

import (
	"context"
	"errors"
	"fmt"
	"math/big"
	"sync"
	"time"

	"github.com/0xProject/0x-mesh/meshdb"
	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/rpc"
	log "github.com/sirupsen/logrus"
)

// maxBlocksInGetLogsQuery is the max number of blocks to fetch logs for in a single query. There is
// a hard limit of 10,000 logs returned by a single `eth_getLogs` query by Infura's Ethereum nodes so
// we need to try and stay below it. Parity, Geth and Alchemy all have much higher limits (if any) on
// the number of logs returned so Infura is by far the limiting factor.
var maxBlocksInGetLogsQuery = 60

// EventType describes the types of events emitted by blockwatch.Watcher. A block can be discovered
// and added to our representation of the chain. During a block re-org, a block previously stored
// can be removed from the list.
type EventType int

const (
	Added EventType = iota
	Removed
)

// Event describes a block event emitted by a Watcher
type Event struct {
	Type        EventType
	BlockHeader *meshdb.MiniHeader
}

// Config holds some configuration options for an instance of BlockWatcher.
type Config struct {
	MeshDB              *meshdb.MeshDB
	PollingInterval     time.Duration
	StartBlockDepth     rpc.BlockNumber
	BlockRetentionLimit int
	WithLogs            bool
	Topics              []common.Hash
	Client              Client
}

// Watcher maintains a consistent representation of the latest `blockRetentionLimit` blocks,
// handling block re-orgs and network disruptions gracefully. It can be started from any arbitrary
// block height, and will emit both block added and removed events.
type Watcher struct {
	blockRetentionLimit int
	startBlockDepth     rpc.BlockNumber
	stack               *Stack
	client              Client
	blockFeed           event.Feed
	blockScope          event.SubscriptionScope // Subscription scope tracking current live listeners
	wasStartedOnce      bool                    // Whether the block watcher has previously been started
	pollingInterval     time.Duration
	ticker              *time.Ticker
	withLogs            bool
	topics              []common.Hash
	mu                  sync.RWMutex
}

// New creates a new Watcher instance.
func New(config Config) *Watcher {
	stack := NewStack(config.MeshDB, config.BlockRetentionLimit)
	bs := &Watcher{
		pollingInterval:     config.PollingInterval,
		blockRetentionLimit: config.BlockRetentionLimit,
		startBlockDepth:     config.StartBlockDepth,
		stack:               stack,
		client:              config.Client,
		withLogs:            config.WithLogs,
		topics:              config.Topics,
	}
	return bs
}

// BackfillEventsIfNeeded finds missed events that might have occurred while the
// Mesh node was offline and sends them to event subscribers. It blocks until
// it is done backfilling or the given context is canceled.
func (w *Watcher) BackfillEventsIfNeeded(ctx context.Context) error {
	events, err := w.getMissedEventsToBackfill(ctx)
	if err != nil {
		return err
	}
	if len(events) > 0 {
		w.blockFeed.Send(events)
	}
	return nil
}

// Watch starts the Watcher. It continuously polls for new blocks, blocking
// until there is a critical error or the given context is canceled. Typically,
// you want to call Watch inside a goroutine. Non-critical errors are logged
// and polling continues.
func (w *Watcher) Watch(ctx context.Context) error {
	w.mu.Lock()
	if w.wasStartedOnce {
		w.mu.Unlock()
		return errors.New("Can only start Watcher once per instance")
	}
	w.wasStartedOnce = true
	w.mu.Unlock()

	ticker := time.NewTicker(w.pollingInterval)
	for {
		select {
		case <-ctx.Done():
			ticker.Stop()
			return nil
		case <-ticker.C:
			if err := w.pollNextBlock(); err != nil {
				log.WithError(err).Error("blockwatch.Watcher error encountered")
			}
		}
	}
}

// Subscribe allows one to subscribe to the block events emitted by the Watcher.
// To unsubscribe, simply call `Unsubscribe` on the returned subscription.
// The sink channel should have ample buffer space to avoid blocking other subscribers.
// Slow subscribers are not dropped.
func (w *Watcher) Subscribe(sink chan<- []*Event) event.Subscription {
	return w.blockScope.Track(w.blockFeed.Subscribe(sink))
}

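// exampleWatchUsage is an illustrative sketch, not part of the original API: it
// shows one way a caller might wire a Watcher together, assuming a *meshdb.MeshDB
// and a Client implementation are constructed elsewhere. The polling interval,
// retention limit, and sink buffer size are arbitrary example values.
func exampleWatchUsage(ctx context.Context, db *meshdb.MeshDB, client Client) error {
	watcher := New(Config{
		MeshDB:              db,
		PollingInterval:     5 * time.Second,
		StartBlockDepth:     rpc.LatestBlockNumber,
		BlockRetentionLimit: 20,
		WithLogs:            false,
		Client:              client,
	})
	// Give the sink ample buffer space; slow subscribers are not dropped, so an
	// unbuffered channel here could block every other subscriber.
	sink := make(chan []*Event, 100)
	sub := watcher.Subscribe(sink)
	defer sub.Unsubscribe()
	// Subscribe before backfilling so the backfilled events are not missed.
	if err := watcher.BackfillEventsIfNeeded(ctx); err != nil {
		return err
	}
	go func() {
		if err := watcher.Watch(ctx); err != nil {
			log.WithError(err).Error("watcher exited")
		}
	}()
	for {
		select {
		case <-ctx.Done():
			return nil
		case events := <-sink:
			for _, e := range events {
				log.WithFields(log.Fields{
					"type":        e.Type,
					"blockNumber": e.BlockHeader.Number,
				}).Info("block event")
			}
		}
	}
}
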
// GetLatestBlock returns the latest block processed
func (w *Watcher) GetLatestBlock() (*meshdb.MiniHeader, error) {
	return w.stack.Peek()
}

// InspectRetainedBlocks returns the blocks retained in-memory by the Watcher instance. It is not
// particularly performant and therefore should only be used for debugging and testing purposes.
func (w *Watcher) InspectRetainedBlocks() ([]*meshdb.MiniHeader, error) {
	return w.stack.Inspect()
}

// pollNextBlock polls for the next block header to be added to the block stack.
// If there are no blocks on the stack, it fetches the first block at the specified
// `startBlockDepth` supplied at instantiation.
func (w *Watcher) pollNextBlock() error {
	var nextBlockNumber *big.Int
	latestHeader, err := w.stack.Peek()
	if err != nil {
		return err
	}
	if latestHeader == nil {
		if w.startBlockDepth == rpc.LatestBlockNumber {
			nextBlockNumber = nil // Fetch latest block
		} else {
			nextBlockNumber = big.NewInt(int64(w.startBlockDepth))
		}
	} else {
		nextBlockNumber = big.NewInt(0).Add(latestHeader.Number, big.NewInt(1))
	}
	nextHeader, err := w.client.HeaderByNumber(nextBlockNumber)
	if err != nil {
		if err == ethereum.NotFound {
			log.WithFields(log.Fields{
				"blockNumber": nextBlockNumber,
			}).Trace("block header not found")
			return nil // No-op and wait for the next polling interval
		}
		return err
	}

	events := []*Event{}
	events, err = w.buildCanonicalChain(nextHeader, events)
	// Even if an error occurred, we still want to emit the events gathered so far, since we
	// might have popped blocks off the stack and they won't be re-added
	if len(events) != 0 {
		w.blockFeed.Send(events)
	}
	return err
}

func (w *Watcher) buildCanonicalChain(nextHeader *meshdb.MiniHeader, events []*Event) ([]*Event, error) {
	latestHeader, err := w.stack.Peek()
	if err != nil {
		return nil, err
	}
	// Is the stack empty or is this the next block?
	if latestHeader == nil || nextHeader.Parent == latestHeader.Hash {
		nextHeader, err := w.addLogs(nextHeader)
		if err != nil {
			// Due to block re-orgs & Ethereum node services load-balancing requests across multiple
			// nodes, a block header might be returned, but when fetching its logs, an "unknown block"
			// error is returned. This is expected to happen sometimes, and we simply return the events
			// gathered so far and pick back up where we left off on the next polling interval.
			if isUnknownBlockErr(err) {
				log.WithFields(log.Fields{
					"nextHeader": nextHeader,
				}).Trace("failed to get logs for block")
				return events, nil
			}
			return events, err
		}
		err = w.stack.Push(nextHeader)
		if err != nil {
			return events, err
		}
		events = append(events, &Event{
			Type:        Added,
			BlockHeader: nextHeader,
		})
		return events, nil
	}

	// Pop latestHeader from the stack. We already have a reference to it.
	if _, err := w.stack.Pop(); err != nil {
		return events, err
	}
	events = append(events, &Event{
		Type:        Removed,
		BlockHeader: latestHeader,
	})

	nextParentHeader, err := w.client.HeaderByHash(nextHeader.Parent)
	if err != nil {
		if err == ethereum.NotFound {
			log.WithFields(log.Fields{
				"blockHash": nextHeader.Parent.Hex(),
			}).Info("block header not found")
			// No-op and wait for the next polling interval. We remove the popped blocks
			// and refetch them on the next polling interval.
			return events, nil
		}
		return events, err
	}
	events, err = w.buildCanonicalChain(nextParentHeader, events)
	if err != nil {
		return events, err
	}
	nextHeader, err = w.addLogs(nextHeader)
	if err != nil {
		// Due to block re-orgs & Ethereum node services load-balancing requests across multiple
		// nodes, a block header might be returned, but when fetching its logs, an "unknown block"
		// error is returned. This is expected to happen sometimes, and we simply return the events
		// gathered so far and pick back up where we left off on the next polling interval.
		if isUnknownBlockErr(err) {
			log.WithFields(log.Fields{
				"nextHeader": nextHeader,
			}).Trace("failed to get logs for block")
			return events, nil
		}
		return events, err
	}
	err = w.stack.Push(nextHeader)
	if err != nil {
		return events, err
	}
	events = append(events, &Event{
		Type:        Added,
		BlockHeader: nextHeader,
	})
	return events, nil
}

func (w *Watcher) addLogs(header *meshdb.MiniHeader) (*meshdb.MiniHeader, error) {
	if !w.withLogs {
		return header, nil
	}
	logs, err := w.client.FilterLogs(ethereum.FilterQuery{
		BlockHash: &header.Hash,
		Topics:    [][]common.Hash{w.topics},
	})
	if err != nil {
		return header, err
	}
	header.Logs = logs
	return header, nil
}

// getMissedEventsToBackfill finds missed events that might have occurred while the Mesh node was
// offline. It does this by comparing the last block stored with the latest block discoverable via RPC.
// If the stored block is older than the latest block, it batch-fetches the events for the missing
// blocks, resets the stored blocks and returns the block events found.
func (w *Watcher) getMissedEventsToBackfill(ctx context.Context) ([]*Event, error) {
	events := []*Event{}

	latestRetainedBlock, err := w.stack.Peek()
	if err != nil {
		return events, err
	}
	// No blocks stored, nowhere to backfill to
	if latestRetainedBlock == nil {
		return events, nil
	}
	latestBlock, err := w.client.HeaderByNumber(nil)
	if err != nil {
		return events, err
	}
	blocksElapsed := big.NewInt(0).Sub(latestBlock.Number, latestRetainedBlock.Number)
	if blocksElapsed.Int64() == 0 {
		return events, nil
	}
	log.WithField("blocksElapsed", blocksElapsed.Int64()).Info("Some blocks have elapsed since last boot. Backfilling block events (this can take a while)...")

	startBlockNum := int(latestRetainedBlock.Number.Int64() + 1)
	endBlockNum := int(latestRetainedBlock.Number.Int64() + blocksElapsed.Int64())
	logs, furthestBlockProcessed := w.getLogsInBlockRange(ctx, startBlockNum, endBlockNum)
	if int64(furthestBlockProcessed) > latestRetainedBlock.Number.Int64() {
		// If we have processed blocks further than the latestRetainedBlock in the DB, we
		// want to remove all blocks from the DB and insert the furthestBlockProcessed.
		// Doing so will cause the BlockWatcher to start from that furthestBlockProcessed.
		headers, err := w.InspectRetainedBlocks()
		if err != nil {
			return events, err
		}
		for i := 0; i < len(headers); i++ {
			_, err := w.stack.Pop()
			if err != nil {
				return events, err
			}
		}
		// Add furthest block processed into the DB
		latestHeader, err := w.client.HeaderByNumber(big.NewInt(int64(furthestBlockProcessed)))
		if err != nil {
			return events, err
		}
		err = w.stack.Push(latestHeader)
		if err != nil {
			return events, err
		}

		// If no logs were found, there is nothing to backfill
		if len(logs) == 0 {
			return events, nil
		}

		// Create the block events from all the logs found by grouping
		// them into blockHeaders
		hashToBlockHeader := map[common.Hash]*meshdb.MiniHeader{}
		for _, log := range logs {
			blockHeader, ok := hashToBlockHeader[log.BlockHash]
			if !ok {
				// TODO(fabio): Find a way to include the parent hash for the block as well.
				// It's currently not an issue to omit it since we don't use the parent hash
				// when processing block events in OrderWatcher.
				blockHeader = &meshdb.MiniHeader{
					Hash:   log.BlockHash,
					Number: big.NewInt(0).SetUint64(log.BlockNumber),
					Logs:   []types.Log{},
				}
				hashToBlockHeader[log.BlockHash] = blockHeader
			}
			blockHeader.Logs = append(blockHeader.Logs, log)
		}
		for _, blockHeader := range hashToBlockHeader {
			events = append(events, &Event{
				Type:        Added,
				BlockHeader: blockHeader,
			})
		}
		log.Info("Done backfilling block events")
		return events, nil
	}
	return events, nil
}

type logRequestResult struct {
	From int
	To   int
	Logs []types.Log
	Err  error
}

// getLogsRequestChunkSize is the number of `eth_getLogs` JSON RPC requests to send
// concurrently in each batch fetch
const getLogsRequestChunkSize = 3

// getLogsInBlockRange attempts to fetch all logs in the block range supplied. It implements a
// limited-concurrency batch fetch, where all requests in the previous batch must complete before
// the next batch of requests is sent. If an error is encountered in a batch, all subsequent
// batch requests are not sent. Instead, it returns all the logs it found up until the error was
// encountered, along with the last block number for which logs were successfully retrieved.
func (w *Watcher) getLogsInBlockRange(ctx context.Context, from, to int) ([]types.Log, int) {
	blockRanges := w.getSubBlockRanges(from, to, maxBlocksInGetLogsQuery)

	numChunks := 0
	chunkChan := make(chan []*blockRange, 1000000)
	for len(blockRanges) != 0 {
		var chunk []*blockRange
		if len(blockRanges) < getLogsRequestChunkSize {
			chunk = blockRanges
		} else {
			chunk = blockRanges[:getLogsRequestChunkSize]
		}
		chunkChan <- chunk
		blockRanges = blockRanges[len(chunk):]
		numChunks++
	}

	semaphoreChan := make(chan struct{}, 1)
	defer close(semaphoreChan)

	didAPreviousRequestFail := false
	furthestBlockProcessed := from - 1
	allLogs := []types.Log{}

	for i := 0; i < numChunks; i++ {
		// Add one to the semaphore chan. If it already has a value, the chunk blocks here until one frees up.
		// We deliberately process the chunks sequentially, since if any request results in an error, we
		// do not want to send any further requests.
		semaphoreChan <- struct{}{}

		// If a previous request failed, we stop processing newer requests
		if didAPreviousRequestFail {
			<-semaphoreChan
			continue // Noop
		}

		mu := sync.Mutex{}
		indexToLogResult := map[int]logRequestResult{}
		chunk := <-chunkChan

		wg := &sync.WaitGroup{}
		for i, aBlockRange := range chunk {
			wg.Add(1)
			go func(index int, b *blockRange) {
				defer wg.Done()
				select {
				case <-ctx.Done():
					mu.Lock()
					indexToLogResult[index] = logRequestResult{
						From: b.FromBlock,
						To:   b.ToBlock,
						Err:  errors.New("context was canceled"),
						Logs: []types.Log{},
					}
					mu.Unlock()
					return
				default:
				}
				logs, err := w.filterLogsRecursively(b.FromBlock, b.ToBlock, []types.Log{})
				if err != nil {
					log.WithFields(log.Fields{
						"error":     err,
						"fromBlock": b.FromBlock,
						"toBlock":   b.ToBlock,
					}).Trace("Failed to fetch logs for range")
				}
				mu.Lock()
				indexToLogResult[index] = logRequestResult{
					From: b.FromBlock,
					To:   b.ToBlock,
					Err:  err,
					Logs: logs,
				}
				mu.Unlock()
			}(i, aBlockRange)
		}

		// Wait for all log requests to complete
		wg.Wait()

		for i, aBlockRange := range chunk {
			logRequestResult := indexToLogResult[i]
			// Break at the first error encountered
			if logRequestResult.Err != nil {
				didAPreviousRequestFail = true
				furthestBlockProcessed = logRequestResult.From - 1
				break
			}
			allLogs = append(allLogs, logRequestResult.Logs...)
			furthestBlockProcessed = aBlockRange.ToBlock
		}
		<-semaphoreChan
	}

	return allLogs, furthestBlockProcessed
}

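// For illustration (a hypothetical run, not taken from real output): with from=10,
// to=250 and maxBlocksInGetLogsQuery=60, the range is split into [10,69], [70,129],
// [130,189], [190,249] and [250,250], fetched in batches of getLogsRequestChunkSize=3
// concurrent requests. If the request for [130,189] fails, getLogsInBlockRange returns
// the logs gathered for [10,129] and furthestBlockProcessed=129, so a later call can
// resume from block 130.
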
type blockRange struct {
	FromBlock int
	ToBlock   int
}

// getSubBlockRanges breaks up the block range into smaller block ranges of rangeSize.
// `eth_getLogs` requests are inclusive to both the start and end blocks specified and
// so we need to make the ranges exclusive of one another to avoid fetching the same
// blocks' logs twice.
func (w *Watcher) getSubBlockRanges(from, to, rangeSize int) []*blockRange {
	chunks := []*blockRange{}
	numBlocksLeft := to - from
	if numBlocksLeft < rangeSize {
		chunks = append(chunks, &blockRange{
			FromBlock: from,
			ToBlock:   to,
		})
	} else {
		blocks := []int{}
		for i := 0; i <= numBlocksLeft; i++ {
			blocks = append(blocks, from+i)
		}
		numChunks := len(blocks) / rangeSize
		remainder := len(blocks) % rangeSize
		if remainder > 0 {
			numChunks = numChunks + 1
		}
		for i := 0; i < numChunks; i = i + 1 {
			fromIndex := i * rangeSize
			toIndex := fromIndex + rangeSize
			if toIndex > len(blocks) {
				toIndex = len(blocks)
			}
			bs := blocks[fromIndex:toIndex]
			blockRange := &blockRange{
				FromBlock: bs[0],
				ToBlock:   bs[len(bs)-1],
			}
			chunks = append(chunks, blockRange)
		}
	}
	return chunks
}

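// For illustration: getSubBlockRanges(5, 10, 2) yields the inclusive, mutually
// exclusive ranges [5,6], [7,8], [9,10], while getSubBlockRanges(5, 10, 60) yields
// the single range [5,10] since the whole span fits within rangeSize.
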
const infuraTooManyResultsErrMsg = "query returned more than 10000 results"

func (w *Watcher) filterLogsRecursively(from, to int, allLogs []types.Log) ([]types.Log, error) {
	log.WithFields(log.Fields{
		"from": from,
		"to":   to,
	}).Trace("Fetching block logs")

	numBlocks := to - from
	topics := [][]common.Hash{}
	if len(w.topics) > 0 {
		topics = append(topics, w.topics)
	}
	logs, err := w.client.FilterLogs(ethereum.FilterQuery{
		FromBlock: big.NewInt(int64(from)),
		ToBlock:   big.NewInt(int64(to)),
		Topics:    topics,
	})
	if err != nil {
		// Infura caps the logs returned at 10,000 per request. If our request exceeds this limit,
		// we split it into two requests. Parity, Geth and Alchemy all have much higher limits (if
		// any at all), so we don't expect similar errors from them.
		if err.Error() == infuraTooManyResultsErrMsg {
			// HACK(fabio): Infura limits the returned results to 10,000 logs, BUT some single
			// blocks contain more than 10,000 logs. This has supposedly been fixed but we keep
			// this logic here just in case. It helps us avoid infinite recursion.
			// Source: https://community.infura.io/t/getlogs-error-query-returned-more-than-1000-results/358/10
			if from == to {
				return allLogs, fmt.Errorf("Unable to get the logs for block #%d, because it contains too many logs", from)
			}
			r := numBlocks % 2
			firstBatchSize := numBlocks / 2
			secondBatchSize := firstBatchSize
			if r == 1 {
				secondBatchSize = secondBatchSize + 1
			}
			endFirstHalf := from + firstBatchSize
			startSecondHalf := endFirstHalf + 1
			allLogs, err := w.filterLogsRecursively(from, endFirstHalf, allLogs)
			if err != nil {
				return nil, err
			}
			allLogs, err = w.filterLogsRecursively(startSecondHalf, to, allLogs)
			if err != nil {
				return nil, err
			}
			return allLogs, nil
		}
		return nil, err
	}
	allLogs = append(allLogs, logs...)
	return allLogs, nil
}

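// For illustration: if filterLogsRecursively(100, 199, ...) hits the Infura cap,
// then numBlocks=99, firstBatchSize=49 and secondBatchSize=50, so the call recurses
// into the inclusive halves [100,149] and [150,199]. Each half covers 50 blocks and
// together they cover the original range exactly once.
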
func isUnknownBlockErr(err error) bool {
	// Geth error
	if err.Error() == "unknown block" {
		return true
	}
	// Parity error
	if err.Error() == "One of the blocks specified in filter (fromBlock, toBlock or blockHash) cannot be found" {
		return true
	}
	return false
}