From c191657936eb22a83b5455bcad1897355a78ed36 Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Sat, 19 Dec 2020 01:23:48 +0000 Subject: [PATCH] cmd/addblock: update block importer. This updates the block importer to use an index subscriber in order to leverage async indexes. --- cmd/addblock/addblock.go | 6 ++- cmd/addblock/import.go | 99 +++++++++++++++++++++++----------------- 2 files changed, 62 insertions(+), 43 deletions(-) diff --git a/cmd/addblock/addblock.go b/cmd/addblock/addblock.go index 8a43b45475..f1e0f5509c 100644 --- a/cmd/addblock/addblock.go +++ b/cmd/addblock/addblock.go @@ -6,6 +6,7 @@ package main import ( + "context" "errors" "os" "path/filepath" @@ -94,7 +95,8 @@ func realMain() error { // Create a block importer for the database and input file and start it. // The done channel returned from start will contain an error if // anything went wrong. - importer, err := newBlockImporter(db, fi) + ctx, cancel := context.WithCancel(context.TODO()) + importer, err := newBlockImporter(ctx, db, fi, cancel) if err != nil { log.Errorf("Failed create block importer: %v", err) return err @@ -105,7 +107,7 @@ func realMain() error { // Import contains the statistics about the import including an error // if something went wrong. log.Info("Starting import") - resultsChan := importer.Import() + resultsChan := importer.Import(ctx) results := <-resultsChan if results.err != nil { log.Errorf("%v", results.err) diff --git a/cmd/addblock/import.go b/cmd/addblock/import.go index 379f4d0528..f5d0b13659 100644 --- a/cmd/addblock/import.go +++ b/cmd/addblock/import.go @@ -51,6 +51,11 @@ type blockImporter struct { lastBlockTime time.Time lastLogTime time.Time startTime time.Time + + txIndex *indexers.TxIndex + addrIndex *indexers.AddrIndex + existsAddrIndex *indexers.ExistsAddrIndex + cancel context.CancelFunc } // readBlock reads the next block from the input file. @@ -215,7 +220,7 @@ func (bi *blockImporter) logProgress() { // processHandler is the main handler for processing blocks. This allows block // processing to take place in parallel with block reads from the import file. // It must be run as a goroutine. -func (bi *blockImporter) processHandler() { +func (bi *blockImporter) processHandler(ctx context.Context) { out: for { select { @@ -241,6 +246,9 @@ out: case <-bi.quit: break out + + case <-ctx.Done(): + close(bi.quit) } } bi.wg.Done() @@ -276,12 +284,12 @@ func (bi *blockImporter) statusHandler(resultsChan chan *importResults) { // Import is the core function which handles importing the blocks from the file // associated with the block importer to the database. It returns a channel // on which the results will be returned when the operation has completed. -func (bi *blockImporter) Import() chan *importResults { +func (bi *blockImporter) Import(ctx context.Context) chan *importResults { // Start up the read and process handling goroutines. This setup allows // blocks to be read from disk in parallel while being processed. bi.wg.Add(2) go bi.readHandler() - go bi.processHandler() + go bi.processHandler(ctx) // Wait for the import to finish in a separate goroutine and signal // the status handler when done. @@ -299,14 +307,25 @@ func (bi *blockImporter) Import() chan *importResults { // newBlockImporter returns a new importer for the provided file reader seeker // and database. -func newBlockImporter(db database.DB, r io.ReadSeeker) (*blockImporter, error) { +func newBlockImporter(ctx context.Context, db database.DB, r io.ReadSeeker, cancel context.CancelFunc) (*blockImporter, error) { + + subber := indexers.NewIndexSubscriber(ctx) + chain, err := blockchain.New(context.Background(), + &blockchain.Config{ + DB: db, + ChainParams: activeNetParams, + Checkpoints: activeNetParams.Checkpoints, + TimeSource: blockchain.NewMedianTime(), + IndexSubscriber: subber, + }) + if err != nil { + return nil, err + } + // Create the various indexes as needed. - // - // CAUTION: the txindex needs to be first in the indexes array because - // the addrindex uses data from the txindex during catchup. If the - // addrindex is run first, it may not have the transactions from the - // current block indexed. - var indexes []indexers.Indexer + var txIndex *indexers.TxIndex + var addrIndex *indexers.AddrIndex + var existsAddrIndex *indexers.ExistsAddrIndex if cfg.TxIndex || cfg.AddrIndex { // Enable transaction index if address index is enabled since it // requires it. @@ -317,45 +336,43 @@ func newBlockImporter(db database.DB, r io.ReadSeeker) (*blockImporter, error) { } else { log.Info("Transaction index is enabled") } - indexes = append(indexes, indexers.NewTxIndex(db)) + + txIndex, err = indexers.NewTxIndex(ctx, cancel, db, chain, activeNetParams, + subber) + if err != nil { + return nil, err + } } if cfg.AddrIndex { log.Info("Address index is enabled") - indexes = append(indexes, indexers.NewAddrIndex(db, activeNetParams)) + addrIndex, err = indexers.NewAddrIndex(ctx, cancel, db, chain, activeNetParams, + subber) + if err != nil { + return nil, err + } } if !cfg.NoExistsAddrIndex { log.Info("Exists address index is enabled") - indexes = append(indexes, indexers.NewExistsAddrIndex(db, - activeNetParams)) - } - - // Create an index manager if any of the optional indexes are enabled. - var indexManager indexers.IndexManager - if len(indexes) > 0 { - indexManager = indexers.NewManager(db, indexes, activeNetParams) - } - - chain, err := blockchain.New(context.Background(), - &blockchain.Config{ - DB: db, - ChainParams: activeNetParams, - Checkpoints: activeNetParams.Checkpoints, - TimeSource: blockchain.NewMedianTime(), - IndexManager: indexManager, - }) - if err != nil { - return nil, err + existsAddrIndex, err = indexers.NewExistsAddrIndex(ctx, cancel, db, chain, + activeNetParams, subber) + if err != nil { + return nil, err + } } return &blockImporter{ - db: db, - r: r, - processQueue: make(chan []byte, 2), - doneChan: make(chan bool), - errChan: make(chan error), - quit: make(chan struct{}), - chain: chain, - lastLogTime: time.Now(), - startTime: time.Now(), + db: db, + r: r, + processQueue: make(chan []byte, 2), + doneChan: make(chan bool), + errChan: make(chan error), + quit: make(chan struct{}), + chain: chain, + lastLogTime: time.Now(), + startTime: time.Now(), + txIndex: txIndex, + addrIndex: addrIndex, + existsAddrIndex: existsAddrIndex, + cancel: cancel, }, nil }