-
Notifications
You must be signed in to change notification settings - Fork 2
/
sequencer.go
107 lines (86 loc) 路 2.36 KB
/
sequencer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// SPDX-FileCopyrightText: 2023 PK Lab AG <contact@pklab.io>
// SPDX-License-Identifier: MIT
package receiver
import (
"bytes"
"context"
"encoding/hex"
"github.com/dipdup-io/celestia-indexer/pkg/types"
)
func (r *Module) sequencer(ctx context.Context) {
orderedBlocks := map[int64]types.BlockData{}
l, prevBlockHash := r.Level()
currentBlock := int64(l + 1)
for {
select {
case <-ctx.Done():
return
case block, ok := <-r.blocks:
if !ok {
r.Log.Warn().Msg("can't read message from blocks input, channel was dried and closed")
r.stopAll()
return
}
orderedBlocks[block.Block.Height] = block
b, ok := orderedBlocks[currentBlock]
for ok {
if prevBlockHash != nil {
if !bytes.Equal(b.Block.LastBlockID.Hash, prevBlockHash) {
prevBlockHash, currentBlock, orderedBlocks = r.startRollback(ctx, b, prevBlockHash)
break
}
}
r.MustOutput(BlocksOutput).Push(b)
r.setLevel(types.Level(currentBlock), b.BlockID.Hash)
r.Log.Debug().
Uint64("height", uint64(currentBlock)).
Msg("put in order block")
prevBlockHash = b.BlockID.Hash
delete(orderedBlocks, currentBlock)
currentBlock += 1
b, ok = orderedBlocks[currentBlock]
}
}
}
}
func (r *Module) startRollback(
ctx context.Context,
b types.BlockData,
prevBlockHash []byte,
) ([]byte, int64, map[int64]types.BlockData) {
r.Log.Info().
Str("current.lastBlockHash", hex.EncodeToString(b.Block.LastBlockID.Hash)).
Str("prevBlockHash", hex.EncodeToString(prevBlockHash)).
Uint64("level", uint64(b.Height)).
Msg("rollback detected")
// Pause all receiver routines
r.rollbackSync.Add(1)
// Stop readBlocks
if r.cancelReadBlocks != nil {
r.cancelReadBlocks()
}
// Stop pool workers
if r.cancelWorkers != nil {
r.cancelWorkers()
}
clearChannel(r.blocks)
// Start rollback
r.MustOutput(RollbackOutput).Push(struct{}{})
// Wait until rollback will be finished
r.rollbackSync.Wait()
// Reset empty state
level, hash := r.Level()
currentBlock := int64(level)
prevBlockHash = hash
orderedBlocks := map[int64]types.BlockData{}
// Restart workers pool that read blocks
workersCtx, cancelWorkers := context.WithCancel(ctx)
r.cancelWorkers = cancelWorkers
r.pool.Start(workersCtx)
return prevBlockHash, currentBlock, orderedBlocks
}
func clearChannel(blocks <-chan types.BlockData) {
for len(blocks) > 0 {
<-blocks
}
}