Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

indexers: asynchronous index updates. #2219

Merged
merged 12 commits into from Oct 12, 2021
194 changes: 149 additions & 45 deletions blockchain/chain.go
Expand Up @@ -144,7 +144,7 @@ type BlockChain struct {
timeSource MedianTimeSource
notifications NotificationCallback
sigCache *txscript.SigCache
indexManager indexers.IndexManager
indexSubscriber *indexers.IndexSubscriber
interrupt <-chan struct{}
utxoCache UtxoCacher

Expand Down Expand Up @@ -317,6 +317,64 @@ type VoteInfo struct {
AgendaStatus []ThresholdStateTuple
}

// prevScript represents script and script version information for a previous
// outpoint.
type prevScript struct {
scriptVersion uint16
pkScript []byte
}

// prevScriptsSnapshot represents a snapshot of script and script version
// information related to previous outpoints from a utxo viewpoint.
//
// This implements the indexers.PrevScripter interface.
type prevScriptsSnapshot struct {
entries map[wire.OutPoint]prevScript
}

// Ensure prevScriptSnapshot implements the indexers.PrevScripter interface.
var _ indexers.PrevScripter = (*prevScriptsSnapshot)(nil)

// newPrevScriptSnapshot creates a script and script version snapshot from
// the provided utxo viewpoint.
func newPrevScriptSnapshot(view *UtxoViewpoint) *prevScriptsSnapshot {
snapshot := &prevScriptsSnapshot{
entries: make(map[wire.OutPoint]prevScript, len(view.entries)),
}
for k, v := range view.entries {
if v == nil {
snapshot.entries[k] = prevScript{}
continue
}

var pkScript []byte
scriptLen := len(v.pkScript)
if scriptLen != 0 {
pkScript = make([]byte, scriptLen)
copy(pkScript, v.pkScript)
}

snapshot.entries[k] = prevScript{
scriptVersion: v.scriptVersion,
pkScript: pkScript,
}
}
dnldd marked this conversation as resolved.
Show resolved Hide resolved

return snapshot
}

// PrevScript returns the script and script version associated with the provided
// previous outpoint along with a bool that indicates whether or not the
// requested entry exists. This ensures the caller is able to distinguish
// between missing entries and empty v0 scripts.
func (p *prevScriptsSnapshot) PrevScript(prevOut *wire.OutPoint) (uint16, []byte, bool) {
entry := p.entries[*prevOut]
if entry.pkScript == nil {
return 0, nil, false
}
return entry.scriptVersion, entry.pkScript, true
}

dnldd marked this conversation as resolved.
Show resolved Hide resolved
// GetVoteInfo returns information on consensus deployment agendas and their
// respective states at the provided hash, for the provided deployment version.
func (b *BlockChain) GetVoteInfo(hash *chainhash.Hash, version uint32) (*VoteInfo, error) {
Expand Down Expand Up @@ -658,23 +716,17 @@ func (b *BlockChain) connectBlock(node *blockNode, block, parent *dcrutil.Block,
return err
}

// Allow the index manager to call each of the currently active
// optional indexes with the block being connected so they can
// update themselves accordingly.
if b.indexManager != nil {
err := b.indexManager.ConnectBlock(dbTx, block, parent,
view, isTreasuryEnabled)
if err != nil {
return err
}
}

return nil
})
if err != nil {
return err
}

// This creates a prevScript snapshot of the utxo viewpoint for index updates.
// This is intentionally being done before the view is committed to the utxo
// cache since the caching process mutates the view by removing entries.
prevScripter := newPrevScriptSnapshot(view)
dnldd marked this conversation as resolved.
Show resolved Hide resolved

// Commit all entries in the view to the utxo cache. All entries in the view
// that are marked as modified and spent are removed from the view.
// Additionally, all entries that are added to the cache are removed from the
Expand All @@ -696,6 +748,18 @@ func (b *BlockChain) connectBlock(node *blockNode, block, parent *dcrutil.Block,
return err
}

// Notify subscribed indexes of the connected block.
if b.indexSubscriber != nil {
b.indexSubscriber.Notify(&indexers.IndexNtfn{
NtfnType: indexers.ConnectNtfn,
Block: block,
Parent: parent,
PrevScripts: prevScripter,
IsTreasuryEnabled: isTreasuryEnabled,
Done: make(chan bool),
})
}

