cmd/addblock: update block importer.
This updates the block importer to use an index subscriber
in order to leverage async indexes.
dnldd committed Dec 19, 2020
1 parent 526e689 commit a1184e5
Showing 2 changed files with 62 additions and 43 deletions.
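The shape of the change, condensed from the hunks below: previously the optional indexes were collected into a slice and driven synchronously through an `indexers.IndexManager` handed to `blockchain.New`; now a single `indexers.IndexSubscriber` is created up front, the chain publishes block notifications through it, and each index is constructed directly against the chain and the subscriber. Roughly (a sketch stitched together from the diff, not a compilable excerpt; `cfg`, `db`, and `activeNetParams` are dcrd internals):

```go
// Before: indexes registered with a manager that the chain drives
// synchronously on every connected block.
indexes := []indexers.Indexer{indexers.NewTxIndex(db)}
indexManager := indexers.NewManager(db, indexes, activeNetParams)

// After: a subscriber relays chain notifications to each index, which
// catches up asynchronously; the chain only needs the subscriber.
subber := indexers.NewIndexSubscriber(ctx)
txIndex, err := indexers.NewTxIndex(ctx, cancel, db, chain, activeNetParams, subber)
```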
cmd/addblock/addblock.go (4 additions, 2 deletions)
```diff
@@ -6,6 +6,7 @@
 package main
 
 import (
+    "context"
     "errors"
     "os"
     "path/filepath"
```
```diff
@@ -94,7 +95,8 @@ func realMain() error {
     // Create a block importer for the database and input file and start it.
     // The done channel returned from start will contain an error if
     // anything went wrong.
-    importer, err := newBlockImporter(db, fi)
+    ctx, cancel := context.WithCancel(context.TODO())
+    importer, err := newBlockImporter(ctx, db, fi, cancel)
     if err != nil {
         log.Errorf("Failed create block importer: %v", err)
         return err
```
```diff
@@ -105,7 +107,7 @@ func realMain() error {
     // Import contains the statistics about the import including an error
     // if something went wrong.
     log.Info("Starting import")
-    resultsChan := importer.Import()
+    resultsChan := importer.Import(ctx)
     results := <-resultsChan
     if results.err != nil {
         log.Errorf("%v", results.err)
```
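The `context.TODO()` here is a placeholder parent; because `newBlockImporter` receives both the context and its `cancel` function, either the caller or the importer's indexes can trigger shutdown. A signal-aware parent would be a natural refinement (a hypothetical helper, not part of this commit; it assumes imports of `context`, `os`, and `os/signal`):

```go
// newSignalContext is a hypothetical helper: the returned context is
// cancelled on SIGINT, so an interrupt unwinds the importer through the
// same ctx.Done() path the index subscriber uses.
func newSignalContext() (context.Context, context.CancelFunc) {
    ctx, cancel := context.WithCancel(context.Background())
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, os.Interrupt)
    go func() {
        <-sigCh
        cancel()
    }()
    return ctx, cancel
}
```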
cmd/addblock/import.go (58 additions, 41 deletions)
```diff
@@ -51,6 +51,11 @@ type blockImporter struct {
     lastBlockTime   time.Time
     lastLogTime     time.Time
     startTime       time.Time
+
+    txIndex         *indexers.TxIndex
+    addrIndex       *indexers.AddrIndex
+    existsAddrIndex *indexers.ExistsAddrIndex
+    cancel          context.CancelFunc
 }
 
 // readBlock reads the next block from the input file.
```
```diff
@@ -217,7 +222,7 @@ func (bi *blockImporter) logProgress() {
 // processHandler is the main handler for processing blocks. This allows block
 // processing to take place in parallel with block reads from the import file.
 // It must be run as a goroutine.
-func (bi *blockImporter) processHandler() {
+func (bi *blockImporter) processHandler(ctx context.Context) {
 out:
     for {
         select {
```
```diff
@@ -243,6 +248,9 @@ out:
 
         case <-bi.quit:
             break out
+
+        case <-ctx.Done():
+            close(bi.quit)
         }
     }
     bi.wg.Done()
```
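One subtlety in the new `ctx.Done()` case: after `close(bi.quit)` the loop keeps running, and on the next iteration both `<-bi.quit` and `<-ctx.Done()` are ready. Since `select` picks among ready cases pseudo-randomly, the `close` could be reached a second time and panic. Guarding the close with `sync.Once` is the conventional way to make such a shutdown idempotent (a sketch of the general pattern, not the committed code):

```go
// quitGuard sketches an idempotent shutdown: however many times stop is
// called (e.g. from repeated ctx.Done() wakeups), the channel closes once.
type quitGuard struct {
    quit chan struct{}
    once sync.Once
}

func (g *quitGuard) stop() {
    g.once.Do(func() { close(g.quit) })
}
```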
```diff
@@ -278,12 +286,12 @@ func (bi *blockImporter) statusHandler(resultsChan chan *importResults) {
 // Import is the core function which handles importing the blocks from the file
 // associated with the block importer to the database. It returns a channel
 // on which the results will be returned when the operation has completed.
-func (bi *blockImporter) Import() chan *importResults {
+func (bi *blockImporter) Import(ctx context.Context) chan *importResults {
     // Start up the read and process handling goroutines. This setup allows
     // blocks to be read from disk in parallel while being processed.
     bi.wg.Add(2)
     go bi.readHandler()
-    go bi.processHandler()
+    go bi.processHandler(ctx)
 
     // Wait for the import to finish in a separate goroutine and signal
     // the status handler when done.
```
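The `wg.Add(2)` pairs with the two handler goroutines: `readHandler` feeds the small `processQueue` buffer while `processHandler` drains it, so file reads overlap validation work. In isolation the pattern looks like this (a minimal sketch of the same producer/consumer split, assuming hypothetical `read` and `process` callbacks):

```go
func runPipeline(read func() ([]byte, bool), process func([]byte)) {
    queue := make(chan []byte, 2) // mirrors the importer's processQueue
    var wg sync.WaitGroup
    wg.Add(2)
    go func() { // readHandler analogue: produce until input is exhausted
        defer wg.Done()
        defer close(queue)
        for blk, ok := read(); ok; blk, ok = read() {
            queue <- blk
        }
    }()
    go func() { // processHandler analogue: consume until the queue closes
        defer wg.Done()
        for blk := range queue {
            process(blk)
        }
    }()
    wg.Wait()
}
```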
```diff
@@ -301,14 +309,25 @@
 
 // newBlockImporter returns a new importer for the provided file reader seeker
 // and database.
-func newBlockImporter(db database.DB, r io.ReadSeeker) (*blockImporter, error) {
-    // Create the various indexes as needed.
-    //
-    // CAUTION: the txindex needs to be first in the indexes array because
-    // the addrindex uses data from the txindex during catchup. If the
-    // addrindex is run first, it may not have the transactions from the
-    // current block indexed.
-    var indexes []indexers.Indexer
+func newBlockImporter(ctx context.Context, db database.DB, r io.ReadSeeker, cancel context.CancelFunc) (*blockImporter, error) {
+
+    subber := indexers.NewIndexSubscriber(ctx)
+    chain, err := blockchain.New(context.Background(),
+        &blockchain.Config{
+            DB:              db,
+            ChainParams:     activeNetParams,
+            Checkpoints:     activeNetParams.Checkpoints,
+            TimeSource:      blockchain.NewMedianTime(),
+            IndexSubscriber: subber,
+        })
+    if err != nil {
+        return nil, err
+    }
+
+    var txIndex *indexers.TxIndex
+    var addrIndex *indexers.AddrIndex
+    var existsAddrIndex *indexers.ExistsAddrIndex
     if cfg.TxIndex || cfg.AddrIndex {
         // Enable transaction index if address index is enabled since it
         // requires it.
```
```diff
@@ -319,45 +338,43 @@ func newBlockImporter(db database.DB, r io.ReadSeeker) (*blockImporter, error) {
         } else {
             log.Info("Transaction index is enabled")
         }
-        indexes = append(indexes, indexers.NewTxIndex(db))
+
+        txIndex, err = indexers.NewTxIndex(ctx, cancel, db, chain, activeNetParams,
+            subber)
+        if err != nil {
+            return nil, err
+        }
     }
     if cfg.AddrIndex {
         log.Info("Address index is enabled")
-        indexes = append(indexes, indexers.NewAddrIndex(db, activeNetParams))
+        addrIndex, err = indexers.NewAddrIndex(ctx, cancel, db, chain, activeNetParams,
+            subber)
+        if err != nil {
+            return nil, err
+        }
     }
     if !cfg.NoExistsAddrIndex {
         log.Info("Exists address index is enabled")
-        indexes = append(indexes, indexers.NewExistsAddrIndex(db,
-            activeNetParams))
-    }
-
-    // Create an index manager if any of the optional indexes are enabled.
-    var indexManager indexers.IndexManager
-    if len(indexes) > 0 {
-        indexManager = indexers.NewManager(db, indexes, activeNetParams)
-    }
-
-    chain, err := blockchain.New(context.Background(),
-        &blockchain.Config{
-            DB:           db,
-            ChainParams:  activeNetParams,
-            Checkpoints:  activeNetParams.Checkpoints,
-            TimeSource:   blockchain.NewMedianTime(),
-            IndexManager: indexManager,
-        })
-    if err != nil {
-        return nil, err
+        existsAddrIndex, err = indexers.NewExistsAddrIndex(ctx, cancel, db, chain,
+            activeNetParams, subber)
+        if err != nil {
+            return nil, err
+        }
     }
 
     return &blockImporter{
-        db:           db,
-        r:            r,
-        processQueue: make(chan []byte, 2),
-        doneChan:     make(chan bool),
-        errChan:      make(chan error),
-        quit:         make(chan struct{}),
-        chain:        chain,
-        lastLogTime:  time.Now(),
-        startTime:    time.Now(),
+        db:              db,
+        r:               r,
+        processQueue:    make(chan []byte, 2),
+        doneChan:        make(chan bool),
+        errChan:         make(chan error),
+        quit:            make(chan struct{}),
+        chain:           chain,
+        lastLogTime:     time.Now(),
+        startTime:       time.Now(),
+        txIndex:         txIndex,
+        addrIndex:       addrIndex,
+        existsAddrIndex: existsAddrIndex,
+        cancel:          cancel,
     }, nil
 }
```
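End to end, a caller now drives the importer the way `realMain` does in the addblock.go hunks above: create the cancellable context, build the importer (which wires up the subscriber, chain, and indexes), then start `Import` and block on its results channel (condensed from the diff, with logging omitted):

```go
ctx, cancel := context.WithCancel(context.TODO())
importer, err := newBlockImporter(ctx, db, fi, cancel)
if err != nil {
    return err
}

resultsChan := importer.Import(ctx)
results := <-resultsChan
if results.err != nil {
    return results.err
}
```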
