diff --git a/ledger/chainsync.go b/ledger/chainsync.go index 770efdea..96e030c5 100644 --- a/ledger/chainsync.go +++ b/ledger/chainsync.go @@ -397,6 +397,8 @@ func (ls *LedgerState) processEpochRollover( "epoch", fmt.Sprintf("%+v", ls.currentEpoch), "component", "ledger", ) + // Start background cleanup of consumed UTxOs + go ls.cleanupConsumedUtxos() return nil } diff --git a/ledger/state.go b/ledger/state.go index e369850c..ae4646aa 100644 --- a/ledger/state.go +++ b/ledger/state.go @@ -205,35 +205,38 @@ func (ls *LedgerState) scheduleCleanupConsumedUtxos() { ls.timerCleanupConsumedUtxos = time.AfterFunc( cleanupConsumedUtxosInterval, func() { - defer func() { - // Schedule the next run - ls.scheduleCleanupConsumedUtxos() - }() - // Get the current tip, since we're querying by slot - tip := ls.Tip() - // Delete UTxOs that are marked as deleted and older than our slot window - ls.config.Logger.Debug( - "cleaning up consumed UTxOs", - "component", "ledger", - ) - ls.Lock() - err := ls.db.UtxosDeleteConsumed( - tip.Point.Slot-cleanupConsumedUtxosSlotWindow, - nil, - ) - ls.Unlock() - if err != nil { - ls.config.Logger.Error( - "failed to cleanup consumed UTxOs", - "component", "ledger", - "error", err, - ) - return - } + ls.cleanupConsumedUtxos() + // Schedule the next run + ls.scheduleCleanupConsumedUtxos() }, ) } +func (ls *LedgerState) cleanupConsumedUtxos() { + // Get the current tip, since we're querying by slot + tip := ls.Tip() + // Delete UTxOs that are marked as deleted and older than our slot window + ls.config.Logger.Debug( + "cleaning up consumed UTxOs", + "component", "ledger", + ) + if tip.Point.Slot > cleanupConsumedUtxosSlotWindow { + ls.Lock() + err := ls.db.UtxosDeleteConsumed( + tip.Point.Slot-cleanupConsumedUtxosSlotWindow, + nil, + ) + ls.Unlock() + if err != nil { + ls.config.Logger.Error( + "failed to cleanup consumed UTxOs", + "component", "ledger", + "error", err, + ) + } + } +} + func (ls *LedgerState) rollback(point ocommon.Point) error { // Start a transaction txn := ls.db.Transaction(true) @@ -466,6 +469,7 @@ func (ls *LedgerState) ledgerProcessBlocks() { shouldValidate := ls.config.ValidateHistorical for { if needsEpochRollover { + ls.Lock() needsEpochRollover = false txn := ls.db.Transaction(true) err := txn.Do(func(txn *database.Txn) error { @@ -484,6 +488,7 @@ func (ls *LedgerState) ledgerProcessBlocks() { } return nil }) + ls.Unlock() if err != nil { ls.config.Logger.Error( "failed to process epoch rollover: " + err.Error(),