Skip to content

Commit

Permalink
multi: integrate index subscriber.
Browse files Browse the repository at this point in the history
This integrates the index subscriber into
blockchain and updates the txindex and addrindex to subscribe to it and receive notifications.
  • Loading branch information
dnldd committed Jun 26, 2020
1 parent 5583ff5 commit 107811f
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 25 deletions.
85 changes: 85 additions & 0 deletions blockchain/chain.go
Expand Up @@ -145,6 +145,7 @@ type BlockChain struct {
notifications NotificationCallback
sigCache *txscript.SigCache
indexManager indexers.IndexManager
indexSubscriber *indexers.IndexSubscriber

// subsidyCache is the cache that provides quick lookup of subsidy
// values.
Expand Down Expand Up @@ -301,6 +302,40 @@ type VoteInfo struct {
AgendaStatus []ThresholdStateTuple
}

// TODO: need to determine which of the two approaches is the most efficient
// way of fetching prevScripts for index updates.

// prevScriptsSnapshot makes a copy of the entries in the provided viewpoint.
// It implements the PrevScripter interface.
type prevScriptsSnapshot struct {
entries map[chainhash.Hash]*UtxoEntry
}

// newPrevScriptSnapshot create snapshot from the provided view.
func newPrevScriptSnapshot(view *UtxoViewpoint) *prevScriptsSnapshot {
snapshot := &prevScriptsSnapshot{
entries: make(map[chainhash.Hash]*UtxoEntry, len(view.entries)),
}

for k, v := range view.entries {
snapshot.entries[k] = v
}

return snapshot
}

// PrevScript returns the prevScript of the provided output.
func (p *prevScriptsSnapshot) PrevScript(prevOut *wire.OutPoint) (uint16, []byte, bool) {
entry := p.entries[prevOut.Hash]
if entry == nil {
return 0, nil, false
}

version := entry.ScriptVersionByIndex(prevOut.Index)
pkScript := entry.PkScriptByIndex(prevOut.Index)
return version, pkScript, true
}

// GetVoteInfo returns information on consensus deployment agendas
// and their respective states at the provided hash, for the provided
// deployment version.
Expand Down Expand Up @@ -512,6 +547,25 @@ func (b *BlockChain) pushMainChainBlockCache(block *dcrutil.Block) {
b.mainChainBlockCacheLock.Unlock()
}

// PrevScripts returns a source of previous transaction scripts and their
// associated versions spent by the given block by using the spend journal.
func (b *BlockChain) PrevScripts(dbTx database.Tx, block *dcrutil.Block) (indexers.PrevScripter, error) {
// Load all of the spent transaction output data from the database.
stxos, err := dbFetchSpendJournalEntry(dbTx, block)
if err != nil {
return nil, err
}

prevScripts := stxosToScriptSource(block, stxos, currentCompressionVersion)
return prevScripts, nil
}

// Best returns the height and hash of the current best chain tip.
func (b *BlockChain) Best() (int64, *chainhash.Hash) {
snapshot := b.BestSnapshot()
return snapshot.Height, &snapshot.Hash
}

// connectBlock handles connecting the passed node/block to the end of the main
// (best) chain.
//
Expand Down Expand Up @@ -636,10 +690,22 @@ func (b *BlockChain) connectBlock(node *blockNode, block, parent *dcrutil.Block,
return err
}

prevScripter := newPrevScriptSnapshot(view)

// Prune fully spent entries and mark all entries in the view unmodified
// now that the modifications have been committed to the database.
view.commit()

// Notify subscribed clients of the connected block.
if b.indexSubscriber != nil {
b.indexSubscriber.Notify(&indexers.IndexNtfn{
NtfnType: indexers.ConnectNtfn,
Block: block,
Parent: parent,
PrevScripts: prevScripter,
})
}

// This node is now the end of the best chain.
b.bestChain.SetTip(node)

Expand Down Expand Up @@ -815,10 +881,22 @@ func (b *BlockChain) disconnectBlock(node *blockNode, block, parent *dcrutil.Blo
return err
}

prevScripter := newPrevScriptSnapshot(view)

// Prune fully spent entries and mark all entries in the view unmodified
// now that the modifications have been committed to the database.
view.commit()

// Notify subscribed clients of the disconnected block.
if b.indexSubscriber != nil {
b.indexSubscriber.Notify(&indexers.IndexNtfn{
NtfnType: indexers.DisconnectNtfn,
Block: block,
Parent: parent,
PrevScripts: prevScripter,
})
}

// This node's parent is now the end of the best chain.
b.bestChain.SetTip(node.parent)

Expand Down Expand Up @@ -1878,6 +1956,8 @@ func extractDeploymentIDVersions(params *chaincfg.Params) (map[string]uint32, er
return deploymentVers, nil
}

// TODO: Remove the chainQueryerAdapter when the indexManager gets nuked.

// chainQueryerAdapter provides an adapter from a BlockChain instance to the
// indexers.ChainQueryer interface.
type chainQueryerAdapter struct {
Expand Down Expand Up @@ -1973,6 +2053,10 @@ type Config struct {
// This field can be nil if the caller does not wish to make use of an
// index manager.
IndexManager indexers.IndexManager

// IndexubSubscriber defines a subscriber for relaying updates
// concerning connected and disconnected blocks to subscribed index clients.
IndexSubscriber *indexers.IndexSubscriber
}

// New returns a BlockChain instance using the provided configuration details.
Expand Down Expand Up @@ -2026,6 +2110,7 @@ func New(ctx context.Context, config *Config) (*BlockChain, error) {
notifications: config.Notifications,
sigCache: config.SigCache,
indexManager: config.IndexManager,
indexSubscriber: config.IndexSubscriber,
subsidyCache: subsidyCache,
index: newBlockIndex(config.DB),
bestChain: newChainView(nil),
Expand Down
9 changes: 6 additions & 3 deletions cmd/addblock/import.go
Expand Up @@ -318,11 +318,14 @@ func newBlockImporter(db database.DB, r io.ReadSeeker) (*blockImporter, error) {
} else {
log.Info("Transaction index is enabled")
}
indexes = append(indexes, indexers.NewTxIndex(db))

// TODO: figure out how the async index can be added for the block
// importer.
// indexes = append(indexes, indexers.NewTxIndex(db))
}
if cfg.AddrIndex {
log.Info("Address index is enabled")
indexes = append(indexes, indexers.NewAddrIndex(db, activeNetParams))
// log.Info("Address index is enabled")
// indexes = append(indexes, indexers.NewAddrIndex(db, activeNetParams))
}
if !cfg.NoExistsAddrIndex {
log.Info("Exists address index is enabled")
Expand Down
65 changes: 43 additions & 22 deletions server.go
Expand Up @@ -465,6 +465,7 @@ type server struct {
// if the associated index is not enabled. These fields are set during
// initial creation of the server and never changed afterwards, so they
// do not need to be protected for concurrent access.
indexSubscriber *indexers.IndexSubscriber
txIndex *indexers.TxIndex
addrIndex *indexers.AddrIndex
existsAddrIndex *indexers.ExistsAddrIndex
Expand Down Expand Up @@ -2886,6 +2887,7 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP
services: services,
sigCache: txscript.NewSigCache(cfg.SigCacheMaxSize),
subsidyCache: standalone.NewSubsidyCache(chainParams),
indexSubscriber: indexers.NewIndexSubscriber(ctx),
}

// Create the transaction and address indexes if needed.
Expand All @@ -2895,25 +2897,6 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP
// addrindex is run first, it may not have the transactions from the
// current block indexed.
var indexes []indexers.Indexer
if cfg.TxIndex || cfg.AddrIndex {
// Enable transaction index if address index is enabled since it
// requires it.
if !cfg.TxIndex {
indxLog.Infof("Transaction index enabled because it " +
"is required by the address index")
cfg.TxIndex = true
} else {
indxLog.Info("Transaction index is enabled")
}

s.txIndex = indexers.NewTxIndex(db)
indexes = append(indexes, s.txIndex)
}
if cfg.AddrIndex {
indxLog.Info("Address index is enabled")
s.addrIndex = indexers.NewAddrIndex(db, chainParams)
indexes = append(indexes, s.addrIndex)
}
if !cfg.NoExistsAddrIndex {
indxLog.Info("Exists address index is enabled")
s.existsAddrIndex = indexers.NewExistsAddrIndex(db, chainParams)
Expand Down Expand Up @@ -2969,14 +2952,52 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP
s.blockManager.handleBlockchainNotification(notification)
}
},
SigCache: s.sigCache,
SubsidyCache: s.subsidyCache,
IndexManager: indexManager,
SigCache: s.sigCache,
SubsidyCache: s.subsidyCache,
IndexManager: indexManager,
IndexSubscriber: s.indexSubscriber,
})
if err != nil {
return nil, err
}

// TODO: Initialize all indexes after creating the chain, this should be
// done after porting over to the index subscriber.
if cfg.TxIndex || cfg.AddrIndex {
// Enable transaction index if address index is enabled since it
// requires it.
if !cfg.TxIndex {
indxLog.Infof("Transaction index enabled because it " +
"is required by the address index")
cfg.TxIndex = true
} else {
indxLog.Info("Transaction index is enabled")
}

sub, err := s.indexSubscriber.Subscribe(indexers.TxIndexName, "")
if err != nil {
return nil, err
}

s.txIndex, err = indexers.NewTxIndex(ctx, db, sub, s.chain, s.chainParams)
if err != nil {
return nil, err
}
}
if cfg.AddrIndex {
indxLog.Info("Address index is enabled")
sub, err := s.indexSubscriber.Subscribe(indexers.AddrIndexName,
indexers.TxIndexName)
if err != nil {
return nil, err
}

s.addrIndex, err = indexers.NewAddrIndex(ctx, db, sub, s.chain, s.chainParams)
if err != nil {
return nil, err
}
}

txC := mempool.Config{
Policy: mempool.Policy{
MaxTxVersion: 2,
Expand Down

0 comments on commit 107811f

Please sign in to comment.