// Copyright (c) 2018-2019, The Decred developers
// Copyright (c) 2017, The dcrdata developers
// See LICENSE for details.

package dcrpg

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/decred/dcrd/chaincfg/chainhash"
	"github.com/decred/dcrdata/db/dbtypes"
	"github.com/decred/dcrdata/rpcutils"
)

const (
	rescanLogBlockChunk      = 500
	initialLoadSyncStatusMsg = "(Full Mode) Syncing stake, base and auxiliary DBs..."
	addressesSyncStatusMsg   = "Syncing addresses table with spending info..."
)

/////////// Coordinated synchronization of base DB and auxiliary DB. ///////////
//
// In full mode, a challenge is to keep base and aux DBs syncing at the same
// height. One of the main reasons is that there is one StakeDatabase, which is
// shared between the two, and both DBs need access to it for each height.
//
// rpcutils.BlockGetter is an interface with basic accessor methods like Block
// and WaitForHash that can request current data and channels for future data,
// but do not update the state of the object. The rpcutils.MasterBlockGetter is
// an interface that embeds the regular BlockGetter, adding functions that can
// change state and signal to the BlockGetters, such as UpdateToHeight, which
// will get the block via RPC and signal to all channels configured for that
// block.
//
// In full mode, ChainDB has the MasterBlockGetter and WiredDB has the
// BlockGetter. The way ChainDB is in charge of requesting blocks on demand from
// RPC without getting ahead of WiredDB during sync is that StakeDatabase has a
// very similar coordination mechanism (WaitForHeight).
//
//  1. In main, we make a new `rpcutils.BlockGate`, a concrete type that
//     implements `MasterBlockGetter` and thus `BlockGetter` too. This
//     "smart client" is provided to `baseDB` (a `WiredDB`) as a `BlockGetter`,
//     and to `auxDB` (a `ChainDB`) as a `MasterBlockGetter`.
//
//  2. `baseDB` makes a channel using `BlockGetter.WaitForHeight` and starts
//     waiting for the current block to come across the channel.
//
//  3. `auxDB` makes a channel using `StakeDatabase.WaitForHeight`, which
//     instructs the shared stake DB to send notification when it connects the
//     specified block. `auxDB` does not start waiting for the signal yet.
//
//  4. `auxDB` requests that the same current block be retrieved via RPC using
//     `MasterBlockGetter.UpdateToBlock`.
//
//  5. `auxDB` immediately begins waiting for the signal that `StakeDatabase`
//     has connected the block.
//
//  6. The call to `UpdateToBlock` causes the underlying (shared) smartClient
//     to send a signal on all channels registered for that block.
//
//  7. `baseDB` gets notification on the channel and retrieves the block (which
//     the channel signaled is now available) via `BlockGetter.Block`.
//
//  8. Before connecting the block in the `StakeDatabase`, `baseDB` gets a new
//     channel for the following (i+1) block, so that when `auxDB` requests it
//     later the channel will be registered already.
//
//  9. `baseDB` connects the block in `StakeDatabase`.
//
// 10. `StakeDatabase` signals to all waiters (`auxDB`, see 5) that the stake
//     db is ready at the needed height.
//
// 11. `baseDB` finishes block data processing/storage and goes back to 2. for
//     the next block.
//
// 12. Concurrent with `baseDB` processing in 11., `auxDB` receives the
//     notification from `StakeDatabase` sent in 10. and continues block data
//     processing/storage. When done processing, `auxDB` goes back to step 3.
//     for the next block. As with the previous iteration, it sets the pace
//     with `UpdateToBlock`.
//
// With the above approach, (a) the DBs share a single StakeDatabase, (b) the
// DBs are in sync (tightly coupled), (c) there is ample opportunity for
// concurrent computations, and (d) the shared blockGetter (as a
// MasterBlockGetter in auxDB, and a BlockGetter in baseDB) makes it so a given
// block will only be fetched via RPC ONCE and stored for the BlockGetters that
// are waiting for the block. A condensed sketch of one iteration of this
// handshake follows below.
////////////////////////////////////////////////////////////////////////////////
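
// The sketch below condenses one iteration of the handshake above. It is an
// illustration only, not code from this package: error handling and the
// cancellation paths are omitted, the two sides actually run in separate
// goroutines, and processAndStore/storeBaseData are hypothetical stand-ins
// for the DBs' storage routines.
//
//	// auxDB side, the pace setter (steps 3-5, 12):
//	waitChan := stakeDB.WaitForHeight(i)  // step 3: register for stakedb signal
//	block, err := client.UpdateToBlock(i) // step 4: fetch block i via RPC once
//	hash := <-waitChan                    // steps 5/10: stakedb connected block i
//	processAndStore(block, hash)          // step 12: auxDB storage
//
//	// baseDB side (steps 2, 7-9, 11):
//	hash := <-blockGetter.WaitForHeight(i)       // steps 2/6: block i available
//	block, err := blockGetter.Block(*hash)       // step 7: already fetched, no RPC
//	nextChan := blockGetter.WaitForHeight(i + 1) // step 8: pre-register i+1
//	stakeDB.ConnectBlock(block)                  // step 9: triggers step 10 signal
//	storeBaseData(block)                         // step 11: baseDB storage
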
// SyncChainDBAsync is like SyncChainDB except it also takes a result channel on
// which the caller should wait to receive the result. As such, this method
// should be called as a goroutine or it will hang on send if the channel is
// unbuffered.
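//
// A minimal calling sketch (hypothetical setup; assumes a ready *ChainDB in
// chainDB and a master block getter such as an rpcutils.BlockGate in
// blockGate):
//
//	res := make(chan dbtypes.SyncResult, 1) // buffered, so the send cannot hang
//	go chainDB.SyncChainDBAsync(ctx, res, blockGate,
//		false, false, true, // no bulk address/vote updates; rebuild indexes
//		nil, nil)           // no explorer or progress bar updates
//	result := <-res
//	if result.Error != nil {
//		log.Errorf("Sync failed at height %d: %v", result.Height, result.Error)
//	}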
func (db *ChainDB) SyncChainDBAsync(ctx context.Context, res chan dbtypes.SyncResult,
	client rpcutils.MasterBlockGetter, updateAllAddresses, updateAllVotes, newIndexes bool,
	updateExplorer chan *chainhash.Hash, barLoad chan *dbtypes.ProgressBarLoad) {
	if db == nil {
		res <- dbtypes.SyncResult{
			Height: -1,
			Error:  fmt.Errorf("ChainDB (psql) disabled"),
		}
		return
	}

	height, err := db.SyncChainDB(ctx, client, updateAllAddresses,
		updateAllVotes, newIndexes, updateExplorer, barLoad)
	if err != nil {
		log.Debugf("SyncChainDB quit at height %d, err: %v", height, err)
	} else {
		log.Debugf("SyncChainDB completed at height %d.", height)
	}

	res <- dbtypes.SyncResult{
		Height: height,
		Error:  err,
	}
}

// SyncChainDB stores in the DB all blocks on the main chain available from the
// RPC client. The table indexes may be force-dropped and recreated by setting
// newIndexes to true. The sync loop is broken by cancelling the provided
// context, e.g. on SIGINT.
func (db *ChainDB) SyncChainDB(ctx context.Context, client rpcutils.MasterBlockGetter,
	updateAllAddresses, updateAllVotes, newIndexes bool,
	updateExplorer chan *chainhash.Hash, barLoad chan *dbtypes.ProgressBarLoad) (int64, error) {
	// Note that we are doing a batch blockchain sync.
	db.InBatchSync = true
	defer func() { db.InBatchSync = false }()

	// Get the chain server's best block.
	nodeHeight, err := client.NodeHeight()
	if err != nil {
		return -1, fmt.Errorf("NodeHeight failed: %v", err)
	}

	// Total and rate statistics.
	var totalTxs, totalVins, totalVouts, totalAddresses int64
	var lastTxs, lastVins, lastVouts int64
	tickTime := 20 * time.Second
	ticker := time.NewTicker(tickTime)
	startTime := time.Now()
	o := sync.Once{}
	speedReporter := func() {
		ticker.Stop()
		totalElapsed := time.Since(startTime).Seconds()
		if int64(totalElapsed) == 0 {
			return
		}
		totalVoutPerSec := totalVouts / int64(totalElapsed)
		totalTxPerSec := totalTxs / int64(totalElapsed)
		if totalTxs == 0 {
			return
		}
		log.Infof("Avg. speed: %d tx/s, %d vout/s", totalTxPerSec, totalVoutPerSec)
	}
	speedReport := func() { o.Do(speedReporter) }
	defer speedReport()

	// Get the last block added to the aux DB.
	lastBlock, err := db.HeightDB()
	if err != nil {
		if err == sql.ErrNoRows {
			// An empty blocks table indicates a fresh sync from genesis.
			lastBlock = -1
			log.Info("blocks table is empty, starting fresh.")
		} else {
			return -1, fmt.Errorf("RetrieveBestBlockHeight: %v", err)
		}
	}

	// Remove indexes/constraints before an initial sync or when explicitly
	// requested to reindex and update spending information in the addresses
	// table.
	reindexing := newIndexes || lastBlock == -1
	if reindexing {
		// Remove any existing indexes.
		log.Info("Large bulk load: Removing indexes and disabling duplicate checks.")
		err = db.DeindexAll()
		if err != nil && !strings.Contains(err.Error(), "does not exist") {
			return lastBlock, err
		}

		// Disable duplicate checks on insert queries since the unique indexes
		// that enforce the constraints will not exist.
		db.EnableDuplicateCheckOnInsert(false)

		// Syncing blocks without indexes requires a UTXO cache to avoid
		// extremely expensive queries. Warm the UTXO cache if resuming an
		// interrupted initial sync.
		blocksToSync := nodeHeight - lastBlock
		if lastBlock > 0 && blocksToSync > 50 {
			log.Infof("Collecting all UTXO data prior to height %d...", lastBlock+1)
			utxos, err := RetrieveUTXOs(ctx, db.db)
			if err != nil {
				return -1, fmt.Errorf("RetrieveUTXOs: %v", err)
			}
			log.Infof("Pre-warming UTXO cache with %d UTXOs...", len(utxos))
			db.InitUtxoCache(utxos)
			log.Infof("UTXO cache is ready.")
		}
	} else {
		// When the unique indexes exist, inserts should check for conflicts
		// with the tables' constraints.
		db.EnableDuplicateCheckOnInsert(true)
	}

	// Safely send sync status updates on barLoad channel, and set the channel
	// to nil if the buffer is full.
	sendProgressUpdate := func(p *dbtypes.ProgressBarLoad) {
		if barLoad == nil {
			return
		}
		select {
		case barLoad <- p:
		default:
			log.Debugf("(*ChainDB).SyncChainDB: barLoad chan closed or full. Halting sync progress updates.")
			barLoad = nil
		}
	}

	// Safely send new block hash on updateExplorer channel, and set the
	// channel to nil if the buffer is full.
	sendPageData := func(hash *chainhash.Hash) {
		if updateExplorer == nil {
			return
		}
		select {
		case updateExplorer <- hash:
		default:
			log.Debugf("(*ChainDB).SyncChainDB: updateExplorer chan closed or full. Halting explorer updates.")
			updateExplorer = nil
		}
	}

	// Add the various updates that should run on successful sync.
	sendProgressUpdate(&dbtypes.ProgressBarLoad{
		Msg:   initialLoadSyncStatusMsg,
		BarID: dbtypes.InitialDBLoad,
	})
	// Addresses table sync should only run if bulk update is enabled.
	if updateAllAddresses {
		sendProgressUpdate(&dbtypes.ProgressBarLoad{
			Msg:   addressesSyncStatusMsg,
			BarID: dbtypes.AddressesTableSync,
		})
	}

	timeStart := time.Now()

	// Start rebuilding.
	startHeight := lastBlock + 1
	for ib := startHeight; ib <= nodeHeight; ib++ {
		// Check for quit signal.
		select {
		case <-ctx.Done():
			log.Infof("Rescan cancelled at height %d.", ib)
			return ib - 1, nil
		default:
		}

		// Report progress at the start of each log chunk.
		if (ib-1)%rescanLogBlockChunk == 0 || ib == startHeight {
			if ib == 0 {
				log.Infof("Scanning genesis block into auxiliary chain db.")
			} else {
				endRangeBlock := rescanLogBlockChunk * (1 + (ib-1)/rescanLogBlockChunk)
				if endRangeBlock > nodeHeight {
					endRangeBlock = nodeHeight
				}
				log.Infof("Processing blocks %d to %d...", ib, endRangeBlock)

				if barLoad != nil {
					// Full mode is definitely running so no need to check.
					timeTakenPerBlock := time.Since(timeStart).Seconds() / float64(endRangeBlock-ib)
					sendProgressUpdate(&dbtypes.ProgressBarLoad{
						From:      ib,
						To:        nodeHeight,
						Timestamp: int64(timeTakenPerBlock * float64(nodeHeight-endRangeBlock)),
						Msg:       initialLoadSyncStatusMsg,
						BarID:     dbtypes.InitialDBLoad,
					})
					timeStart = time.Now()
				}
			}
		}

		// Periodic speed report.
		select {
		case <-ticker.C:
			blocksPerSec := float64(ib-lastBlock) / tickTime.Seconds()
			txPerSec := float64(totalTxs-lastTxs) / tickTime.Seconds()
			vinsPerSec := float64(totalVins-lastVins) / tickTime.Seconds()
			voutPerSec := float64(totalVouts-lastVouts) / tickTime.Seconds()
			log.Infof("(%3d blk/s,%5d tx/s,%5d vin/s,%5d vout/s)", int64(blocksPerSec),
				int64(txPerSec), int64(vinsPerSec), int64(voutPerSec))
			lastBlock, lastTxs = ib, totalTxs
			lastVins, lastVouts = totalVins, totalVouts
		default:
		}

		// Register for notification from stakedb when it connects this block.
		waitChan := db.stakeDB.WaitForHeight(ib)

		// Get the block, making it available to stakedb, which will signal on
		// the above channel when it is done connecting it.
		block, err := client.UpdateToBlock(ib)
		if err != nil {
			log.Errorf("UpdateToBlock (%d) failed: %v", ib, err)
			return ib - 1, fmt.Errorf("UpdateToBlock (%d) failed: %v", ib, err)
		}

		// Wait for our StakeDatabase to connect the block.
		var blockHash *chainhash.Hash
		select {
		case blockHash = <-waitChan:
		case <-ctx.Done():
			log.Infof("Rescan cancelled at height %d.", ib)
			return ib - 1, nil
		}
		if blockHash == nil {
			log.Errorf("stakedb says that block %d has come and gone", ib)
			return ib - 1, fmt.Errorf("stakedb says that block %d has come and gone", ib)
		}
		// If not master:
		//blockHash := <-client.WaitForHeight(ib)
		//block, err := client.Block(blockHash)
		// Direct:
		//block, blockHash, err := rpcutils.GetBlock(ib, client)

		// Winning tickets from StakeDatabase, which just connected the block,
		// as signaled via the waitChan.
		tpi, ok := db.stakeDB.PoolInfo(*blockHash)
		if !ok {
			return ib - 1, fmt.Errorf("stakeDB.PoolInfo could not locate block %s", blockHash.String())
		}
		winners := tpi.Winners

		// Get the chainwork.
		chainWork, err := client.GetChainWork(blockHash)
		if err != nil {
			return ib - 1, fmt.Errorf("GetChainWork failed (%s): %v", blockHash, err)
		}

		// Store data from this block in the database.
		isValid, isMainchain := true, true
		// updateExisting is ignored if dupCheck=false, but set it true since
		// this is processing main chain blocks.
		updateExisting := true
		numVins, numVouts, numAddresses, err := db.StoreBlock(block.MsgBlock(), winners, isValid,
			isMainchain, updateExisting, !updateAllAddresses, !updateAllVotes, chainWork)
		if err != nil {
			return ib - 1, fmt.Errorf("StoreBlock failed: %v", err)
		}
		totalVins += numVins
		totalVouts += numVouts
		totalAddresses += numAddresses
		// Total transactions is the sum of regular and stake transactions.
		totalTxs += int64(len(block.STransactions()) + len(block.Transactions()))

		// Update explorer pages at intervals of 20 blocks if the update
		// channel is active (non-nil and not closed).
		if ib%20 == 0 && !updateAllAddresses {
			if updateExplorer != nil {
				log.Infof("Updating the explorer with information for block %v", ib)
				sendPageData(blockHash)
			}
		}

		// Update the node height, the end condition for the loop.
		if nodeHeight, err = client.NodeHeight(); err != nil {
			return ib, fmt.Errorf("NodeHeight failed: %v", err)
		}
	}

	// After the last call to StoreBlock, synchronously update the project fund
	// and clear the general address balance cache.
	if err = db.FreshenAddressCaches(false, nil); err != nil {
		log.Warnf("FreshenAddressCaches: %v", err)
		err = nil // not an error with sync
	}

	// Signal the end of the initial load sync.
	sendProgressUpdate(&dbtypes.ProgressBarLoad{
		From:  nodeHeight,
		To:    nodeHeight,
		Msg:   initialLoadSyncStatusMsg,
		BarID: dbtypes.InitialDBLoad,
	})

	speedReport()

	if reindexing {
		// Duplicate transactions, vins, and vouts can end up in the tables
		// when identical transactions are included in multiple blocks. This
		// happens when a block is invalidated and the transactions are
		// subsequently re-mined in another block. Remove these before
		// indexing.
		if err = db.DeleteDuplicates(barLoad); err != nil {
			return 0, err
		}

		// Create indexes.
		if err = db.IndexAll(barLoad); err != nil {
			return nodeHeight, fmt.Errorf("IndexAll failed: %v", err)
		}

		// Only reindex addresses and tickets tables here if not doing it below.
		if !updateAllAddresses {
			if err = db.IndexAddressTable(barLoad); err != nil {
				return nodeHeight, fmt.Errorf("IndexAddressTable failed: %v", err)
			}
		}
		if !updateAllVotes {
			if err = db.IndexTicketsTable(barLoad); err != nil {
				return nodeHeight, fmt.Errorf("IndexTicketsTable failed: %v", err)
			}
		}
	}

	// Batch update addresses table with spending info.
	if updateAllAddresses {
		// Remove existing indexes not on funding txns.
		_ = db.DeindexAddressTable() // ignore errors for non-existent indexes
		log.Infof("Populating spending tx info in address table...")
		numAddresses, err := db.UpdateSpendingInfoInAllAddresses(barLoad)
		if err != nil {
			log.Errorf("UpdateSpendingInfoInAllAddresses FAILED: %v", err)
		}
		log.Infof("Updated %d rows of address table", numAddresses)

		// Index the addresses table.
		if err = db.IndexAddressTable(barLoad); err != nil {
			log.Errorf("IndexAddressTable FAILED: %v", err)
		}
	}

	// Batch update tickets table with spending info.
	if updateAllVotes {
		// Remove indexes not on funding txns (remove tickets table indexes).
		_ = db.DeindexTicketsTable() // ignore errors for non-existent indexes
		db.EnableDuplicateCheckOnInsert(false)
		log.Infof("Populating spending tx info in tickets table...")
		numTicketsUpdated, err := db.UpdateSpendingInfoInAllTickets()
		if err != nil {
			log.Errorf("UpdateSpendingInfoInAllTickets FAILED: %v", err)
		}
		log.Infof("Updated %d rows of tickets table", numTicketsUpdated)

		// Index the tickets table.
		if err = db.IndexTicketsTable(barLoad); err != nil {
			log.Errorf("IndexTicketsTable FAILED: %v", err)
		}
	}

	// After sync and indexing, must use upsert statements, which check for
	// duplicate entries and update instead of throwing an error and panicking.
	db.EnableDuplicateCheckOnInsert(true)

	if barLoad != nil {
		barID := dbtypes.InitialDBLoad
		if updateAllAddresses {
			barID = dbtypes.AddressesTableSync
		}
		sendProgressUpdate(&dbtypes.ProgressBarLoad{
			BarID:    barID,
			Subtitle: "sync complete",
		})
	}

	log.Infof("Sync finished at height %d. Delta: %d blocks, %d transactions, %d ins, %d outs, %d addresses",
		nodeHeight, nodeHeight-startHeight+1, totalTxs, totalVins, totalVouts, totalAddresses)

	return nodeHeight, err
}