Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

catchup: pause catchup if ledger lagging behind #5794

Merged
merged 13 commits into from
Oct 26, 2023
29 changes: 21 additions & 8 deletions catchup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
Block(basics.Round) (bookkeeping.Block, error)
BlockHdr(basics.Round) (bookkeeping.BlockHeader, error)
IsWritingCatchpointDataFile() bool
IsBehindCommittingDeltas() bool
Validate(ctx context.Context, blk bookkeeping.Block, executionPool execpool.BacklogPool) (*ledgercore.ValidatedBlock, error)
AddValidatedBlock(vb ledgercore.ValidatedBlock, cert agreement.Certificate) error
WaitMem(r basics.Round) chan struct{}
Expand All @@ -86,10 +87,10 @@
deadlineTimeout time.Duration
blockValidationPool execpool.BacklogPool

// suspendForCatchpointWriting defines whether we've run into a state where the ledger is currently busy writing the
// catchpoint file. If so, we want to suspend the catchup process until the catchpoint file writing is complete,
// suspendForLedgerOps defines whether we've run into a state where the ledger is currently busy writing the
// catchpoint file or flushing accounts. If so, we want to suspend the catchup process until the catchpoint file writing is complete,
// and resume from there without stopping the catchup timer.
suspendForCatchpointWriting bool
suspendForLedgerOps bool

// The channel gets closed when the initial sync is complete. This allows for other services to avoid
// the overhead of starting prematurely (before this node is caught-up and can validate messages for example).
Expand Down Expand Up @@ -498,7 +499,14 @@
// could resume with the catchup.
if s.ledger.IsWritingCatchpointDataFile() {
s.log.Info("Catchup is stopping due to catchpoint file being written")
s.suspendForCatchpointWriting = true
s.suspendForLedgerOps = true
return

Check warning on line 503 in catchup/service.go

View check run for this annotation

Codecov / codecov/patch

catchup/service.go#L502-L503

Added lines #L502 - L503 were not covered by tests
}

// if the ledger has too many non-flushed account changes, stop catching up to reduce the memory pressure.
if s.ledger.IsBehindCommittingDeltas() {
s.log.Info("Catchup is stopping due to too many non-flushed account changes")
s.suspendForLedgerOps = true
return
}

Expand Down Expand Up @@ -555,10 +563,10 @@
sleepDuration = time.Duration(crypto.RandUint63()) % s.deadlineTimeout
continue
case <-s.syncNow:
if s.parallelBlocks == 0 || s.ledger.IsWritingCatchpointDataFile() {
if s.parallelBlocks == 0 || s.ledger.IsWritingCatchpointDataFile() || s.ledger.IsBehindCommittingDeltas() {
continue
}
s.suspendForCatchpointWriting = false
s.suspendForLedgerOps = false
s.log.Info("Immediate resync triggered; resyncing")
s.sync()
case <-time.After(sleepDuration):
Expand All @@ -575,7 +583,12 @@
// keep the existing sleep duration and try again later.
continue
}
s.suspendForCatchpointWriting = false
// if the ledger has too many non-flushed account changes, skip
if s.ledger.IsBehindCommittingDeltas() {
continue

Check warning on line 588 in catchup/service.go

View check run for this annotation

Codecov / codecov/patch

catchup/service.go#L588

Added line #L588 was not covered by tests
}

s.suspendForLedgerOps = false
s.log.Info("It's been too long since our ledger advanced; resyncing")
s.sync()
case cert := <-s.unmatchedPendingCertificates:
Expand Down Expand Up @@ -630,7 +643,7 @@
initSync := false