// This node is now the end of the best chain.
b.bestChain.SetTip(node)
b.index.MaybePruneCachedTips(node)
Expand Down Expand Up @@ -846,23 +910,17 @@ func (b *BlockChain) disconnectBlock(node *blockNode, block, parent *dcrutil.Blo
// ensure that lightweight clients still have access to them if they
// happen to be on a side chain after coming back online after a reorg.

// Allow the index manager to call each of the currently active
// optional indexes with the block being disconnected so they
// can update themselves accordingly.
if b.indexManager != nil {
err := b.indexManager.DisconnectBlock(dbTx, block,
parent, view, isTreasuryEnabled)
if err != nil {
return err
}
}

return nil
})
if err != nil {
return err
}

// This creates a prevScript snapshot of the utxo viewpoint for index updates.
// This is intentionally being done before the view is committed to the utxo
// cache since the caching process mutates the view by removing entries.
prevScripter := newPrevScriptSnapshot(view)

// Commit all entries in the view to the utxo cache. All entries in the view
// that are marked as modified and spent are removed from the view.
// Additionally, all entries that are added to the cache are removed from the
Expand All @@ -888,6 +946,18 @@ func (b *BlockChain) disconnectBlock(node *blockNode, block, parent *dcrutil.Blo
return err
}

// Notify subscribed indexes of the disconnected block.
if b.indexSubscriber != nil {
b.indexSubscriber.Notify(&indexers.IndexNtfn{
NtfnType: indexers.DisconnectNtfn,
Block: block,
Parent: parent,
PrevScripts: prevScripter,
IsTreasuryEnabled: isTreasuryEnabled,
Done: make(chan bool),
})
}

// This node's parent is now the end of the best chain.
b.bestChain.SetTip(node.parent)

Expand Down Expand Up @@ -2085,9 +2155,9 @@ func stxosToScriptSource(block *dcrutil.Block, stxos []spentTxOut, isTreasuryEna
return source
}

