diff --git a/blockchain/chain.go b/blockchain/chain.go index a0b000300c..034a51efe5 100644 --- a/blockchain/chain.go +++ b/blockchain/chain.go @@ -145,6 +145,7 @@ type BlockChain struct { notifications NotificationCallback sigCache *txscript.SigCache indexManager indexers.IndexManager + indexSubscriber *indexers.IndexSubscriber // subsidyCache is the cache that provides quick lookup of subsidy // values. @@ -301,6 +302,40 @@ type VoteInfo struct { AgendaStatus []ThresholdStateTuple } +// TODO: need to determine which of the two approaches is the most efficient +// way of fetching prevScripts for index updates. + +// prevScriptsSnapshot makes a copy of the entries in the provided viewpoint. +// It implements the PrevScripter interface. +type prevScriptsSnapshot struct { + entries map[chainhash.Hash]*UtxoEntry +} + +// newPrevScriptSnapshot create snapshot from the provided view. +func newPrevScriptSnapshot(view *UtxoViewpoint) *prevScriptsSnapshot { + snapshot := &prevScriptsSnapshot{ + entries: make(map[chainhash.Hash]*UtxoEntry, len(view.entries)), + } + + for k, v := range view.entries { + snapshot.entries[k] = v + } + + return snapshot +} + +// PrevScript returns the prevScript of the provided output. +func (p *prevScriptsSnapshot) PrevScript(prevOut *wire.OutPoint) (uint16, []byte, bool) { + entry := p.entries[prevOut.Hash] + if entry == nil { + return 0, nil, false + } + + version := entry.ScriptVersionByIndex(prevOut.Index) + pkScript := entry.PkScriptByIndex(prevOut.Index) + return version, pkScript, true +} + // GetVoteInfo returns information on consensus deployment agendas // and their respective states at the provided hash, for the provided // deployment version. @@ -512,6 +547,25 @@ func (b *BlockChain) pushMainChainBlockCache(block *dcrutil.Block) { b.mainChainBlockCacheLock.Unlock() } +// PrevScripts returns a source of previous transaction scripts and their +// associated versions spent by the given block by using the spend journal. +func (b *BlockChain) PrevScripts(dbTx database.Tx, block *dcrutil.Block) (indexers.PrevScripter, error) { + // Load all of the spent transaction output data from the database. + stxos, err := dbFetchSpendJournalEntry(dbTx, block) + if err != nil { + return nil, err + } + + prevScripts := stxosToScriptSource(block, stxos, currentCompressionVersion) + return prevScripts, nil +} + +// Best returns the height and hash of the current best chain tip. +func (b *BlockChain) Best() (int64, *chainhash.Hash) { + snapshot := b.BestSnapshot() + return snapshot.Height, &snapshot.Hash +} + // connectBlock handles connecting the passed node/block to the end of the main // (best) chain. // @@ -636,10 +690,22 @@ func (b *BlockChain) connectBlock(node *blockNode, block, parent *dcrutil.Block, return err } + prevScripter := newPrevScriptSnapshot(view) + // Prune fully spent entries and mark all entries in the view unmodified // now that the modifications have been committed to the database. view.commit() + // Notify subscribed clients of the connected block. + if b.indexSubscriber != nil { + b.indexSubscriber.Notify(&indexers.IndexNtfn{ + NtfnType: indexers.ConnectNtfn, + Block: block, + Parent: parent, + PrevScripts: prevScripter, + }) + } + // This node is now the end of the best chain. b.bestChain.SetTip(node) @@ -815,10 +881,22 @@ func (b *BlockChain) disconnectBlock(node *blockNode, block, parent *dcrutil.Blo return err } + prevScripter := newPrevScriptSnapshot(view) + // Prune fully spent entries and mark all entries in the view unmodified // now that the modifications have been committed to the database. view.commit() + // Notify subscribed clients of the disconnected block. + if b.indexSubscriber != nil { + b.indexSubscriber.Notify(&indexers.IndexNtfn{ + NtfnType: indexers.DisconnectNtfn, + Block: block, + Parent: parent, + PrevScripts: prevScripter, + }) + } + // This node's parent is now the end of the best chain. b.bestChain.SetTip(node.parent) @@ -1878,6 +1956,8 @@ func extractDeploymentIDVersions(params *chaincfg.Params) (map[string]uint32, er return deploymentVers, nil } +// TODO: Remove the chainQueryerAdapter when the indexManager gets nuked. + // chainQueryerAdapter provides an adapter from a BlockChain instance to the // indexers.ChainQueryer interface. type chainQueryerAdapter struct { @@ -1973,6 +2053,10 @@ type Config struct { // This field can be nil if the caller does not wish to make use of an // index manager. IndexManager indexers.IndexManager + + // IndexubSubscriber defines a subscriber for relaying updates + // concerning connected and disconnected blocks to subscribed index clients. + IndexSubscriber *indexers.IndexSubscriber } // New returns a BlockChain instance using the provided configuration details. @@ -2026,6 +2110,7 @@ func New(ctx context.Context, config *Config) (*BlockChain, error) { notifications: config.Notifications, sigCache: config.SigCache, indexManager: config.IndexManager, + indexSubscriber: config.IndexSubscriber, subsidyCache: subsidyCache, index: newBlockIndex(config.DB), bestChain: newChainView(nil), diff --git a/cmd/addblock/import.go b/cmd/addblock/import.go index fa5c4cb57a..1ec3ee9046 100644 --- a/cmd/addblock/import.go +++ b/cmd/addblock/import.go @@ -318,11 +318,14 @@ func newBlockImporter(db database.DB, r io.ReadSeeker) (*blockImporter, error) { } else { log.Info("Transaction index is enabled") } - indexes = append(indexes, indexers.NewTxIndex(db)) + + // TODO: figure out how the async index can be added for the block + // importer. + // indexes = append(indexes, indexers.NewTxIndex(db)) } if cfg.AddrIndex { - log.Info("Address index is enabled") - indexes = append(indexes, indexers.NewAddrIndex(db, activeNetParams)) + // log.Info("Address index is enabled") + // indexes = append(indexes, indexers.NewAddrIndex(db, activeNetParams)) } if !cfg.NoExistsAddrIndex { log.Info("Exists address index is enabled") diff --git a/server.go b/server.go index 4b3f578204..429f2b5603 100644 --- a/server.go +++ b/server.go @@ -465,6 +465,7 @@ type server struct { // if the associated index is not enabled. These fields are set during // initial creation of the server and never changed afterwards, so they // do not need to be protected for concurrent access. + indexSubscriber *indexers.IndexSubscriber txIndex *indexers.TxIndex addrIndex *indexers.AddrIndex existsAddrIndex *indexers.ExistsAddrIndex @@ -2886,6 +2887,7 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP services: services, sigCache: txscript.NewSigCache(cfg.SigCacheMaxSize), subsidyCache: standalone.NewSubsidyCache(chainParams), + indexSubscriber: indexers.NewIndexSubscriber(ctx), } // Create the transaction and address indexes if needed. @@ -2895,25 +2897,6 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP // addrindex is run first, it may not have the transactions from the // current block indexed. var indexes []indexers.Indexer - if cfg.TxIndex || cfg.AddrIndex { - // Enable transaction index if address index is enabled since it - // requires it. - if !cfg.TxIndex { - indxLog.Infof("Transaction index enabled because it " + - "is required by the address index") - cfg.TxIndex = true - } else { - indxLog.Info("Transaction index is enabled") - } - - s.txIndex = indexers.NewTxIndex(db) - indexes = append(indexes, s.txIndex) - } - if cfg.AddrIndex { - indxLog.Info("Address index is enabled") - s.addrIndex = indexers.NewAddrIndex(db, chainParams) - indexes = append(indexes, s.addrIndex) - } if !cfg.NoExistsAddrIndex { indxLog.Info("Exists address index is enabled") s.existsAddrIndex = indexers.NewExistsAddrIndex(db, chainParams) @@ -2969,14 +2952,52 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP s.blockManager.handleBlockchainNotification(notification) } }, - SigCache: s.sigCache, - SubsidyCache: s.subsidyCache, - IndexManager: indexManager, + SigCache: s.sigCache, + SubsidyCache: s.subsidyCache, + IndexManager: indexManager, + IndexSubscriber: s.indexSubscriber, }) if err != nil { return nil, err } + // TODO: Initialize all indexes after creating the chain, this should be + // done after porting over to the index subscriber. + if cfg.TxIndex || cfg.AddrIndex { + // Enable transaction index if address index is enabled since it + // requires it. + if !cfg.TxIndex { + indxLog.Infof("Transaction index enabled because it " + + "is required by the address index") + cfg.TxIndex = true + } else { + indxLog.Info("Transaction index is enabled") + } + + sub, err := s.indexSubscriber.Subscribe(indexers.TxIndexName, "") + if err != nil { + return nil, err + } + + s.txIndex, err = indexers.NewTxIndex(ctx, db, sub, s.chain, s.chainParams) + if err != nil { + return nil, err + } + } + if cfg.AddrIndex { + indxLog.Info("Address index is enabled") + sub, err := s.indexSubscriber.Subscribe(indexers.AddrIndexName, + indexers.TxIndexName) + if err != nil { + return nil, err + } + + s.addrIndex, err = indexers.NewAddrIndex(ctx, db, sub, s.chain, s.chainParams) + if err != nil { + return nil, err + } + } + txC := mempool.Config{ Policy: mempool.Policy{ MaxTxVersion: 2,