-
Notifications
You must be signed in to change notification settings - Fork 16
/
chain_freezer.go
209 lines (189 loc) · 6.13 KB
/
chain_freezer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
package rawdb
import (
"fmt"
"github.com/Qitmeer/qng/meerdag"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/params"
)
const (
// freezerRecheckInterval is the frequency to check the key-value database for
// chain progression that might permit new blocks to be frozen into immutable
// storage.
freezerRecheckInterval = time.Minute
// freezerBatchLimit is the maximum number of blocks to freeze in one batch
// before doing an fsync and deleting it from the key-value store.
freezerBatchLimit = 30000
)
// chainFreezer is a wrapper of freezer with additional chain freezing feature.
// The background thread will keep moving ancient chain segments from key-value
// database to flat files for saving space on live database.
type chainFreezer struct {
// WARNING: The `threshold` field is accessed atomically. On 32 bit platforms, only
// 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned,
// so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG).
threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests)
*Freezer
quit chan struct{}
wg sync.WaitGroup
trigger chan chan struct{} // Manual blocking freeze trigger, test determinism
}
// newChainFreezer initializes the freezer for ancient chain data.
func newChainFreezer(datadir string, namespace string, readonly bool) (*chainFreezer, error) {
freezer, err := NewChainFreezer(datadir, namespace, readonly)
if err != nil {
return nil, err
}
return &chainFreezer{
Freezer: freezer,
threshold: params.FullImmutabilityThreshold,
quit: make(chan struct{}),
trigger: make(chan chan struct{}),
}, nil
}
// Close closes the chain freezer instance and terminates the background thread.
func (f *chainFreezer) Close() error {
select {
case <-f.quit:
default:
close(f.quit)
}
f.wg.Wait()
return f.Freezer.Close()
}
// freeze is a background thread that periodically checks the blockchain for any
// import progress and moves ancient data from the fast database into the freezer.
//
// This functionality is deliberately broken off from block importing to avoid
// incurring additional data shuffling delays on block propagation.
func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
var (
backoff bool
triggered chan struct{} // Used in tests
nfdb = &nofreezedb{KeyValueStore: db}
)
timer := time.NewTimer(freezerRecheckInterval)
defer timer.Stop()
for {
select {
case <-f.quit:
log.Info("Freezer shutting down")
return
default:
}
if backoff {
// If we were doing a manual trigger, notify it
if triggered != nil {
triggered <- struct{}{}
triggered = nil
}
select {
case <-timer.C:
backoff = false
timer.Reset(freezerRecheckInterval)
case triggered = <-f.trigger:
backoff = false
case <-f.quit:
return
}
}
// Retrieve the freezing threshold.
mt := ReadMainChainTip(nfdb)
if mt == nil {
log.Debug("Current full block hash unavailable") // new chain, empty database
backoff = true
continue
}
mb := ReadDAGBlock(nfdb, *mt)
if mb == nil {
log.Debug("Current full block hash unavailable") // new chain, empty database
backoff = true
continue
}
threshold := atomic.LoadUint64(&f.threshold)
frozen := atomic.LoadUint64(&f.frozen)
switch {
case *mt < threshold:
log.Debug("Current full block not old enough", "DAG_ID", *mt, "hash", mb.GetHash(), "delay", threshold)
backoff = true
continue
case *mt-threshold <= frozen:
log.Debug("Ancient blocks frozen already", "DAG_ID", *mt, "hash", mb.GetHash(), "frozen", frozen)
backoff = true
continue
}
// Seems we have data ready to be frozen, process in usable batches
var (
start = time.Now()
first, _ = f.Ancients()
limit = *mt - threshold
)
if limit-first > freezerBatchLimit {
limit = first + freezerBatchLimit
}
ancients, err := f.freezeRange(nfdb, first, limit)
if err != nil {
log.Error("Error in block freeze operation", "err", err)
backoff = true
continue
}
// Batch of blocks have been frozen, flush them before wiping from leveldb
if err := f.Sync(); err != nil {
log.Crit("Failed to flush frozen tables", "err", err)
}
// Wipe out all data from the active database
batch := db.NewBatch()
for i := 0; i < len(ancients); i++ {
// Always keep the genesis block in active database
if first+uint64(i) != 0 {
DeleteBlock(batch, ancients[i].GetHash())
DeleteDAGBlock(batch, uint64(ancients[i].GetID()))
}
}
if err := batch.Write(); err != nil {
log.Crit("Failed to delete frozen canonical blocks", "err", err)
}
batch.Reset()
// Log something friendly for the user
context := []interface{}{
"blocks", frozen - first, "elapsed", common.PrettyDuration(time.Since(start)), "DAG_ID", frozen - 1,
}
if n := len(ancients); n > 0 {
context = append(context, []interface{}{"hash", ancients[n-1].GetHash()}...)
}
log.Debug("Deep froze chain segment", context...)
// Avoid database thrashing with tiny writes
if frozen-first < freezerBatchLimit {
backoff = true
}
}
}
func (f *chainFreezer) freezeRange(nfdb *nofreezedb, id, limit uint64) ([]meerdag.IBlock, error) {
blocks := make([]meerdag.IBlock, 0, limit-id)
_, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for ; id <= limit; id++ {
// Retrieve all the components of the canonical block.
mb := ReadDAGBlock(nfdb, id)
if mb == nil {
return fmt.Errorf("canonical hash missing, can't freeze block %d", id)
}
block := ReadBlockBaw(nfdb, mb.GetHash())
if len(block) == 0 {
return fmt.Errorf("block header missing, can't freeze block %d %s", id, mb.GetHash().String())
}
// Write to the batch.
if err := op.AppendRaw(ChainFreezerBlockTable, id, block); err != nil {
return fmt.Errorf("can't write hash to Freezer: %v", err)
}
if err := op.AppendRaw(ChainFreezerDAGBlockTable, id, mb.Bytes()); err != nil {
return fmt.Errorf("can't write header to Freezer: %v", err)
}
blocks = append(blocks, mb)
}
return nil
})
return blocks, err
}