Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion graft/coreth/plugin/evm/atomic/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
var (
_ sync.Syncer = (*Syncer)(nil)
_ syncclient.LeafSyncTask = (*syncerLeafTask)(nil)
_ sync.Finalizer = (*Syncer)(nil)

errTargetHeightRequired = errors.New("target height must be > 0")
)
Expand Down Expand Up @@ -125,7 +126,6 @@ func NewSyncer(client syncclient.LeafClient, db *versiondb.Database, atomicTrie
syncer.syncer = syncclient.NewCallbackLeafSyncer(client, tasks, &syncclient.LeafSyncerConfig{
RequestSize: cfg.requestSize,
NumWorkers: cfg.numWorkers,
OnFailure: func() {}, // No-op since we flush progress to disk at the regular commit interval.
})

return syncer, nil
Expand All @@ -146,6 +146,13 @@ func (s *Syncer) Sync(ctx context.Context) error {
return s.syncer.Sync(ctx)
}

// Finalize commits any pending database changes to disk.
// This ensures that even if the sync is cancelled or fails, we preserve
// the progress up to the last fully synced height.
func (s *Syncer) Finalize() error {
return s.db.Commit()
}

// addZeroes returns the big-endian representation of `height`, prefixed with [common.HashLength] zeroes.
func addZeroes(height uint64) []byte {
// Key format is [height(8 bytes)][blockchainID(32 bytes)]. Start should be the
Expand Down
16 changes: 16 additions & 0 deletions graft/coreth/plugin/evm/vmsync/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func (r *SyncerRegistry) Register(syncer syncpkg.Syncer) error {

// RunSyncerTasks executes all registered syncers synchronously.
func (r *SyncerRegistry) RunSyncerTasks(ctx context.Context, summary message.Syncable) error {
// Ensure finalization runs regardless of how this function exits.
// This guarantees cleanup even on early returns or panics.
defer r.FinalizeAll(summary)

// Early return if context is already canceled (e.g., during shutdown).
if err := ctx.Err(); err != nil {
return err
Expand Down Expand Up @@ -102,3 +106,15 @@ func (r *SyncerRegistry) StartAsync(ctx context.Context, summary message.Syncabl

return g
}

// FinalizeAll iterates over all registered syncers and calls Finalize on those that implement the Finalizer interface.
// Errors are logged but not returned to ensure best-effort cleanup of all syncers.
func (r *SyncerRegistry) FinalizeAll(summary message.Syncable) {
for _, task := range r.syncers {
if f, ok := task.syncer.(syncpkg.Finalizer); ok {
if err := f.Finalize(); err != nil {
log.Error("failed to finalize syncer", "syncer", task.name, "err", err, "summary", summary.GetBlockHash().Hex(), "height", summary.Height())
}
}
}
}
9 changes: 4 additions & 5 deletions graft/coreth/sync/client/leaf_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ type LeafSyncTask interface {
type LeafSyncerConfig struct {
RequestSize uint16 // Number of leafs to request from a peer at a time
NumWorkers int // Number of workers to process leaf sync tasks
OnFailure func() // Callback for handling errors during sync
}

type CallbackLeafSyncer struct {
Expand Down Expand Up @@ -159,9 +158,9 @@ func (c *CallbackLeafSyncer) Sync(ctx context.Context) error {
})
}

err := eg.Wait()
if err != nil {
c.config.OnFailure()
if err := eg.Wait(); err != nil {
return err
}
return err

return nil
}
4 changes: 4 additions & 0 deletions graft/coreth/sync/statesync/code_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@ import (
"github.com/ava-labs/libevm/libevm/options"

"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/customrawdb"

syncpkg "github.com/ava-labs/avalanchego/graft/coreth/sync"
)

const defaultQueueCapacity = 5000

var (
_ syncpkg.Finalizer = (*CodeQueue)(nil)

errFailedToAddCodeHashesToQueue = errors.New("failed to add code hashes to queue")
errFailedToFinalizeCodeQueue = errors.New("failed to finalize code queue")
)
Expand Down
21 changes: 14 additions & 7 deletions graft/coreth/sync/statesync/state_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ func NewSyncer(client syncclient.Client, db ethdb.Database, root common.Hash, co
ss.syncer = syncclient.NewCallbackLeafSyncer(client, ss.segments, &syncclient.LeafSyncerConfig{
RequestSize: leafsRequestSize,
NumWorkers: defaultNumWorkers,
OnFailure: ss.onSyncFailure,
})

if codeQueue == nil {
Expand Down Expand Up @@ -301,19 +300,27 @@ func (t *stateSync) removeTrieInProgress(root common.Hash) (int, error) {
return len(t.triesInProgress), nil
}

// onSyncFailure is called if the sync fails, this writes all
// batches of in-progress trie segments to disk to have maximum
// progress to restore.
func (t *stateSync) onSyncFailure() {
// Finalize checks if there are any in-progress tries and flushes their batches to disk
// to preserve progress. On successful sync completion, triesInProgress will be empty
// and this is effectively a no-op. This is called by the syncer registry regardless
// of sync success or failure.
func (t *stateSync) Finalize() error {
t.lock.RLock()
defer t.lock.RUnlock()

// Nothing to flush if no tries are in progress.
// This is the case after a successful sync.
if len(t.triesInProgress) == 0 {
return nil
}

for _, trie := range t.triesInProgress {
for _, segment := range trie.segments {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(could be an over-cautious comment)
this looks like should've been cleared on success, but if not then we might end-up in a weird spot.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check my previous reply #4623 (comment)

if err := segment.batch.Write(); err != nil {
log.Error("failed to write segment batch on sync failure", "err", err)
return
log.Error("failed to write segment batch on finalize", "err", err)
return err
}
}
}
return nil
}
7 changes: 7 additions & 0 deletions graft/coreth/sync/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ type Syncer interface {
ID() string
}

// Finalizer provides a mechanism to perform cleanup operations after a sync operation.
// This is useful for handling inflight requests, flushing to disk, or other cleanup tasks.
type Finalizer interface {
// Finalize performs any necessary cleanup operations.
Finalize() error
}

// SummaryProvider is an interface for providing state summaries.
type SummaryProvider interface {
StateSummaryAtBlock(ethBlock *types.Block) (block.StateSummary, error)
Expand Down