op-node: encapsulate finality and simplify EngineQueue (#10580)
* op-node: refactor finality to encapsulate and simplify EngineQueue

* op-node: add lock to make concurrent use of Finalizer by plasma backend safe, and rename receiver-method var names

* op-node: reintroduce instant L2 finality check upon L1 signal, reintroduce extra check to handle contrived test

* op-node: fix plasma finalization test setup

* semgrep fix

* op-node: link TODO issue
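
The gist of the refactor: the EngineQueue stops doing its own finality bookkeeping and instead calls into a small set of hooks, while the driver owns a Finalizer that receives L1 finality signals (with a plasma-aware variant, guarded by a lock so the plasma backend can use it concurrently). Because Finalize only records the signal, the action tests below now run the derivation pipeline before asserting on finalized sync status. A rough sketch of the driver-facing surface, reconstructed from the calls visible in this diff — the canonical definitions live in the op-node/rollup/driver and op-node/rollup/finality packages, and the engine-facing FinalizerHooks interface appears verbatim in the engine_queue.go changes below:

package finalitysketch

import (
	"context"

	"github.com/ethereum-optimism/optimism/op-service/eth"
)

// Finalizer is roughly what the driver and the action tests interact with.
type Finalizer interface {
	// Finalize records a new finalized L1 block; the L2 forkchoice is only
	// updated later, on a derivation pipeline step.
	Finalize(ctx context.Context, l1Finalized eth.L1BlockRef)
	// FinalizedL1 reports the latest finalized-L1 signal seen (zeroed if none yet).
	FinalizedL1() eth.L1BlockRef
}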
protolambda committed Jun 3, 2024
1 parent 5e23d3a commit 58f82ec
Showing 14 changed files with 948 additions and 539 deletions.
4 changes: 3 additions & 1 deletion op-e2e/actions/l2_batcher_test.go
@@ -180,6 +180,7 @@ func L2Finalization(gt *testing.T, deltaTimeOffset *hexutil.Uint64) {
sequencer.ActL2PipelineFull(t)
sequencer.ActL1FinalizedSignal(t)
sequencer.ActL1SafeSignal(t)
sequencer.ActL2PipelineFull(t) // ensure that the forkchoice changes have been applied to the engine
require.Equal(t, uint64(2), sequencer.SyncStatus().SafeL1.Number)
require.Equal(t, uint64(1), sequencer.SyncStatus().FinalizedL1.Number)
require.Equal(t, uint64(0), sequencer.SyncStatus().FinalizedL2.Number, "L2 block has to be included on L1 before it can be finalized")
@@ -227,6 +228,7 @@ func L2Finalization(gt *testing.T, deltaTimeOffset *hexutil.Uint64) {
sequencer.ActL1FinalizedSignal(t)
sequencer.ActL1SafeSignal(t)
sequencer.ActL1HeadSignal(t)
sequencer.ActL2PipelineFull(t) // ensure that the forkchoice changes have been applied to the engine
require.Equal(t, uint64(6), sequencer.SyncStatus().HeadL1.Number)
require.Equal(t, uint64(4), sequencer.SyncStatus().SafeL1.Number)
require.Equal(t, uint64(3), sequencer.SyncStatus().FinalizedL1.Number)
@@ -244,7 +246,7 @@ func L2Finalization(gt *testing.T, deltaTimeOffset *hexutil.Uint64) {
// If we get this false signal, we shouldn't finalize the L2 chain.
altBlock4 := sequencer.SyncStatus().SafeL1
altBlock4.Hash = common.HexToHash("0xdead")
sequencer.derivation.Finalize(altBlock4)
sequencer.finalizer.Finalize(t.Ctx(), altBlock4)
sequencer.ActL2PipelineFull(t)
require.Equal(t, uint64(3), sequencer.SyncStatus().FinalizedL1.Number)
require.Equal(t, heightToSubmit, sequencer.SyncStatus().FinalizedL2.Number, "unknown/bad finalized L1 blocks are ignored")
2 changes: 1 addition & 1 deletion op-e2e/actions/l2_sequencer.go
@@ -44,7 +44,7 @@ type L2Sequencer struct {
}

func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, blobSrc derive.L1BlobsFetcher,
plasmaSrc derive.PlasmaInputFetcher, eng L2API, cfg *rollup.Config, seqConfDepth uint64) *L2Sequencer {
plasmaSrc driver.PlasmaIface, eng L2API, cfg *rollup.Config, seqConfDepth uint64) *L2Sequencer {
ver := NewL2Verifier(t, log, l1, blobSrc, plasmaSrc, eng, cfg, &sync.Config{}, safedb.Disabled)
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, eng)
seqConfDepthL1 := driver.NewConfDepth(seqConfDepth, ver.l1State.L1Head, l1)
20 changes: 16 additions & 4 deletions op-e2e/actions/l2_verifier.go
@@ -15,6 +15,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/finality"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth"
@@ -36,6 +37,8 @@ type L2Verifier struct {
engine *derive.EngineController
derivation *derive.DerivationPipeline

finalizer driver.Finalizer

l1 derive.L1Fetcher
l1State *driver.L1State

@@ -63,17 +66,26 @@ type safeDB interface {
node.SafeDBReader
}

func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc derive.L1BlobsFetcher, plasmaSrc derive.PlasmaInputFetcher, eng L2API, cfg *rollup.Config, syncCfg *sync.Config, safeHeadListener safeDB) *L2Verifier {
func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc derive.L1BlobsFetcher, plasmaSrc driver.PlasmaIface, eng L2API, cfg *rollup.Config, syncCfg *sync.Config, safeHeadListener safeDB) *L2Verifier {
metrics := &testutils.TestDerivationMetrics{}
engine := derive.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode)
pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, plasmaSrc, eng, engine, metrics, syncCfg, safeHeadListener)

var finalizer driver.Finalizer
if cfg.PlasmaEnabled() {
finalizer = finality.NewPlasmaFinalizer(log, cfg, l1, engine, plasmaSrc)
} else {
finalizer = finality.NewFinalizer(log, cfg, l1, engine)
}

pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, plasmaSrc, eng, engine, metrics, syncCfg, safeHeadListener, finalizer)
pipeline.Reset()

rollupNode := &L2Verifier{
log: log,
eng: eng,
engine: engine,
derivation: pipeline,
finalizer: finalizer,
l1: l1,
l1State: driver.NewL1State(log, metrics),
l2PipelineIdle: true,
@@ -162,7 +174,7 @@ func (s *L2Verifier) L2BackupUnsafe() eth.L2BlockRef {
func (s *L2Verifier) SyncStatus() *eth.SyncStatus {
return &eth.SyncStatus{
CurrentL1: s.derivation.Origin(),
CurrentL1Finalized: s.derivation.FinalizedL1(),
CurrentL1Finalized: s.finalizer.FinalizedL1(),
HeadL1: s.l1State.L1Head(),
SafeL1: s.l1State.L1Safe(),
FinalizedL1: s.l1State.L1Finalized(),
@@ -214,7 +226,7 @@ func (s *L2Verifier) ActL1FinalizedSignal(t Testing) {
finalized, err := s.l1.L1BlockRefByLabel(t.Ctx(), eth.Finalized)
require.NoError(t, err)
s.l1State.HandleNewL1FinalizedBlock(finalized)
s.derivation.Finalize(finalized)
s.finalizer.Finalize(t.Ctx(), finalized)
}

// ActL2PipelineStep runs one iteration of the L2 derivation pipeline
4 changes: 0 additions & 4 deletions op-node/rollup/derive/data_source.go
@@ -33,10 +33,6 @@ type PlasmaInputFetcher interface {
AdvanceL1Origin(ctx context.Context, l1 plasma.L1Fetcher, blockId eth.BlockID) error
// Reset the challenge origin in case of L1 reorg
Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemConfig) error
// Notify L1 finalized head so plasma finality is always behind L1
Finalize(ref eth.L1BlockRef)
// Set the engine finalization signal callback
OnFinalizedHeadSignal(f plasma.HeadSignalFn)
}

// DataSourceFactory reads raw transactions from a given block & then filters for
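
With these methods removed, the derivation-facing plasma interface no longer carries finality concerns; the new finality.NewPlasmaFinalizer (constructed in l2_verifier.go above) talks to the plasma backend directly instead. A standalone sketch of that split, assuming a backend that exposes the two methods dropped here — the local interface and helper below are illustrative, not the actual op-plasma API:

package plasmasketch

import (
	"github.com/ethereum-optimism/optimism/op-service/eth"
)

// plasmaBackend is a hypothetical, minimal view of the plasma backend,
// mirroring the two methods removed from derive.PlasmaInputFetcher.
type plasmaBackend interface {
	// Finalize forwards the finalized L1 head, so plasma finality always trails L1.
	Finalize(ref eth.L1BlockRef)
	// OnFinalizedHeadSignal registers the callback fired once an older L1 block is
	// safe to treat as finalized (challenge and resolve windows have passed).
	OnFinalizedHeadSignal(f func(ref eth.L1BlockRef))
}

// wirePlasmaFinality sketches the indirection: raw L1 finality signals go into the
// backend, and the backend's delayed signal is what drives L2 finalization.
func wirePlasmaFinality(backend plasmaBackend, finalizeFromL1 func(eth.L1BlockRef)) {
	backend.OnFinalizedHeadSignal(func(ref eth.L1BlockRef) {
		finalizeFromL1(ref) // ref is already held back by the DA windows
	})
}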
180 changes: 20 additions & 160 deletions op-node/rollup/derive/engine_queue.go
@@ -114,50 +114,18 @@ type SafeHeadListener interface {
SafeHeadReset(resetSafeHead eth.L2BlockRef) error
}

// Max memory used for buffering unsafe payloads
const maxUnsafePayloadsMemory = 500 * 1024 * 1024

// finalityLookback defines the amount of L1<>L2 relations to track for finalization purposes, one per L1 block.
//
// When L1 finalizes blocks, it finalizes finalityLookback blocks behind the L1 head.
// Non-finality may take longer, but when it does finalize again, it is within this range of the L1 head.
// Thus we only need to retain the L1<>L2 derivation relation data of this many L1 blocks.
//
// In the event of older finalization signals, misconfiguration, or insufficient L1<>L2 derivation relation data,
// then we may miss the opportunity to finalize more L2 blocks.
// This does not cause any divergence, it just causes lagging finalization status.
//
// The beacon chain on mainnet has 32 slots per epoch,
// and new finalization events happen at most 4 epochs behind the head.
// And then we add 1 to make pruning easier by leaving room for a new item without pruning the 32*4.
const finalityLookback = 4*32 + 1

// finalityDelay is the number of L1 blocks to traverse before trying to finalize L2 blocks again.
// We do not want to do this too often, since it requires fetching a L1 block by number, so no cache data.
const finalityDelay = 64

// calcFinalityLookback calculates the default finality lookback based on DA challenge window if plasma
// mode is activated or L1 finality lookback.
func calcFinalityLookback(cfg *rollup.Config) uint64 {
// in plasma mode the longest finality lookback is a commitment is challenged on the last block of
// the challenge window in which case it will be both challenge + resolve window.
if cfg.PlasmaEnabled() {
lkb := cfg.PlasmaConfig.DAChallengeWindow + cfg.PlasmaConfig.DAResolveWindow + 1
// in the case only if the plasma windows are longer than the default finality lookback
if lkb > finalityLookback {
return lkb
}
}
return finalityLookback
}

type FinalizerHooks interface {
// OnDerivationL1End remembers the given L1 block,
// and finalizes any prior data with the latest finality signal based on block height.
OnDerivationL1End(ctx context.Context, derivedFrom eth.L1BlockRef) error
// PostProcessSafeL2 remembers the L2 block is derived from the given L1 block, for later finalization.
PostProcessSafeL2(l2Safe eth.L2BlockRef, derivedFrom eth.L1BlockRef)
// Reset clears recent state, to adapt to reorgs.
Reset()
}

type FinalityData struct {
// The last L2 block that was fully derived and inserted into the L2 engine while processing this L1 block.
L2Block eth.L2BlockRef
// The L1 block this stage was at when inserting the L2 block.
// When this L1 block is finalized, the L2 chain up to this block can be fully reproduced from finalized L1 data.
L1Block eth.BlockID
}
// Max memory used for buffering unsafe payloads
const maxUnsafePayloadsMemory = 500 * 1024 * 1024

// EngineQueue queues up payload attributes to consolidate or process with the provided Engine
type EngineQueue struct {
@@ -166,20 +134,10 @@ type EngineQueue struct {

ec LocalEngineControl

// finalizedL1 is the currently perceived finalized L1 block.
// This may be ahead of the current traversed origin when syncing.
finalizedL1 eth.L1BlockRef

// triedFinalizeAt tracks at which origin we last tried to finalize during sync.
triedFinalizeAt eth.L1BlockRef

// The queued-up attributes
safeAttributes *AttributesWithParent
unsafePayloads *PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps and duplicates

// Tracks which L2 blocks where last derived from which L1 block. At most finalityLookback large.
finalityData []FinalityData

engine L2Source
prev NextAttributesProvider

@@ -193,22 +151,26 @@ type EngineQueue struct {

safeHeadNotifs SafeHeadListener // notified when safe head is updated
lastNotifiedSafeHead eth.L2BlockRef

finalizer FinalizerHooks
}

// NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use.
func NewEngineQueue(log log.Logger, cfg *rollup.Config, l2Source L2Source, engine LocalEngineControl, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher, syncCfg *sync.Config, safeHeadNotifs SafeHeadListener) *EngineQueue {
func NewEngineQueue(log log.Logger, cfg *rollup.Config, l2Source L2Source, engine LocalEngineControl, metrics Metrics,
prev NextAttributesProvider, l1Fetcher L1Fetcher, syncCfg *sync.Config, safeHeadNotifs SafeHeadListener,
finalizer FinalizerHooks) *EngineQueue {
return &EngineQueue{
log: log,
cfg: cfg,
ec: engine,
engine: l2Source,
metrics: metrics,
finalityData: make([]FinalityData, 0, calcFinalityLookback(cfg)),
unsafePayloads: NewPayloadsQueue(log, maxUnsafePayloadsMemory, payloadMemSize),
prev: prev,
l1Fetcher: l1Fetcher,
syncCfg: syncCfg,
safeHeadNotifs: safeHeadNotifs,
finalizer: finalizer,
}
}

Expand Down Expand Up @@ -236,37 +198,6 @@ func (eq *EngineQueue) AddUnsafePayload(envelope *eth.ExecutionPayloadEnvelope)
eq.log.Trace("Next unsafe payload to process", "next", p.ExecutionPayload.ID(), "timestamp", uint64(p.ExecutionPayload.Timestamp))
}

func (eq *EngineQueue) Finalize(l1Origin eth.L1BlockRef) {
prevFinalizedL1 := eq.finalizedL1
if l1Origin.Number < eq.finalizedL1.Number {
eq.log.Error("ignoring old L1 finalized block signal! Is the L1 provider corrupted?", "prev_finalized_l1", prevFinalizedL1, "signaled_finalized_l1", l1Origin)
return
}

// remember the L1 finalization signal
eq.finalizedL1 = l1Origin

// Sanity check: we only try to finalize L2 immediately, without fetching additional data,
// if we are on the same chain as the signal.
// If we are on a different chain, the signal will be ignored,
// and tryFinalizeL1Origin() will eventually detect that we are on the wrong chain,
// if not resetting due to reorg elsewhere already.
for _, fd := range eq.finalityData {
if fd.L1Block == l1Origin.ID() {
eq.tryFinalizeL2()
return
}
}

eq.log.Info("received L1 finality signal, but missing data for immediate L2 finalization", "prev_finalized_l1", prevFinalizedL1, "signaled_finalized_l1", l1Origin)
}

// FinalizedL1 identifies the L1 chain (incl.) that included and/or produced all the finalized L2 blocks.
// This may return a zeroed ID if no finalization signals have been seen yet.
func (eq *EngineQueue) FinalizedL1() eth.L1BlockRef {
return eq.finalizedL1
}

// LowestQueuedUnsafeBlock returns the block
func (eq *EngineQueue) LowestQueuedUnsafeBlock() eth.L2BlockRef {
payload := eq.unsafePayloads.Peek()
@@ -328,8 +259,8 @@ func (eq *EngineQueue) Step(ctx context.Context) error {
return err
}
// try to finalize the L2 blocks we have synced so far (no-op if L1 finality is behind)
if err := eq.tryFinalizePastL2Blocks(ctx); err != nil {
return err
if err := eq.finalizer.OnDerivationL1End(ctx, eq.origin); err != nil {
return fmt.Errorf("finalizer OnDerivationL1End error: %w", err)
}
if next, err := eq.prev.NextAttributes(ctx, eq.ec.PendingSafeL2Head()); err == io.EOF {
return io.EOF
@@ -375,84 +306,13 @@ func (eq *EngineQueue) verifyNewL1Origin(ctx context.Context, newOrigin eth.L1Bl
return nil
}

func (eq *EngineQueue) tryFinalizePastL2Blocks(ctx context.Context) error {
if eq.finalizedL1 == (eth.L1BlockRef{}) {
return nil
}

// If the L1 is finalized beyond the point we are traversing (e.g. during sync),
// then we should check if we can finalize this L1 block we are traversing.
// Otherwise, nothing to act on here, we will finalize later on a new finality signal matching the recent history.
if eq.finalizedL1.Number < eq.origin.Number {
return nil
}

// If we recently tried finalizing, then don't try again just yet, but traverse more of L1 first.
if eq.triedFinalizeAt != (eth.L1BlockRef{}) && eq.origin.Number <= eq.triedFinalizeAt.Number+finalityDelay {
return nil
}

eq.log.Info("processing L1 finality information", "l1_finalized", eq.finalizedL1, "l1_origin", eq.origin, "previous", eq.triedFinalizeAt)

// Sanity check we are indeed on the finalizing chain, and not stuck on something else.
// We assume that the block-by-number query is consistent with the previously received finalized chain signal
ref, err := eq.l1Fetcher.L1BlockRefByNumber(ctx, eq.origin.Number)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to check if on finalizing L1 chain: %w", err))
}
if ref.Hash != eq.origin.Hash {
return NewResetError(fmt.Errorf("need to reset, we are on %s, not on the finalizing L1 chain %s (towards %s)", eq.origin, ref, eq.finalizedL1))
}
eq.tryFinalizeL2()
return nil
}

// tryFinalizeL2 traverses the past L1 blocks, checks if any has been finalized,
// and then marks the latest fully derived L2 block from this as finalized,
// or defaults to the current finalized L2 block.
func (eq *EngineQueue) tryFinalizeL2() {
if eq.finalizedL1 == (eth.L1BlockRef{}) {
return // if no L1 information is finalized yet, then skip this
}
eq.triedFinalizeAt = eq.origin
// default to keep the same finalized block
finalizedL2 := eq.ec.Finalized()
// go through the latest inclusion data, and find the last L2 block that was derived from a finalized L1 block
for _, fd := range eq.finalityData {
if fd.L2Block.Number > finalizedL2.Number && fd.L1Block.Number <= eq.finalizedL1.Number {
finalizedL2 = fd.L2Block
}
}
eq.ec.SetFinalizedHead(finalizedL2)
}

// postProcessSafeL2 buffers the L1 block the safe head was fully derived from,
// to finalize it once the L1 block, or later, finalizes.
func (eq *EngineQueue) postProcessSafeL2() error {
if err := eq.notifyNewSafeHead(eq.ec.SafeL2Head()); err != nil {
return err
}
// prune finality data if necessary
if uint64(len(eq.finalityData)) >= calcFinalityLookback(eq.cfg) {
eq.finalityData = append(eq.finalityData[:0], eq.finalityData[1:calcFinalityLookback(eq.cfg)]...)
}
// remember the last L2 block that we fully derived from the given finality data
if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.origin.Number {
// append entry for new L1 block
eq.finalityData = append(eq.finalityData, FinalityData{
L2Block: eq.ec.SafeL2Head(),
L1Block: eq.origin.ID(),
})
last := &eq.finalityData[len(eq.finalityData)-1]
eq.log.Debug("extended finality-data", "last_l1", last.L1Block, "last_l2", last.L2Block)
} else {
// if it's a new L2 block that was derived from the same latest L1 block, then just update the entry
last := &eq.finalityData[len(eq.finalityData)-1]
if last.L2Block != eq.ec.SafeL2Head() { // avoid logging if there are no changes
last.L2Block = eq.ec.SafeL2Head()
eq.log.Debug("updated finality-data", "last_l1", last.L1Block, "last_l2", last.L2Block)
}
}
eq.finalizer.PostProcessSafeL2(eq.ec.SafeL2Head(), eq.origin)
return nil
}
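
The logic removed above (tryFinalizeL2 plus the finalityData maintained by postProcessSafeL2) is the rule the new finality package is expected to encapsulate: among the recorded safe-L2/derived-from-L1 pairs, promote the newest L2 block whose L1 source is at or below the finalized L1 height. A standalone sketch of that selection, not part of this diff; names are illustrative:

package finalitysketch

import "github.com/ethereum-optimism/optimism/op-service/eth"

// finalityRecord pairs a safe L2 block with the L1 block it was derived from,
// mirroring what the removed FinalityData tracked per L1 block.
type finalityRecord struct {
	L2Block eth.L2BlockRef
	L1Block eth.BlockID
}

// pickFinalized keeps the current finalized L2 block unless a recorded L2 block is
// newer and was fully derived from L1 data at or below the finalized L1 height.
func pickFinalized(records []finalityRecord, currentFinalized eth.L2BlockRef, finalizedL1Num uint64) eth.L2BlockRef {
	out := currentFinalized
	for _, fd := range records {
		if fd.L2Block.Number > out.Number && fd.L1Block.Number <= finalizedL1Num {
			out = fd.L2Block
		}
	}
	return out
}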

@@ -729,7 +589,7 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
eq.ec.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false)
eq.safeAttributes = nil
eq.ec.ResetBuildingState()
eq.finalityData = eq.finalityData[:0]
eq.finalizer.Reset()
// note: finalizedL1 and triedFinalizeAt do not reset, since these do not change between reorgs.
// note: we do not clear the unsafe payloads queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads.
eq.origin = pipelineOrigin
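
For reference, the sizing behind the removed finalityLookback and calcFinalityLookback (expected to move along with the rest of the finality logic into the new package): mainnet L1 finality lands at most 4 epochs of 32 slots behind the head, so 4*32 + 1 = 129 tracked L1 blocks suffice, unless plasma challenge/resolve windows demand more. A standalone sketch with hypothetical window values:

package finalitysketch

// finalityLookback mirrors the removed constant: 4 epochs * 32 slots behind the
// L1 head, plus 1 entry of headroom to make pruning easier.
const finalityLookback = 4*32 + 1 // = 129 tracked L1 blocks

// calcLookback mirrors the removed calcFinalityLookback helper: in plasma mode the
// worst case is a commitment challenged on the last block of the challenge window,
// which must then also wait out the resolve window before it can finalize.
func calcLookback(plasmaEnabled bool, daChallengeWindow, daResolveWindow uint64) uint64 {
	if plasmaEnabled {
		if lkb := daChallengeWindow + daResolveWindow + 1; lkb > finalityLookback {
			return lkb
		}
	}
	return finalityLookback
}

// Example with hypothetical plasma windows of 160 and 80 L1 blocks:
// calcLookback(true, 160, 80) == 241, which exceeds 129 and is used instead.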
(Diffs for the remaining 9 changed files are not shown here.)