// if the catchupWriting flag is set, it means that we aborted the sync due to the ledger writing the catchup file.
if !s.suspendForCatchpointWriting {
if !s.suspendForLedgerOps {
// in that case, don't change the timer so that the "timer" would keep running.
atomic.StoreInt64(&s.syncStartNS, 0)

Expand Down
56 changes: 56 additions & 0 deletions catchup/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,18 @@ func (m *mockedLedger) IsWritingCatchpointDataFile() bool {
return false
}

func (m *mockedLedger) IsBehindCommittingDeltas() bool {
return false
}

type mockedBehindDeltasLedger struct {
jasonpaulos marked this conversation as resolved.
Show resolved Hide resolved
mockedLedger
}

func (m *mockedBehindDeltasLedger) IsBehindCommittingDeltas() bool {
return true
}

func testingenvWithUpgrade(
t testing.TB,
numBlocks,
Expand Down Expand Up @@ -1127,3 +1139,47 @@ func TestDownloadBlocksToSupportStateProofs(t *testing.T) {
lookback = lookbackForStateproofsSupport(&topBlk)
assert.Equal(t, uint64(0), lookback)
}

// TestServiceLedgerUnavailable checks a local ledger that is unavailable cannot catchup up to remote round
func TestServiceLedgerUnavailable(t *testing.T) {
partitiontest.PartitionTest(t)

// Make Ledger
local := new(mockedBehindDeltasLedger)
local.blocks = append(local.blocks, bookkeeping.Block{})

remote, _, blk, err := buildTestLedger(t, bookkeeping.Block{})
if err != nil {
t.Fatal(err)
return
}
numBlocks := 10
addBlocks(t, remote, blk, numBlocks)

// Create a network and block service
blockServiceConfig := config.GetDefaultLocal()
net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
net.addPeer(rootURL)

require.Equal(t, basics.Round(0), local.LastRound())
require.Equal(t, basics.Round(numBlocks+1), remote.LastRound())

// Make Service
auth := &mockedAuthenticator{fail: false}
s := MakeService(logging.Base(), defaultConfig, net, local, auth, nil, nil)
s.log = &periodicSyncLogger{Logger: logging.Base()}
s.deadlineTimeout = 2 * time.Second

s.testStart()
defer s.Stop()
s.sync()
require.Greater(t, local.LastRound(), basics.Round(0))
require.Less(t, local.LastRound(), remote.LastRound())
}
37 changes: 1 addition & 36 deletions ledger/catchpointtracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,7 @@ func TestCatchpointReproducibleLabels(t *testing.T) {

// blockingTracker is a testing tracker used to test "what if" a tracker would get blocked.
type blockingTracker struct {
emptyTracker
postCommitUnlockedEntryLock chan struct{}
postCommitUnlockedReleaseLock chan struct{}
postCommitEntryLock chan struct{}
Expand All @@ -783,36 +784,12 @@ type blockingTracker struct {
shouldLockPostCommitUnlocked atomic.Bool
}

// loadFromDisk is not implemented in the blockingTracker.
func (bt *blockingTracker) loadFromDisk(ledgerForTracker, basics.Round) error {
return nil
}

// newBlock is not implemented in the blockingTracker.
func (bt *blockingTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) {
}

// committedUpTo in the blockingTracker just stores the committed round.
func (bt *blockingTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) {
atomic.StoreInt64(&bt.committedUpToRound, int64(committedRnd))
return committedRnd, basics.Round(0)
}

// produceCommittingTask is not used by the blockingTracker
func (bt *blockingTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
return dcr
}

// prepareCommit, is not used by the blockingTracker
func (bt *blockingTracker) prepareCommit(*deferredCommitContext) error {
return nil
}

// commitRound is not used by the blockingTracker
func (bt *blockingTracker) commitRound(context.Context, trackerdb.TransactionScope, *deferredCommitContext) error {
return nil
}

// postCommit implements entry/exit blockers, designed for testing.
func (bt *blockingTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) {
if bt.alwaysLock.Load() || dcc.catchpointFirstStage || bt.shouldLockPostCommit.Load() {
Expand All @@ -829,18 +806,6 @@ func (bt *blockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferred
}
}

// control functions are not used by the blockingTracker
func (bt *blockingTracker) handleUnorderedCommit(dcc *deferredCommitContext) {
}
func (bt *blockingTracker) handlePrepareCommitError(dcc *deferredCommitContext) {
}
func (bt *blockingTracker) handleCommitError(dcc *deferredCommitContext) {
}

// close is not used by the blockingTracker
func (bt *blockingTracker) close() {
}

func TestCatchpointTrackerNonblockingCatchpointWriting(t *testing.T) {
partitiontest.PartitionTest(t)

Expand Down
6 changes: 6 additions & 0 deletions ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,12 @@
return l.trackers.getDbRound()
}

// IsBehindCommittingDeltas indicates if the ledger is behind expected number of in-memory deltas.
// It intended to slow down the catchup service when deltas overgrow some limit.
func (l *Ledger) IsBehindCommittingDeltas() bool {
return l.trackers.isBehindCommittingDeltas(l.Latest())

Check warning on line 904 in ledger/ledger.go

View check run for this annotation

Codecov / codecov/patch

ledger/ledger.go#L903-L904

Added lines #L903 - L904 were not covered by tests
}

// DebuggerLedger defines the minimal set of method required for creating a debug balances.
type DebuggerLedger = eval.LedgerForCowBase

Expand Down
78 changes: 57 additions & 21 deletions ledger/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"fmt"
"reflect"
"sync"
"sync/atomic"
"time"

"github.com/algorand/go-algorand/config"
Expand Down Expand Up @@ -175,6 +176,8 @@

// accountsWriting provides synchronization around the background writing of account balances.
accountsWriting sync.WaitGroup
// accountsCommitting is set when trackers registry writing accounts into DB.
accountsCommitting atomic.Bool

// dbRound is always exactly accountsRound(),
// cached to avoid SQL queries.
Expand All @@ -196,8 +199,16 @@
lastFlushTime time.Time

cfg config.Local

// maxAccountDeltas is a maximum number of in-memory deltas stored by trackers.
// When exceeded trackerRegistry will attempt to flush, and its Available() method will return false.
// Too many in-memory deltas could cause the node to run out of memory.
maxAccountDeltas uint64
}

// defaultMaxAccountDeltas is a default value for maxAccountDeltas.
const defaultMaxAccountDeltas = 256

// deferredCommitRange is used during the calls to produceCommittingTask, and used as a data structure
// to syncronize the various trackers and create a uniformity around which rounds need to be persisted
// next.
Expand Down Expand Up @@ -285,26 +296,18 @@
return dcc.oldBase + basics.Round(dcc.offset)
}

var errMissingAccountUpdateTracker = errors.New("initializeTrackerCaches : called without a valid accounts update tracker")
var errMissingAccountUpdateTracker = errors.New("trackers replay : called without a valid accounts update tracker")

func (tr *trackerRegistry) initialize(l ledgerForTracker, trackers []ledgerTracker, cfg config.Local) (err error) {
tr.mu.Lock()
defer tr.mu.Unlock()
tr.dbs = l.trackerDB()
tr.log = l.trackerLog()

err = tr.dbs.Snapshot(func(ctx context.Context, tx trackerdb.SnapshotScope) (err error) {
ar, err := tx.MakeAccountsReader()
if err != nil {
return err
}

tr.dbRound, err = ar.AccountsRound()
return err
})

if err != nil {
return err
tr.maxAccountDeltas = defaultMaxAccountDeltas
if cfg.MaxAcctLookback > tr.maxAccountDeltas {
tr.maxAccountDeltas = cfg.MaxAcctLookback + 1
tr.log.Infof("maxAccountDeltas was overridden to %d because of MaxAcctLookback=%d: this combination might use lots of RAM. To preserve some blocks in blockdb consider using MaxBlockHistoryLookback config option instead of MaxAcctLookback", tr.maxAccountDeltas, cfg.MaxAcctLookback)
}

tr.ctx, tr.ctxCancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -333,24 +336,38 @@
}

func (tr *trackerRegistry) loadFromDisk(l ledgerForTracker) error {
var dbRound basics.Round
err := tr.dbs.Snapshot(func(ctx context.Context, tx trackerdb.SnapshotScope) (err error) {
ar, err0 := tx.MakeAccountsReader()
if err0 != nil {
return err0

Check warning on line 343 in ledger/tracker.go

View check run for this annotation

Codecov / codecov/patch

ledger/tracker.go#L343

Added line #L343 was not covered by tests
}

dbRound, err0 = ar.AccountsRound()
return err0
})
if err != nil {
return err

Check warning on line 350 in ledger/tracker.go

View check run for this annotation

Codecov / codecov/patch

ledger/tracker.go#L350

Added line #L350 was not covered by tests
}

tr.mu.RLock()
dbRound := tr.dbRound
tr.dbRound = dbRound
tr.mu.RUnlock()

for _, lt := range tr.trackers {
err := lt.loadFromDisk(l, dbRound)
if err != nil {
err0 := lt.loadFromDisk(l, dbRound)
if err0 != nil {
// find the tracker name.
trackerName := reflect.TypeOf(lt).String()
return fmt.Errorf("tracker %s failed to loadFromDisk : %w", trackerName, err)
return fmt.Errorf("tracker %s failed to loadFromDisk : %w", trackerName, err0)

Check warning on line 362 in ledger/tracker.go

View check run for this annotation

Codecov / codecov/patch

ledger/tracker.go#L362

Added line #L362 was not covered by tests
}
}

err := tr.replay(l)
if err != nil {
err = fmt.Errorf("initializeTrackerCaches failed : %w", err)
if err0 := tr.replay(l); err0 != nil {
return fmt.Errorf("trackers replay failed : %w", err0)

Check warning on line 367 in ledger/tracker.go

View check run for this annotation

Codecov / codecov/patch

ledger/tracker.go#L367

Added line #L367 was not covered by tests
}
return err

return nil
}

func (tr *trackerRegistry) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) {
Expand Down Expand Up @@ -456,6 +473,20 @@
tr.accountsWriting.Wait()
}

func (tr *trackerRegistry) isBehindCommittingDeltas(latest basics.Round) bool {
tr.mu.RLock()
dbRound := tr.dbRound
tr.mu.RUnlock()

numDeltas := uint64(latest.SubSaturate(dbRound))
if numDeltas < tr.maxAccountDeltas {
return false
}

// there is a large number of deltas check if commitSyncer is not writing accounts
return tr.accountsCommitting.Load()
cce marked this conversation as resolved.
Show resolved Hide resolved
}

func (tr *trackerRegistry) close() {
if tr.ctxCancel != nil {
tr.ctxCancel()
Expand Down Expand Up @@ -562,6 +593,11 @@
start := time.Now()
ledgerCommitroundCount.Inc(nil)
err = tr.dbs.Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) (err error) {
gmalouf marked this conversation as resolved.
Show resolved Hide resolved
tr.accountsCommitting.Store(true)
defer func() {
tr.accountsCommitting.Store(false)
}()

aw, err := tx.MakeAccountsWriter()
if err != nil {
return err
Expand Down