Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ledger/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,8 @@ func (ls *LedgerState) processEpochRollover(
"epoch", fmt.Sprintf("%+v", ls.currentEpoch),
"component", "ledger",
)
// Start background cleanup of consumed UTxOs
go ls.cleanupConsumedUtxos()
return nil
}

Expand Down
55 changes: 30 additions & 25 deletions ledger/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,35 +205,38 @@ func (ls *LedgerState) scheduleCleanupConsumedUtxos() {
ls.timerCleanupConsumedUtxos = time.AfterFunc(
cleanupConsumedUtxosInterval,
func() {
defer func() {
// Schedule the next run
ls.scheduleCleanupConsumedUtxos()
}()
// Get the current tip, since we're querying by slot
tip := ls.Tip()
// Delete UTxOs that are marked as deleted and older than our slot window
ls.config.Logger.Debug(
"cleaning up consumed UTxOs",
"component", "ledger",
)
ls.Lock()
err := ls.db.UtxosDeleteConsumed(
tip.Point.Slot-cleanupConsumedUtxosSlotWindow,
nil,
)
ls.Unlock()
if err != nil {
ls.config.Logger.Error(
"failed to cleanup consumed UTxOs",
"component", "ledger",
"error", err,
)
return
}
ls.cleanupConsumedUtxos()
// Schedule the next run
ls.scheduleCleanupConsumedUtxos()
},
)
}

func (ls *LedgerState) cleanupConsumedUtxos() {
// Get the current tip, since we're querying by slot
tip := ls.Tip()
// Delete UTxOs that are marked as deleted and older than our slot window
ls.config.Logger.Debug(
"cleaning up consumed UTxOs",
"component", "ledger",
)
if tip.Point.Slot > cleanupConsumedUtxosSlotWindow {
ls.Lock()
err := ls.db.UtxosDeleteConsumed(
tip.Point.Slot-cleanupConsumedUtxosSlotWindow,
nil,
)
ls.Unlock()
if err != nil {
ls.config.Logger.Error(
"failed to cleanup consumed UTxOs",
"component", "ledger",
"error", err,
)
}
}
}

func (ls *LedgerState) rollback(point ocommon.Point) error {
// Start a transaction
txn := ls.db.Transaction(true)
Expand Down Expand Up @@ -466,6 +469,7 @@ func (ls *LedgerState) ledgerProcessBlocks() {
shouldValidate := ls.config.ValidateHistorical
for {
if needsEpochRollover {
ls.Lock()
needsEpochRollover = false
txn := ls.db.Transaction(true)
err := txn.Do(func(txn *database.Txn) error {
Expand All @@ -484,6 +488,7 @@ func (ls *LedgerState) ledgerProcessBlocks() {
}
return nil
})
ls.Unlock()
if err != nil {
ls.config.Logger.Error(
"failed to process epoch rollover: " + err.Error(),
Expand Down
Loading