diff --git a/ledger/state.go b/ledger/state.go index c75f8fb7..c4bc370a 100644 --- a/ledger/state.go +++ b/ledger/state.go @@ -41,14 +41,17 @@ import ( const ( cleanupConsumedUtxosInterval = 5 * time.Minute cleanupConsumedUtxosSlotWindow = 50000 // TODO: calculate this from params (#395) + + validateHistoricalThreshold = 14 * (24 * time.Hour) // 2 weeks ) type LedgerStateConfig struct { - Logger *slog.Logger - DataDir string - EventBus *event.EventBus - CardanoNodeConfig *cardano.CardanoNodeConfig - PromRegistry prometheus.Registerer + Logger *slog.Logger + DataDir string + EventBus *event.EventBus + CardanoNodeConfig *cardano.CardanoNodeConfig + PromRegistry prometheus.Registerer + ValidateHistorical bool // Callback(s) BlockfetchRequestRangeFunc BlockfetchRequestRangeFunc } @@ -357,6 +360,7 @@ func (ls *LedgerState) ledgerProcessBlocks() { return } shouldBlock := false + shouldValidate := ls.config.ValidateHistorical // We chose 500 as an arbitrary max batch size. A "chain extended" message will be logged after each batch nextBatch := make([]*chain.ChainIteratorResult, 0, 500) var next, nextRollback, cachedNext *chain.ChainIteratorResult @@ -440,6 +444,23 @@ func (ls *LedgerState) ledgerProcessBlocks() { } needsRollback = true } + // Enable validation if we're getting near current tip + if !shouldValidate && len(nextBatch) == 0 { + // Determine wall time for next block slot + slotTime, err := ls.SlotToTime(next.Point.Slot) + if err != nil { + ls.config.Logger.Error( + "failed to convert slot to time: " + err.Error(), + ) + return + } + // Check difference from current time + timeDiff := time.Since(slotTime) + if timeDiff < validateHistoricalThreshold { + shouldValidate = true + ls.config.Logger.Debug("enabling validation as we approach tip") + } + } // Add to batch nextBatch = append(nextBatch, next) // Don't exceed our pre-allocated capacity @@ -480,7 +501,7 @@ func (ls *LedgerState) ledgerProcessBlocks() { if err != nil { return fmt.Errorf("block decode failed: %w", err) } - if err = ls.ledgerProcessBlock(txn, next.Point, tmpBlock); err != nil { + if err = ls.ledgerProcessBlock(txn, next.Point, tmpBlock, shouldValidate); err != nil { return err } // Update tip @@ -527,6 +548,7 @@ func (ls *LedgerState) ledgerProcessBlock( txn *database.Txn, point ocommon.Point, block ledger.Block, + shouldValidate bool, ) error { // Check that we're processing things in order if len(ls.currentTip.Point.Hash) > 0 { @@ -546,34 +568,47 @@ func (ls *LedgerState) ledgerProcessBlock( // Process transactions var delta *LedgerDelta for _, tx := range block.Transactions() { - // Validate transaction - if ls.currentEra.ValidateTxFunc != nil { - lv := &LedgerView{ - txn: txn, - ls: ls, + if delta == nil { + delta = &LedgerDelta{ + Point: point, } - err := ls.currentEra.ValidateTxFunc( - tx, - point.Slot, - lv, - ls.currentPParams, - ) - if err != nil { - ls.config.Logger.Warn( - "TX " + tx.Hash(). - String() + - " failed validation: " + err.Error(), + } + // Validate transaction + if shouldValidate { + if ls.currentEra.ValidateTxFunc != nil { + lv := &LedgerView{ + txn: txn, + ls: ls, + } + err := ls.currentEra.ValidateTxFunc( + tx, + point.Slot, + lv, + ls.currentPParams, ) - // return fmt.Errorf("TX validation failure: %w", err) + if err != nil { + ls.config.Logger.Warn( + "TX " + tx.Hash(). + String() + + " failed validation: " + err.Error(), + ) + // return fmt.Errorf("TX validation failure: %w", err) + } } } - // Populate ledger delta from transaction and apply - delta = &LedgerDelta{ - Point: point, - } + // Populate ledger delta from transaction if err := delta.processTransaction(tx); err != nil { return fmt.Errorf("process transaction: %w", err) } + // Apply delta immediately if we may need the data to validate the next TX + if shouldValidate { + if err := delta.apply(ls, txn); err != nil { + return err + } + delta = nil + } + } + if delta != nil { if err := delta.apply(ls, txn); err != nil { return fmt.Errorf("apply ledger delta: %w", err) }