// chainQueryerAdapter provides an adapter from a BlockChain instance to the
// ChainQueryerAdapter provides an adapter from a BlockChain instance to the
// indexers.ChainQueryer interface.
type chainQueryerAdapter struct {
type ChainQueryerAdapter struct {
*BlockChain
}

Expand All @@ -2097,13 +2167,13 @@ type chainQueryerAdapter struct {
//
// It is defined via a separate internal struct to avoid polluting the public
// API of the BlockChain type itself.
func (q *chainQueryerAdapter) BestHeight() int64 {
func (q *ChainQueryerAdapter) BestHeight() int64 {
return q.BestSnapshot().Height
}

// IsTreasuryEnabled returns true if the treasury agenda is enabled as of the
// provided block.
func (q *chainQueryerAdapter) IsTreasuryEnabled(hash *chainhash.Hash) (bool, error) {
func (q *ChainQueryerAdapter) IsTreasuryEnabled(hash *chainhash.Hash) (bool, error) {
return q.IsTreasuryAgendaActive(hash)
}

Expand All @@ -2114,7 +2184,7 @@ func (q *chainQueryerAdapter) IsTreasuryEnabled(hash *chainhash.Hash) (bool, err
// API of the BlockChain type itself.
davecgh marked this conversation as resolved.
Show resolved Hide resolved
//
// This is part of the indexers.ChainQueryer interface.
func (q *chainQueryerAdapter) PrevScripts(dbTx database.Tx, block *dcrutil.Block) (indexers.PrevScripter, error) {
func (q *ChainQueryerAdapter) PrevScripts(dbTx database.Tx, block *dcrutil.Block) (indexers.PrevScripter, error) {
prevHash := &block.MsgBlock().Header.PrevBlock
isTreasuryEnabled, err := q.IsTreasuryAgendaActive(prevHash)
if err != nil {
Expand Down Expand Up @@ -2152,6 +2222,53 @@ func (b *BlockChain) RemoveSpendEntry(hash *chainhash.Hash) error {
return err
}

// ChainParams returns the network parameters of the chain.
//
// This is part of the indexers.ChainQueryer interface.
func (q *ChainQueryerAdapter) ChainParams() *chaincfg.Params {
return q.chainParams
}

// Best returns the height and hash of the current best chain tip.
//
// This is part of the indexers.ChainQueryer interface.
func (q *ChainQueryerAdapter) Best() (int64, *chainhash.Hash) {
snapshot := q.BestSnapshot()
return snapshot.Height, &snapshot.Hash
}

// Ancestor returns the ancestor of the provided block at the provided height.
//
// This function is safe for concurrent access and is part of the
// indexers.ChainQueryer interface.
func (q *ChainQueryerAdapter) Ancestor(block *chainhash.Hash, height int64) *chainhash.Hash {
node := q.index.LookupNode(block)
ancestor := node.Ancestor(height)
return &ancestor.hash
}

// AddSpendConsumer adds the provided spend consumer to the spend pruner.
func (q *ChainQueryerAdapter) AddSpendConsumer(consumer spendpruner.SpendConsumer) {
q.spendPruner.AddConsumer(consumer)
}

// RemoveSpendConsumerDependency removes the provided spend consumer dependency
// associated with the provided block hash from the spend pruner.
func (q *ChainQueryerAdapter) RemoveSpendConsumerDependency(dbTx database.Tx, blockHash *chainhash.Hash, consumerID string) error {
return q.spendPruner.RemoveSpendConsumerDependency(dbTx, blockHash, consumerID)
}

// FetchSpendConsumer returns the spend journal consumer associated with the
// provided id.
func (q *ChainQueryerAdapter) FetchSpendConsumer(id string) (spendpruner.SpendConsumer, error) {
return q.spendPruner.FetchConsumer(id)
}

// BlockHeaderByHash returns the block header identified by the given hash.
func (q *ChainQueryerAdapter) BlockHeaderByHash(hash *chainhash.Hash) (wire.BlockHeader, error) {
return q.HeaderByHash(hash)
}

// SpendPrunerHandler processes incoming spending pruner signals.
//
// This must be run as a goroutine.
Expand Down Expand Up @@ -2220,12 +2337,9 @@ type Config struct {
// subsidy cache.
SubsidyCache *standalone.SubsidyCache

// IndexManager defines an index manager to use when initializing the
// chain and connecting and disconnecting blocks.
//
// This field can be nil if the caller does not wish to make use of an
// index manager.
IndexManager indexers.IndexManager
// IndexSubscriber defines a subscriber for relaying updates
// concerning connected and disconnected blocks to subscribed index clients.
IndexSubscriber *indexers.IndexSubscriber

// UtxoCache defines a utxo cache that sits on top of the utxo set database.
// All utxo reads and writes go through the cache, and never read or write to
Expand Down Expand Up @@ -2288,8 +2402,8 @@ func New(ctx context.Context, config *Config) (*BlockChain, error) {
timeSource: config.TimeSource,
notifications: config.Notifications,
sigCache: config.SigCache,
indexManager: config.IndexManager,
interrupt: ctx.Done(),
indexSubscriber: config.IndexSubscriber,
subsidyCache: subsidyCache,
index: newBlockIndex(config.DB),
bestChain: newChainView(nil),
Expand Down Expand Up @@ -2318,16 +2432,6 @@ func New(ctx context.Context, config *Config) (*BlockChain, error) {
return nil, err
}

// Initialize and catch up all of the currently active optional indexes
// as needed.
queryAdapter := chainQueryerAdapter{BlockChain: &b}
if config.IndexManager != nil {
err := config.IndexManager.Init(ctx, &queryAdapter)
if err != nil {
return nil, err
}
}

b.spendPruner, err = spendpruner.NewSpendJournalPruner(b.db, b.RemoveSpendEntry)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion blockchain/error_test.go
Expand Up @@ -327,7 +327,7 @@ func TestErrorKindIsAs(t *testing.T) {
continue
}

// Ensure the underlying error kind can be unwrapped is and is the
// Ensure the underlying error kind can be unwrapped and is the
// expected kind.
var kind ErrorKind
if !errors.As(test.err, &kind) {
Expand Down