forked from 0xProject/0x-mesh
/
block_watcher.go
671 lines (609 loc) · 20.6 KB
/
block_watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
package blockwatch
import (
"context"
"errors"
"fmt"
"math/big"
"sync"
"time"
"github.com/0xProject/0x-mesh/constants"
"github.com/0xProject/0x-mesh/ethereum/miniheader"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
log "github.com/sirupsen/logrus"
"github.com/syndtr/goleveldb/leveldb"
)
// maxBlocksInGetLogsQuery is the maximum number of blocks covered by a single `eth_getLogs`
// query. Infura's Ethereum nodes enforce a hard limit of 10,000 logs per `eth_getLogs` response,
// so block ranges are kept small to try and stay below it. Parity, Geth and Alchemy all have much
// higher limits (if any) on the number of logs returned, so Infura is by far the limiting factor.
// getLogsInBlockRange passes this value as the range size when splitting a block range.
var maxBlocksInGetLogsQuery = 60
// EventType describes the types of events emitted by blockwatch.Watcher. A block can be discovered
// and added to our representation of the chain. During a block re-org, a block previously stored
// can be removed from the list.
type EventType int
const (
// Added is emitted for a block header that was pushed onto the Watcher's stack.
Added EventType = iota
// Removed is emitted for a block header that was popped off the Watcher's stack because the
// next header did not build on top of it.
Removed
)
// Event describes a block event emitted by a Watcher.
type Event struct {
// Type says whether the block was added to or removed from the tracked chain.
Type EventType
// BlockHeader is the header of the block this event refers to.
BlockHeader *miniheader.MiniHeader
}
// Stack defines the interface a stack must implement in order to be used by
// the Watcher for block header storage.
type Stack interface {
// Pop takes the most recently pushed header off the stack. The Watcher only calls it to
// discard a header it has already obtained through Peek.
Pop() (*miniheader.MiniHeader, error)
// Push stores the given header as the new top of the stack.
Push(*miniheader.MiniHeader) error
// Peek returns the header at the top of the stack. The Watcher treats a nil header returned
// together with a nil error as "the stack is empty".
Peek() (*miniheader.MiniHeader, error)
// PeekAll returns the headers held by the stack.
// NOTE(review): the ordering of the returned slice is not visible from this file — confirm
// against the implementation.
PeekAll() ([]*miniheader.MiniHeader, error)
// Clear removes every header from the stack.
Clear() error
// Checkpoint returns an ID that can later be handed to Reset.
// NOTE(review): what exactly gets recorded/persisted at a checkpoint is defined by the
// implementation — confirm there.
Checkpoint() (int, error)
// Reset is called with an ID obtained from Checkpoint in order to undo the changes made to
// the stack since that checkpoint was taken.
Reset(int) error
}
// TooMayBlocksBehindError is the error reported when the BlockWatcher has fallen so far behind
// the latest block (constants.MaxBlocksStoredInNonArchiveNode blocks or more) that it cannot
// catch back up while connected to a non-archive Ethereum node.
type TooMayBlocksBehindError struct {
	blocksMissing int
}

