-
Notifications
You must be signed in to change notification settings - Fork 639
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement interval tree to replace bootstrapping jobs queue #2756
Merged
Merged
Changes from 19 commits
Commits
Show all changes
36 commits
Select commit
Hold shift + click to select a range
c5b7aa9
Implement interval tree for syncing
StephenButtolph 63efd3b
Persist intervals
StephenButtolph 2454c46
rename
StephenButtolph 0efa5eb
Add block execution
StephenButtolph 6a65a2a
Add length + TODOs
StephenButtolph f2234bd
nit
StephenButtolph 1f63238
error rather than panic
StephenButtolph 6947876
Refactor database passing
StephenButtolph be64296
Add interval tests
StephenButtolph 884f39c
Fix too frequent iterator releases and batch writes
StephenButtolph e4e92cc
nit + support cancellation
StephenButtolph 72314d1
Add invariant comment + tests
StephenButtolph 8851470
Merge branch 'master' into interval-tree-syncing
StephenButtolph 03f60c6
lint
StephenButtolph f01b6a3
allow specifying the log level
StephenButtolph d5da1e9
upstream
StephenButtolph 25e2801
cleanup
StephenButtolph f0c5c10
Merge branch 'master' into interval-tree-syncing
StephenButtolph b6d6ba9
Merge branch 'master' into interval-tree-syncing
StephenButtolph 06fd656
Merge branch 'master' into interval-tree-syncing
StephenButtolph c89d24d
Remove redundent interface
StephenButtolph bc3fedf
nit
StephenButtolph 8134e16
nit
StephenButtolph 9991e39
Move GetMissingBlockIDs and Execute out of the interval package
StephenButtolph 2877502
Merge branch 'master' into interval-tree-syncing
StephenButtolph f1dd0a6
nit
StephenButtolph bf29d86
remove storage
StephenButtolph 5a8fadc
nit
StephenButtolph 69ed897
nit
StephenButtolph 9815047
nit
StephenButtolph 5bf4899
nit
StephenButtolph 9da3bc1
nit
StephenButtolph ccc8a29
remove randomness
StephenButtolph 7ab2cb4
nit
StephenButtolph e19016f
remove dead code
StephenButtolph d7f4861
nit
joshua-kim File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,237 @@ | ||
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. | ||
// See the file LICENSE for licensing terms. | ||
|
||
package interval | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"go.uber.org/zap" | ||
|
||
"github.com/ava-labs/avalanchego/database" | ||
"github.com/ava-labs/avalanchego/ids" | ||
"github.com/ava-labs/avalanchego/snow/consensus/snowman" | ||
"github.com/ava-labs/avalanchego/utils/logging" | ||
"github.com/ava-labs/avalanchego/utils/set" | ||
"github.com/ava-labs/avalanchego/utils/timer" | ||
) | ||
|
||
const ( | ||
batchWritePeriod = 64 | ||
iteratorReleasePeriod = 1024 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be increased? |
||
logPeriod = 5 * time.Second | ||
) | ||
|
||
type Parser interface { | ||
ParseBlock(context.Context, []byte) (snowman.Block, error) | ||
} | ||
|
||
// GetMissingBlockIDs returns the ID of the blocks that should be fetched to | ||
// attempt to make a single continuous range from | ||
// (lastAcceptedHeight, highestTrackedHeight]. | ||
// | ||
// For example, if the tree currently contains heights [1, 4, 6, 7] and the | ||
// lastAcceptedHeight is 2, this function will return the IDs corresponding to | ||
// blocks [3, 5]. | ||
func GetMissingBlockIDs( | ||
StephenButtolph marked this conversation as resolved.
Show resolved
Hide resolved
StephenButtolph marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ctx context.Context, | ||
db database.KeyValueReader, | ||
parser Parser, | ||
tree *Tree, | ||
lastAcceptedHeight uint64, | ||
) (set.Set[ids.ID], error) { | ||
var ( | ||
missingBlocks set.Set[ids.ID] | ||
intervals = tree.Flatten() | ||
lastHeightToFetch = lastAcceptedHeight + 1 | ||
) | ||
for _, i := range intervals { | ||
if i.LowerBound <= lastHeightToFetch { | ||
continue | ||
} | ||
|
||
blkBytes, err := GetBlock(db, i.LowerBound) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
blk, err := parser.ParseBlock(ctx, blkBytes) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
parentID := blk.Parent() | ||
missingBlocks.Add(parentID) | ||
} | ||
return missingBlocks, nil | ||
} | ||
|
||
// Add the block to the tree and return if the parent block should be fetched, | ||
// but wasn't desired before. | ||
func Add( | ||
db database.KeyValueWriterDeleter, | ||
tree *Tree, | ||
lastAcceptedHeight uint64, | ||
blk snowman.Block, | ||
) (bool, error) { | ||
var ( | ||
height = blk.Height() | ||
lastHeightToFetch = lastAcceptedHeight + 1 | ||
) | ||
if height < lastHeightToFetch || tree.Contains(height) { | ||
return false, nil | ||
} | ||
|
||
blkBytes := blk.Bytes() | ||
if err := PutBlock(db, height, blkBytes); err != nil { | ||
return false, err | ||
} | ||
|
||
if err := tree.Add(db, height); err != nil { | ||
return false, err | ||
} | ||
|
||
return height != lastHeightToFetch && !tree.Contains(height-1), nil | ||
} | ||
|
||
// Execute all the blocks tracked by the tree. If a block is in the tree but is | ||
// already accepted based on the lastAcceptedHeight, it will be removed from the | ||
// tree but not executed. | ||
// | ||
// Execute assumes that GetMissingBlockIDs would return an empty set. | ||
func Execute( | ||
StephenButtolph marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ctx context.Context, | ||
log logging.Func, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Taking in the log function rather than the full logger allows the caller to specify the log level. |
||
db database.Database, | ||
parser Parser, | ||
tree *Tree, | ||
lastAcceptedHeight uint64, | ||
) error { | ||
var ( | ||
batch = db.NewBatch() | ||
processedSinceBatchWrite uint | ||
writeBatch = func() error { | ||
if processedSinceBatchWrite == 0 { | ||
return nil | ||
} | ||
processedSinceBatchWrite = 0 | ||
|
||
if err := batch.Write(); err != nil { | ||
return err | ||
} | ||
batch.Reset() | ||
return nil | ||
} | ||
|
||
iterator = db.NewIteratorWithPrefix(blockPrefix) | ||
processedSinceIteratorRelease uint | ||
|
||
startTime = time.Now() | ||
timeOfNextLog = startTime.Add(logPeriod) | ||
totalNumberToProcess = tree.Len() | ||
) | ||
defer func() { | ||
iterator.Release() | ||
}() | ||
|
||
log("executing blocks", | ||
zap.Uint64("numToExecute", totalNumberToProcess), | ||
) | ||
|
||
for ctx.Err() == nil && iterator.Next() { | ||
blkBytes := iterator.Value() | ||
blk, err := parser.ParseBlock(ctx, blkBytes) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
height := blk.Height() | ||
if err := DeleteBlock(batch, height); err != nil { | ||
return err | ||
} | ||
|
||
if err := tree.Remove(batch, height); err != nil { | ||
return err | ||
} | ||
|
||
// Periodically write the batch to disk to avoid memory pressure. | ||
processedSinceBatchWrite++ | ||
if processedSinceBatchWrite >= batchWritePeriod { | ||
if err := writeBatch(); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
// Periodically release and re-grab the database iterator to avoid | ||
// keeping a reference to an old database revision. | ||
processedSinceIteratorRelease++ | ||
if processedSinceIteratorRelease >= iteratorReleasePeriod { | ||
if err := iterator.Error(); err != nil { | ||
return err | ||
} | ||
|
||
// The batch must be written here to avoid re-processing a block. | ||
if err := writeBatch(); err != nil { | ||
return err | ||
} | ||
|
||
processedSinceIteratorRelease = 0 | ||
iterator.Release() | ||
iterator = db.NewIteratorWithPrefix(blockPrefix) | ||
} | ||
|
||
now := time.Now() | ||
if now.After(timeOfNextLog) { | ||
var ( | ||
numProcessed = totalNumberToProcess - tree.Len() | ||
eta = timer.EstimateETA(startTime, numProcessed, totalNumberToProcess) | ||
) | ||
|
||
log("executing blocks", | ||
zap.Duration("eta", eta), | ||
zap.Uint64("numExecuted", numProcessed), | ||
zap.Uint64("numToExecute", totalNumberToProcess), | ||
) | ||
timeOfNextLog = now.Add(logPeriod) | ||
} | ||
|
||
if height <= lastAcceptedHeight { | ||
continue | ||
} | ||
|
||
if err := blk.Verify(ctx); err != nil { | ||
return fmt.Errorf("failed to verify block %s (%d) in bootstrapping: %w", | ||
blk.ID(), | ||
height, | ||
err, | ||
) | ||
} | ||
if err := blk.Accept(ctx); err != nil { | ||
return fmt.Errorf("failed to accept block %s (%d) in bootstrapping: %w", | ||
blk.ID(), | ||
height, | ||
err, | ||
) | ||
} | ||
} | ||
if err := writeBatch(); err != nil { | ||
return err | ||
} | ||
if err := iterator.Error(); err != nil { | ||
return err | ||
} | ||
|
||
var ( | ||
numProcessed = totalNumberToProcess - tree.Len() | ||
err = ctx.Err() | ||
) | ||
log("executed blocks", | ||
zap.Uint64("numExecuted", numProcessed), | ||
zap.Uint64("numToExecute", totalNumberToProcess), | ||
zap.Duration("duration", time.Since(startTime)), | ||
zap.Error(err), | ||
) | ||
return err | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be increased?