Skip to content

Commit

Permalink
multi: copy snapshot pkScript.
Browse files Browse the repository at this point in the history
This refactors finding the lowest index tip height out of CatchUp into
findLowestIndexTipHeight. The prevscripter snapshot now makes copies of
the pkScript to avoid holding on to references too long during the
catch up process.
  • Loading branch information
dnldd authored and davecgh committed Oct 12, 2021
1 parent 66ef012 commit 6f41582
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 39 deletions.
10 changes: 9 additions & 1 deletion blockchain/chain.go
Expand Up @@ -346,9 +346,17 @@ func newPrevScriptSnapshot(view *UtxoViewpoint) *prevScriptsSnapshot {
snapshot.entries[k] = prevScript{}
continue
}

var pkScript []byte
scriptLen := len(v.pkScript)
if scriptLen != 0 {
pkScript = make([]byte, scriptLen)
copy(pkScript, v.pkScript)
}

snapshot.entries[k] = prevScript{
scriptVersion: v.scriptVersion,
pkScript: v.pkScript,
pkScript: pkScript,
}
}

Expand Down
60 changes: 33 additions & 27 deletions blockchain/indexers/common.go
Expand Up @@ -665,6 +665,13 @@ func upgradeIndex(ctx context.Context, indexer Indexer, genesisHash *chainhash.H
// maybeNotifySubscribers updates subscribers the index is synced when
// the tip is identical to the chain tip.
func maybeNotifySubscribers(ctx context.Context, indexer Indexer) error {
subs := indexer.Subscribers()

// Exit immediately if the index has no subscribers.
if len(subs) == 0 {
return nil
}

if interruptRequested(ctx) {
return errInterruptRequested
}
Expand All @@ -677,7 +684,6 @@ func maybeNotifySubscribers(ctx context.Context, indexer Indexer) error {
}

if tipHeight == bestHeight && *bestHash == *tipHash {
subs := indexer.Subscribers()
for sub := range subs {
close(sub)
delete(subs, sub)
Expand All @@ -702,21 +708,20 @@ func notifyDependent(ctx context.Context, indexer Indexer, ntfn *IndexNtfn) erro

// Notify the dependent subscription if set.
sub.mtx.Lock()
defer sub.mtx.Unlock()
if sub.dependent != nil {
err := updateIndex(ctx, sub.dependent.idx, ntfn)
if err != nil {
sub.mtx.Unlock()
return err
}
}
sub.mtx.Unlock()

return nil
}

// updateIndex processes the notification for the provided index.
func updateIndex(ctx context.Context, indexer Indexer, ntfn *IndexNtfn) error {
// Ensure the incoming index notification is of the next block
// extending the chain.
tip, _, err := indexer.Tip()
if err != nil {
return fmt.Errorf("%s: unable to fetch index tip: %v",
Expand All @@ -734,38 +739,39 @@ func updateIndex(ctx context.Context, indexer Indexer, ntfn *IndexNtfn) error {
indexer.Name(), ntfn.NtfnType)
}

// Relay the notification to the dependent if its height is less than that
// of the expected notification since its possible for a dependent to have
// a lower tip height than its prerequisite.
if ntfn.Block.Height() < expectedHeight {
switch {
case ntfn.Block.Height() < expectedHeight:
// Relay the notification to the dependent if its height is less
// than that of the expected notification since its possible for a
// dependent to have a lower tip height than its prerequisite.
log.Tracef("%s: relaying notification for height %d to dependent",
indexer.Name(), ntfn.Block.Height())
return notifyDependent(ctx, indexer, ntfn)
}
notifyDependent(ctx, indexer, ntfn)

// Receiving a notification with a height higher than the expected implies
// a missed index update.
if ntfn.Block.Height() > expectedHeight {
case ntfn.Block.Height() > expectedHeight:
// Receiving a notification with a height higher than the expected
// implies a missed index update.
return fmt.Errorf("%s: missing index notification, expected "+
"notification for height %d, got %d", indexer.Name(),
expectedHeight, ntfn.Block.Height())
}

err = indexer.DB().Update(func(dbTx database.Tx) error {
return indexer.ProcessNotification(dbTx, ntfn)
})
if err != nil {
return err
}
default:
err = indexer.DB().Update(func(dbTx database.Tx) error {
return indexer.ProcessNotification(dbTx, ntfn)
})
if err != nil {
return err
}

err = notifyDependent(ctx, indexer, ntfn)
if err != nil {
return err
}
err = notifyDependent(ctx, indexer, ntfn)
if err != nil {
return err
}

err = maybeNotifySubscribers(ctx, indexer)
if err != nil {
return err
err = maybeNotifySubscribers(ctx, indexer)
if err != nil {
return err
}
}

return nil
Expand Down
32 changes: 21 additions & 11 deletions blockchain/indexers/indexsubscriber.go
Expand Up @@ -192,25 +192,22 @@ func (s *IndexSubscriber) Notify(ntfn *IndexNtfn) {
}
}

// CatchUp syncs all subscribed indexes to the the main chain by
// connecting blocks from the after the lowest index tip to the current main
// chain tip.
//
// This should be called after all indexes have subscribed for updates.
func (s *IndexSubscriber) CatchUp(ctx context.Context, db database.DB, queryer ChainQueryer) error {
// Find the lowest tip height to catch up from among subscribed indexes.
// findLowestIndexTipHeight determines the lowest index tip height among
// subscribed indexes and their dependencies.
func (s *IndexSubscriber) findLowestIndexTipHeight(queryer ChainQueryer) (int64, int64, error) {
// Find the lowest tip height to catch up among subscribed indexes.
bestHeight, _ := queryer.Best()
lowestHeight := bestHeight
for _, sub := range s.subscriptions {
tipHeight, tipHash, err := sub.idx.Tip()
if err != nil {
return err
return 0, bestHeight, err
}

// Ensure the index tip is on the main chain.
if !queryer.MainChainHasBlock(tipHash) {
return fmt.Errorf("%s: index tip (%s) is not on the main chain",
sub.idx.Name(), tipHash)
return 0, bestHeight, fmt.Errorf("%s: index tip (%s) is not on the "+
"main chain", sub.idx.Name(), tipHash)
}

if tipHeight < lowestHeight {
Expand All @@ -222,7 +219,7 @@ func (s *IndexSubscriber) CatchUp(ctx context.Context, db database.DB, queryer C
for dependent != nil {
tipHeight, _, err := sub.dependent.idx.Tip()
if err != nil {
return err
return 0, bestHeight, err
}

if tipHeight < lowestHeight {
Expand All @@ -233,6 +230,19 @@ func (s *IndexSubscriber) CatchUp(ctx context.Context, db database.DB, queryer C
}
}

return lowestHeight, bestHeight, nil
}

// CatchUp syncs all subscribed indexes to the the main chain by connecting
// blocks from after the lowest index tip to the current main chain tip.
//
// This should be called after all indexes have subscribed for updates.
func (s *IndexSubscriber) CatchUp(ctx context.Context, db database.DB, queryer ChainQueryer) error {
lowestHeight, bestHeight, err := s.findLowestIndexTipHeight(queryer)
if err != nil {
return err
}

// Nothing to do if all indexes are synced.
if bestHeight == lowestHeight {
return nil
Expand Down

0 comments on commit 6f41582

Please sign in to comment.