// Error implements the error interface; the message includes the number of missing blocks.
func (tooFarBehind TooMayBlocksBehindError) Error() string {
	const template = "too many blocks (%d) behind the latest block"
	return fmt.Sprintf(template, tooFarBehind.blocksMissing)
}
// Config holds some configuration options for an instance of BlockWatcher.
type Config struct {
// Stack is the block header storage the Watcher operates on.
Stack Stack
// PollingInterval is how often Watch checks for new blocks.
PollingInterval time.Duration
// WithLogs tells the Watcher to fetch the logs of each block it adds and attach them to the
// block's header.
WithLogs bool
// Topics are the log topics used to filter the logs that get fetched.
Topics []common.Hash
// Client is the Ethereum RPC client used to fetch block headers and logs.
Client Client
}
// Watcher maintains a consistent representation of the latest X blocks (where X is enforced by the
// supplied stack) handling block re-orgs and network disruptions gracefully. It can be started from
// any arbitrary block height, and will emit both block added and removed events.
type Watcher struct {
// stack stores the block headers the Watcher currently tracks.
stack Stack
// client performs all Ethereum RPC requests (headers and logs).
client Client
// blockFeed delivers batches of block events to subscribers.
blockFeed event.Feed
blockScope event.SubscriptionScope // Subscription scope tracking current live listeners
wasStartedOnce bool // Whether the block watcher has previously been started
// pollingInterval is the delay between two sync passes performed by Watch.
pollingInterval time.Duration
// withLogs controls whether addLogs fetches the logs of each block added to the stack.
withLogs bool
// topics is the topic filter applied when fetching logs.
topics []common.Hash
// mu guards wasStartedOnce.
mu sync.RWMutex
// syncToLatestBlockMu ensures only one SyncToLatestBlock call runs at a time.
syncToLatestBlockMu sync.Mutex
}
// New builds a Watcher from the supplied Config. Every Config field is copied onto the
// corresponding Watcher field; all remaining Watcher fields keep their zero value.
func New(config Config) *Watcher {
	watcher := new(Watcher)
	watcher.stack = config.Stack
	watcher.client = config.Client
	watcher.pollingInterval = config.PollingInterval
	watcher.withLogs = config.WithLogs
	watcher.topics = config.Topics
	return watcher
}
// FastSyncToLatestBlock checks if the BlockWatcher is behind the latest block, and if so,
// catches it back up. If fewer than constants.MaxBlocksStoredInNonArchiveNode blocks have
// passed, all missed block events are fetched and sent to subscribers. If more blocks have
// passed, we cannot catch up without an archive Ethereum node (see: http://bit.ly/2D11Hr6),
// so the previously tracked blocks are cleared and the BlockWatcher starts again from the
// latest block. This function blocks until complete or the context is cancelled.
//
// It may only be called before Watch; afterwards it returns an error. The returned
// blocksElapsed is the difference between the node's latest block number and the last stored
// block number. It is zero when nothing is stored yet and may be negative when the node
// reports a block older than the last stored one, in which case nothing is done.
func (w *Watcher) FastSyncToLatestBlock(ctx context.Context) (blocksElapsed int, err error) {
	w.mu.Lock()
	alreadyStarted := w.wasStartedOnce
	w.mu.Unlock()
	if alreadyStarted {
		return 0, errors.New("Can only fast-sync to latest block before starting BlockWatcher")
	}

	latestBlockProcessed, err := w.stack.Peek()
	if err != nil {
		return 0, err
	}
	// No previously stored block so no blocks have elapsed
	if latestBlockProcessed == nil {
		return 0, nil
	}
	latestBlock, err := w.client.HeaderByNumber(nil)
	if err != nil {
		return 0, err
	}
	latestBlockProcessedNumber := int(latestBlockProcessed.Number.Int64())
	blocksElapsed = int(latestBlock.Number.Int64()) - latestBlockProcessedNumber

	switch {
	case blocksElapsed <= 0:
		// Already caught up, or the node is behind what we have stored. There is nothing to
		// backfill, and requesting logs for an inverted block range would be pointless.
		return blocksElapsed, nil
	case blocksElapsed < constants.MaxBlocksStoredInNonArchiveNode:
		log.WithField("blocksElapsed", blocksElapsed).Info("Some blocks have elapsed since last boot. Backfilling block events (this can take a while)...")
		events, err := w.getMissedEventsToBackfill(ctx, blocksElapsed, latestBlockProcessedNumber)
		if err != nil {
			return blocksElapsed, err
		}
		if len(events) > 0 {
			w.blockFeed.Send(events)
		}
	default:
		// Clear all block headers from stack so BlockWatcher starts again from latest block
		if err := w.stack.Clear(); err != nil {
			return blocksElapsed, err
		}
	}
	return blocksElapsed, nil
}
// Watch starts the Watcher. It syncs to the latest block immediately and then once per
// polling interval until there is a critical error or the given context is canceled.
// Typically, you want to call Watch inside a goroutine. It can only be called once per
// Watcher instance; subsequent calls return an error.
//
// Two errors are critical and are returned to the caller: leveldb.ErrClosed (the database is
// closed) and TooMayBlocksBehindError (the Watcher fell too far behind to catch up). Any
// other sync error is logged and polling continues. Watch returns nil when ctx is canceled.
func (w *Watcher) Watch(ctx context.Context) error {
	w.mu.Lock()
	if w.wasStartedOnce {
		w.mu.Unlock()
		return errors.New("Can only start Watcher once per instance")
	}
	w.wasStartedOnce = true
	w.mu.Unlock()

	// syncOnce runs a single sync pass. It returns a non-nil error only for failures the
	// Watcher cannot recover from; everything else is logged so that polling carries on.
	syncOnce := func() error {
		err := w.SyncToLatestBlock()
		if err == nil {
			return nil
		}
		if err == leveldb.ErrClosed {
			// We can't continue if the database is closed. Stop the watcher and
			// return an error.
			return err
		}
		if _, ok := err.(TooMayBlocksBehindError); ok {
			// We've fallen too many blocks behind to sync to the latest block.
			// We'd need to start again from the latest block but also require
			// the OrderWatcher to re-validate all orders at the latest block.
			// By returning an error here, we cause Mesh to gracefully shut down.
			// Upon re-booting, it will reset the blocks stored in the DB and
			// re-validate all orders stored.
			return err
		}
		log.WithError(err).Error("blockwatch.Watcher error encountered")
		return nil
	}

	// Sync immediately when `Watch()` is called instead of waiting for the
	// first Ticker tick
	if err := syncOnce(); err != nil {
		return err
	}

	ticker := time.NewTicker(w.pollingInterval)
	// A single deferred Stop covers every return path below.
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			if err := syncOnce(); err != nil {
				return err
			}
		}
	}
}
// Subscribe registers sink to receive the batches of block events emitted by the Watcher.
// Call `Unsubscribe` on the returned subscription to stop receiving them.
// Give the sink channel ample buffer space so that it does not block other subscribers;
// slow subscribers are not dropped.
func (w *Watcher) Subscribe(sink chan<- []*Event) event.Subscription {
	feedSubscription := w.blockFeed.Subscribe(sink)
	// Tracking the subscription in the scope lets it be managed with the other live listeners.
	return w.blockScope.Track(feedSubscription)
}
// SyncToLatestBlock brings the locally stored chain up to the latest block reported over
// Ethereum RPC. Calls are serialized by syncToLatestBlockMu.
//
// It returns nil when the stored chain is already at or ahead of the node's latest block, and
// a TooMayBlocksBehindError when the gap is constants.MaxBlocksStoredInNonArchiveNode blocks
// or more. Otherwise it walks forward one block at a time and collects the resulting events.
// If the collected events do not leave the chain further ahead than before, the stack is
// reset to the checkpoint taken at the start; otherwise a new checkpoint is taken and the
// events are sent to subscribers. An RPC or stack error hit midway stops the walk and is
// returned after that reset/persist step has run.
func (w *Watcher) SyncToLatestBlock() error {
	w.syncToLatestBlockMu.Lock()
	defer w.syncToLatestBlockMu.Unlock()

	checkpointID, err := w.stack.Checkpoint()
	if err != nil {
		return err
	}
	chainHead, err := w.client.HeaderByNumber(nil)
	if err != nil {
		return err
	}
	chainHeadNumber := chainHead.Number.Int64()
	storedHead, err := w.stack.Peek()
	if err != nil {
		return err
	}

	// With nothing stored yet only the chain head itself gets fetched.
	blocksToFetch := 1
	if storedHead != nil {
		storedHeadNumber := storedHead.Number.Int64()
		// Noop if already caught up or ahead of latest block returned from Ethereum node
		if chainHeadNumber <= storedHeadNumber {
			return nil
		}
		blocksToFetch = int(chainHeadNumber - storedHeadNumber)
	}
	if blocksToFetch >= constants.MaxBlocksStoredInNonArchiveNode {
		return TooMayBlocksBehindError{blocksMissing: blocksToFetch}
	}

	collected := []*Event{}
	// Syncing involves several Ethereum RPC requests. The first failure stops the walk and is
	// remembered in syncErr, which is handed back only after the events gathered so far have
	// been either reverted or persisted.
	var syncErr error
	for fetched := 0; fetched < blocksToFetch; fetched++ {
		// Optimization: when exactly one block is missing it is the chain head we already
		// hold. Otherwise the next header is the one following the current top of the stack.
		candidate := chainHead
		if blocksToFetch != 1 {
			top, peekErr := w.stack.Peek()
			if peekErr != nil {
				syncErr = peekErr
				break
			}
			following := new(big.Int).Add(top.Number, big.NewInt(1))
			fetchedHeader, fetchErr := w.client.HeaderByNumber(following)
			if fetchErr != nil {
				syncErr = fetchErr
				break
			}
			candidate = fetchedHeader
		}

		blockEvents, buildErr := w.buildCanonicalChain(candidate, nil)
		// Events are kept even when buildCanonicalChain fails part-way through.
		collected = append(collected, blockEvents...)
		if buildErr != nil {
			syncErr = buildErr
			break
		}
	}
	if len(collected) == 0 {
		return syncErr
	}

	if w.shouldRevertChanges(storedHead, collected) {
		if err := w.stack.Reset(checkpointID); err != nil {
			return err
		}
		return syncErr
	}
	if _, err := w.stack.Checkpoint(); err != nil {
		return err
	}
	w.blockFeed.Send(collected)
	return syncErr
}
// shouldRevertChanges reports whether the changes described by events should be undone. That
// is the case when the header of the last event is not at a higher block number than
// lastStoredHeader. It returns false when there are no events or no previously stored header.
func (w *Watcher) shouldRevertChanges(lastStoredHeader *miniheader.MiniHeader, events []*Event) bool {
	if lastStoredHeader == nil {
		return false
	}
	count := len(events)
	if count == 0 {
		return false
	}
	// Only keep changes that leave the node further ahead in terms of block number. This
	// prevents unnecessary thrash during block re-orgs (which tend to cluster).
	tip := events[count-1].BlockHeader
	movedAhead := tip.Number.Cmp(lastStoredHeader.Number) > 0
	return !movedAhead
}
// buildCanonicalChain places nextHeader on top of the stack, first unwinding any stored
// headers it does not build upon. Each header popped produces a Removed event and each header
// pushed produces an Added event. The events are appended to the supplied slice, which is
// returned.
//
// When nextHeader's parent is not the current top of the stack, the top is popped, the parent
// is fetched by hash, and the function recurses on that parent before pushing nextHeader.
// If a step fails, the events gathered up to that point are returned with the error. The one
// exception is a failure of the initial Peek, which returns nil events.
func (w *Watcher) buildCanonicalChain(nextHeader *miniheader.MiniHeader, events []*Event) ([]*Event, error) {
	top, err := w.stack.Peek()
	if err != nil {
		return nil, err
	}

	// nextHeader can be pushed directly if the stack is empty or it is the child of the top.
	extendsTop := top == nil || nextHeader.Parent == top.Hash
	if !extendsTop {
		// Drop the top from the stack; `top` already holds a reference to it.
		if _, err := w.stack.Pop(); err != nil {
			return events, err
		}
		events = append(events, &Event{
			Type:        Removed,
			BlockHeader: top,
		})

		parent, err := w.client.HeaderByHash(nextHeader.Parent)
		if err != nil {
			return events, err
		}
		events, err = w.buildCanonicalChain(parent, events)
		if err != nil {
			return events, err
		}
	}

	headerWithLogs, err := w.addLogs(nextHeader)
	if err != nil {
		return events, err
	}
	if err := w.stack.Push(headerWithLogs); err != nil {
		return events, err
	}
	added := &Event{
		Type:        Added,
		BlockHeader: headerWithLogs,
	}
	return append(events, added), nil
}
// addLogs fetches the logs of the block identified by header.Hash and stores them on the
// header, which is modified in place and returned. When the Watcher was configured without
// logs, the header is returned untouched and no request is made. On a failed request the
// header is returned unmodified together with the error.
func (w *Watcher) addLogs(header *miniheader.MiniHeader) (*miniheader.MiniHeader, error) {
	if !w.withLogs {
		return header, nil
	}
	// Build the topic filter the same way filterLogsRecurisively does. With no topics
	// configured, no topic filter is sent at all, rather than a single empty position, which
	// a node can treat as "must have at least one topic".
	topics := [][]common.Hash{}
	if len(w.topics) > 0 {
		topics = append(topics, w.topics)
	}
	logs, err := w.client.FilterLogs(ethereum.FilterQuery{
		BlockHash: &header.Hash,
		Topics:    topics,
	})
	if err != nil {
		return header, err
	}
	header.Logs = logs
	return header, nil
}
// getMissedEventsToBackfill finds missed events that might have occurred while the Mesh node
// was offline. It batch fetches the logs for the blocks in the range
// (latestRetainedBlockNumber, latestRetainedBlockNumber+blocksElapsed], groups them into one
// Added event per block, replaces the stored blocks with the furthest block whose logs could
// be retrieved, and returns the events.
//
// Events are returned in the order in which their blocks first appear in the fetched logs.
// That is ascending block order, provided the node returns logs in chain order. If no block
// beyond latestRetainedBlockNumber could be processed, the stack is left untouched and no
// events are returned.
//
// All RPC reads happen before the stack is modified, so a failed header request leaves the
// stored blocks exactly as they were. On error the returned slice is empty.
func (w *Watcher) getMissedEventsToBackfill(ctx context.Context, blocksElapsed int, latestRetainedBlockNumber int) ([]*Event, error) {
	events := []*Event{}

	startBlockNum := latestRetainedBlockNumber + 1
	endBlockNum := latestRetainedBlockNumber + blocksElapsed
	logs, furthestBlockProcessed := w.getLogsInBlockRange(ctx, startBlockNum, endBlockNum)
	if furthestBlockProcessed <= latestRetainedBlockNumber {
		// Nothing beyond the retained block was processed, so there is nothing to do.
		return events, nil
	}

	// The furthest processed block becomes the only stored block, so that the BlockWatcher
	// resumes from there. Fetch it up front, before anything destructive happens.
	furthestHeader, err := w.client.HeaderByNumber(big.NewInt(int64(furthestBlockProcessed)))
	if err != nil {
		return events, err
	}

	// Group the logs by block. orderedHeaders remembers the order in which blocks were first
	// seen, because ranging over the map would yield the events in random order.
	hashToBlockHeader := map[common.Hash]*miniheader.MiniHeader{}
	orderedHeaders := []*miniheader.MiniHeader{}
	for _, entry := range logs {
		blockHeader, ok := hashToBlockHeader[entry.BlockHash]
		if !ok {
			blockNumber := big.NewInt(0).SetUint64(entry.BlockNumber)
			header, err := w.client.HeaderByNumber(blockNumber)
			if err != nil {
				return events, err
			}
			blockHeader = &miniheader.MiniHeader{
				Hash:      entry.BlockHash,
				Parent:    header.Parent,
				Number:    blockNumber,
				Logs:      []types.Log{},
				Timestamp: header.Timestamp,
			}
			hashToBlockHeader[entry.BlockHash] = blockHeader
			orderedHeaders = append(orderedHeaders, blockHeader)
		}
		blockHeader.Logs = append(blockHeader.Logs, entry)
	}

	// Every read succeeded: remove all blocks from the DB and insert the furthest block
	// processed.
	if err := w.stack.Clear(); err != nil {
		return events, err
	}
	if err := w.stack.Push(furthestHeader); err != nil {
		return events, err
	}

	// If no logs found, noop
	if len(logs) == 0 {
		return events, nil
	}
	for _, blockHeader := range orderedHeaders {
		events = append(events, &Event{
			Type:        Added,
			BlockHeader: blockHeader,
		})
	}
	log.Info("Done backfilling block events")
	return events, nil
}
// logRequestResult holds the outcome of fetching the logs for a single block range.
type logRequestResult struct {
// From is the first block of the requested range.
From int
// To is the last block of the requested range.
To int
// Logs are the logs retrieved for the range.
Logs []types.Log
// Err is the error encountered while fetching the range, or nil on success.
Err error
}
// getLogsRequestChunkSize is the number of `eth_getLogs` JSON-RPC requests that
// getLogsInBlockRange sends concurrently in each batch.
const getLogsRequestChunkSize = 3
// getLogsInBlockRange attempts to fetch all logs in the block range [from, to]. The range is
// split into sub-ranges of at most maxBlocksInGetLogsQuery blocks, which are fetched in
// batches of getLogsRequestChunkSize concurrent requests. All requests of a batch must
// complete before the next batch is sent.
//
// If a request fails, or ctx is already canceled when a request is about to start, no further
// batches are sent. The function then returns the logs of the sub-ranges preceding the first
// failed one, along with the block number after which no further logs were retrieved. On full
// success that number is `to`; if nothing could be fetched it is `from - 1`.
func (w *Watcher) getLogsInBlockRange(ctx context.Context, from, to int) ([]types.Log, int) {
	blockRanges := w.getSubBlockRanges(from, to, maxBlocksInGetLogsQuery)

	furthestBlockProcessed := from - 1
	allLogs := []types.Log{}
	// Batches are processed strictly one after another: once a request has failed we do not
	// want to send any further requests.
	for len(blockRanges) > 0 {
		chunkSize := getLogsRequestChunkSize
		if len(blockRanges) < chunkSize {
			chunkSize = len(blockRanges)
		}
		chunk := blockRanges[:chunkSize]
		blockRanges = blockRanges[chunkSize:]

		// Each goroutine writes only to its own slot, so no locking is required.
		results := make([]logRequestResult, len(chunk))
		wg := &sync.WaitGroup{}
		for i, aBlockRange := range chunk {
			wg.Add(1)
			go func(index int, b *blockRange) {
				defer wg.Done()
				result := logRequestResult{
					From: b.FromBlock,
					To:   b.ToBlock,
				}
				select {
				case <-ctx.Done():
					result.Err = errors.New("context was canceled")
					result.Logs = []types.Log{}
				default:
					result.Logs, result.Err = w.filterLogsRecurisively(b.FromBlock, b.ToBlock, []types.Log{})
					if result.Err != nil {
						log.WithFields(map[string]interface{}{
							"error":     result.Err,
							"fromBlock": b.FromBlock,
							"toBlock":   b.ToBlock,
						}).Trace("Failed to fetch logs for range")
					}
				}
				results[index] = result
			}(i, aBlockRange)
		}
		// Wait for all log requests to complete
		wg.Wait()

		for _, result := range results {
			// Stop at the first error encountered. Everything before the failed sub-range
			// has been retrieved successfully.
			if result.Err != nil {
				return allLogs, result.From - 1
			}
			allLogs = append(allLogs, result.Logs...)
			furthestBlockProcessed = result.To
		}
	}
	return allLogs, furthestBlockProcessed
}
// blockRange is a range of block numbers. getSubBlockRanges produces them and
// getLogsInBlockRange fetches the logs for each one; both ends are part of the range.
type blockRange struct {
// FromBlock is the first block number in the range.
FromBlock int
// ToBlock is the last block number in the range.
ToBlock int
}
// getSubBlockRanges breaks up the block range [from, to] into consecutive sub-ranges that
// each cover at most rangeSize blocks. `eth_getLogs` requests are inclusive of both the start
// and end blocks specified, so the sub-ranges never overlap, to avoid fetching the same
// blocks' logs twice.
//
// The range is returned as a single sub-range when it is too small to need splitting
// (to-from < rangeSize), and also when rangeSize is not positive and therefore cannot be used
// to split anything.
func (w *Watcher) getSubBlockRanges(from, to, rangeSize int) []*blockRange {
	numBlocksLeft := to - from
	if rangeSize <= 0 || numBlocksLeft < rangeSize {
		return []*blockRange{
			&blockRange{
				FromBlock: from,
				ToBlock:   to,
			},
		}
	}

	// Derive the boundaries arithmetically rather than listing every block number.
	totalBlocks := numBlocksLeft + 1
	numChunks := (totalBlocks + rangeSize - 1) / rangeSize
	chunks := make([]*blockRange, 0, numChunks)
	for chunkFrom := from; chunkFrom <= to; chunkFrom += rangeSize {
		chunkTo := chunkFrom + rangeSize - 1
		if chunkTo > to {
			// The last sub-range may be shorter than rangeSize.
			chunkTo = to
		}
		chunks = append(chunks, &blockRange{
			FromBlock: chunkFrom,
			ToBlock:   chunkTo,
		})
	}
	return chunks
}
// infuraTooManyResultsErrMsg is the exact error message filterLogsRecurisively compares
// against to detect that an `eth_getLogs` query exceeded Infura's 10,000-result cap.
const infuraTooManyResultsErrMsg = "query returned more than 10000 results"
// filterLogsRecurisively fetches the logs for the block range [from, to] (both inclusive),
// appends them to allLogs and returns the result. The Watcher's topics, if any, are applied
// as the filter for the first topic position.
//
// If the node rejects the query with infuraTooManyResultsErrMsg, the range is split in half
// and each half is fetched recursively, first half first. A single block that still exceeds
// the limit results in an error. Any other request error is returned as-is with nil logs.
func (w *Watcher) filterLogsRecurisively(from, to int, allLogs []types.Log) ([]types.Log, error) {
	log.WithFields(map[string]interface{}{
		"from": from,
		"to":   to,
	}).Trace("Fetching block logs")

	topics := [][]common.Hash{}
	if len(w.topics) > 0 {
		topics = append(topics, w.topics)
	}
	logs, err := w.client.FilterLogs(ethereum.FilterQuery{
		FromBlock: big.NewInt(int64(from)),
		ToBlock:   big.NewInt(int64(to)),
		Topics:    topics,
	})
	if err == nil {
		return append(allLogs, logs...), nil
	}

	// Infura caps the logs returned to 10,000 per request. If our request exceeds this limit,
	// split it into two requests. Parity, Geth and Alchemy all have much higher limits (if
	// any at all), so no need to expect any similar errors of this nature from them.
	if err.Error() != infuraTooManyResultsErrMsg {
		return nil, err
	}
	// HACK(fabio): Infura limits the returned results to 10,000 logs, BUT some single
	// blocks contain more than 10,000 logs. This has supposedly been fixed but we keep
	// this logic here just in case. It helps us avoid infinite recursion.
	// Source: https://community.infura.io/t/getlogs-error-query-returned-more-than-1000-results/358/10
	if from == to {
		return allLogs, fmt.Errorf("Unable to get the logs for block #%d, because it contains too many logs", from)
	}

	// The first half ends at the midpoint (rounded down); the second half starts right after.
	endFirstHalf := from + (to-from)/2
	startSecondHalf := endFirstHalf + 1
	allLogs, err = w.filterLogsRecurisively(from, endFirstHalf, allLogs)
	if err != nil {
		return nil, err
	}
	allLogs, err = w.filterLogsRecurisively(startSecondHalf, to, allLogs)
	if err != nil {
		return nil, err
	}
	return allLogs, nil
}
// getAllRetainedBlocks returns every block header currently held by the Watcher's stack,
// together with any error the stack reports while reading them.
func (w *Watcher) getAllRetainedBlocks() ([]*miniheader.MiniHeader, error) {
	retained, err := w.stack.PeekAll()
	return retained, err
}