diff --git a/op-node/flags/flags.go b/op-node/flags/flags.go index 878b437ffe..6da533fb19 100644 --- a/op-node/flags/flags.go +++ b/op-node/flags/flags.go @@ -126,6 +126,12 @@ var ( Required: false, Value: 0, } + SequencerPriorityFlag = cli.BoolFlag{ + Name: "sequencer.priority", + Usage: "Enable sequencer step takes precedence over other steps.", + EnvVar: prefixEnvVar("SEQUENCER_PRIORITY"), + Required: false, + } SequencerL1Confs = cli.Uint64Flag{ Name: "sequencer.l1-confs", Usage: "Number of L1 blocks to keep distance from the L1 head as a sequencer for picking an L1 origin.", @@ -230,6 +236,7 @@ var optionalFlags = []cli.Flag{ SequencerEnabledFlag, SequencerStoppedFlag, SequencerMaxSafeLagFlag, + SequencerPriorityFlag, SequencerL1Confs, L1EpochPollIntervalFlag, RPCEnableAdmin, diff --git a/op-node/rollup/derive/attributes.go b/op-node/rollup/derive/attributes.go index a02be5b28c..abda819074 100644 --- a/op-node/rollup/derive/attributes.go +++ b/op-node/rollup/derive/attributes.go @@ -24,6 +24,7 @@ type L1ReceiptsFetcher interface { type SystemConfigL2Fetcher interface { SystemConfigByL2Hash(ctx context.Context, hash common.Hash) (eth.SystemConfig, error) + CachePayloadByHash(payload *eth.ExecutionPayload) bool } // FetchingAttributesBuilder fetches inputs for the building of L2 payload attributes on the fly. @@ -130,3 +131,7 @@ func (ba *FetchingAttributesBuilder) PreparePayloadAttributes(ctx context.Contex GasLimit: (*eth.Uint64Quantity)(&sysConfig.GasLimit), }, nil } + +func (ba *FetchingAttributesBuilder) CachePayloadByHash(payload *eth.ExecutionPayload) bool { + return ba.l2.CachePayloadByHash(payload) +} diff --git a/op-node/rollup/derive/attributes_queue.go b/op-node/rollup/derive/attributes_queue.go index cf11b1a8d8..7b6ce7bae6 100644 --- a/op-node/rollup/derive/attributes_queue.go +++ b/op-node/rollup/derive/attributes_queue.go @@ -25,6 +25,7 @@ import ( type AttributesBuilder interface { PreparePayloadAttributes(ctx context.Context, l2Parent eth.L2BlockRef, epoch eth.BlockID) (attrs *eth.PayloadAttributes, err error) + CachePayloadByHash(payload *eth.ExecutionPayload) bool } type AttributesQueue struct { diff --git a/op-node/rollup/driver/config.go b/op-node/rollup/driver/config.go index f4013b95e1..fa0d6932ca 100644 --- a/op-node/rollup/driver/config.go +++ b/op-node/rollup/driver/config.go @@ -20,4 +20,7 @@ type Config struct { // SequencerMaxSafeLag is the maximum number of L2 blocks for restricting the distance between L2 safe and unsafe. // Disabled if 0. SequencerMaxSafeLag uint64 `json:"sequencer_max_safe_lag"` + + // SequencerPriority is true when sequencer step takes precedence over other steps. + SequencerPriority bool `json:"sequencer_priority"` } diff --git a/op-node/rollup/driver/sequencer.go b/op-node/rollup/driver/sequencer.go index af6d469cc1..3d5b30aa0b 100644 --- a/op-node/rollup/driver/sequencer.go +++ b/op-node/rollup/driver/sequencer.go @@ -226,6 +226,7 @@ func (d *Sequencer) RunNextSequencerAction(ctx context.Context) (*eth.ExecutionP } return nil, nil } else { + d.attrBuilder.CachePayloadByHash(payload) d.log.Info("sequencer successfully built a new block", "block", payload.ID(), "time", uint64(payload.Timestamp), "txs", len(payload.Transactions)) return payload, nil } diff --git a/op-node/rollup/driver/sequencer_test.go b/op-node/rollup/driver/sequencer_test.go index 8e7db4269f..6bc0293e5e 100644 --- a/op-node/rollup/driver/sequencer_test.go +++ b/op-node/rollup/driver/sequencer_test.go @@ -137,6 +137,10 @@ func (fn testAttrBuilderFn) PreparePayloadAttributes(ctx context.Context, l2Pare return fn(ctx, l2Parent, epoch) } +func (fn testAttrBuilderFn) CachePayloadByHash(payload *eth.ExecutionPayload) bool { + return true +} + var _ derive.AttributesBuilder = (testAttrBuilderFn)(nil) type testOriginSelectorFn func(ctx context.Context, l2Head eth.L2BlockRef) (eth.L1BlockRef, error) diff --git a/op-node/rollup/driver/state.go b/op-node/rollup/driver/state.go index 79fd55f2af..c4796fb3db 100644 --- a/op-node/rollup/driver/state.go +++ b/op-node/rollup/driver/state.go @@ -23,7 +23,7 @@ import ( type SyncStatus = eth.SyncStatus // sealingDuration defines the expected time it takes to seal the block -const sealingDuration = time.Millisecond * 50 +const sealingDuration = time.Millisecond * 1 type Driver struct { l1State L1StateIface @@ -199,6 +199,24 @@ func (s *Driver) eventLoop() { sequencerTimer.Reset(delay) } + sequencerStep := func() error { + payload, err := s.sequencer.RunNextSequencerAction(ctx) + if err != nil { + s.log.Error("Sequencer critical error", "err", err) + return err + } + if s.network != nil && payload != nil { + // Publishing of unsafe data via p2p is optional. + // Errors are not severe enough to change/halt sequencing but should be logged and metered. + if err := s.network.PublishL2Payload(ctx, payload); err != nil { + s.log.Warn("failed to publish newly created block", "id", payload.ID(), "err", err) + s.metrics.RecordPublishingError() + } + } + planSequencerAction() // schedule the next sequencer action to keep the sequencing looping + return nil + } + // Create a ticker to check if there is a gap in the engine queue. Whenever // there is, we send requests to sync source to retrieve the missing payloads. syncCheckInterval := time.Duration(s.config.BlockTime) * time.Second * 2 @@ -228,7 +246,17 @@ func (s *Driver) eventLoop() { // This may adjust at any time based on fork-choice changes or previous errors. // // update sequencer time if the head changed - planSequencerAction() + delay := s.sequencer.PlanNextSequencerAction() + if delay == 0 { + // immediately do sequencerStep if time is ready + if err := sequencerStep(); err != nil { + return + } + // sequencerStep was already done, so we continue to next round + continue + } else { + planSequencerAction() + } } } else { sequencerCh = nil @@ -241,22 +269,27 @@ func (s *Driver) eventLoop() { altSyncTicker.Reset(syncCheckInterval) } + if s.driverConfig.SequencerPriority { + // help sequencerStep not interrupt by other steps + select { + case <-sequencerCh: + if err := sequencerStep(); err != nil { + return + } + continue + case newL1Head := <-s.l1HeadSig: // sequencerStep may depend on this when FindL1Origin + s.l1State.HandleNewL1HeadBlock(newL1Head) + reqStep() // a new L1 head may mean we have the data to not get an EOF again. + continue + default: + } + } + select { case <-sequencerCh: - payload, err := s.sequencer.RunNextSequencerAction(ctx) - if err != nil { - s.log.Error("Sequencer critical error", "err", err) + if err := sequencerStep(); err != nil { return } - if s.network != nil && payload != nil { - // Publishing of unsafe data via p2p is optional. - // Errors are not severe enough to change/halt sequencing but should be logged and metered. - if err := s.network.PublishL2Payload(ctx, payload); err != nil { - s.log.Warn("failed to publish newly created block", "id", payload.ID(), "err", err) - s.metrics.RecordPublishingError() - } - } - planSequencerAction() // schedule the next sequencer action to keep the sequencing looping case <-altSyncTicker.C: // Check if there is a gap in the current unsafe payload queue. ctx, cancel := context.WithTimeout(ctx, time.Second*2) @@ -271,7 +304,6 @@ func (s *Driver) eventLoop() { s.derivation.AddUnsafePayload(payload) s.metrics.RecordReceivedUnsafePayload(payload) reqStep() - case newL1Head := <-s.l1HeadSig: s.l1State.HandleNewL1HeadBlock(newL1Head) reqStep() // a new L1 head may mean we have the data to not get an EOF again. diff --git a/op-node/service.go b/op-node/service.go index 33c3d5c7d0..6c2ef9b008 100644 --- a/op-node/service.go +++ b/op-node/service.go @@ -150,6 +150,7 @@ func NewDriverConfig(ctx *cli.Context) *driver.Config { SequencerEnabled: ctx.GlobalBool(flags.SequencerEnabledFlag.Name), SequencerStopped: ctx.GlobalBool(flags.SequencerStoppedFlag.Name), SequencerMaxSafeLag: ctx.GlobalUint64(flags.SequencerMaxSafeLagFlag.Name), + SequencerPriority: ctx.GlobalBool(flags.SequencerPriorityFlag.Name), } } diff --git a/op-node/sources/eth_client.go b/op-node/sources/eth_client.go index b5e30a897b..e0638ec92e 100644 --- a/op-node/sources/eth_client.go +++ b/op-node/sources/eth_client.go @@ -341,6 +341,10 @@ func (s *EthClient) PayloadByHash(ctx context.Context, hash common.Hash) (*eth.E return s.payloadCall(ctx, "eth_getBlockByHash", hashID(hash)) } +func (s *EthClient) CachePayloadByHash(payload *eth.ExecutionPayload) bool { + return s.payloadsCache.Add(payload.BlockHash, payload) +} + func (s *EthClient) PayloadByNumber(ctx context.Context, number uint64) (*eth.ExecutionPayload, error) { return s.payloadCall(ctx, "eth_getBlockByNumber", numberID(number)) } diff --git a/op-node/testutils/mock_engine.go b/op-node/testutils/mock_engine.go index 2925ebf0fb..62e07e8b05 100644 --- a/op-node/testutils/mock_engine.go +++ b/op-node/testutils/mock_engine.go @@ -36,3 +36,7 @@ func (m *MockEngine) NewPayload(ctx context.Context, payload *eth.ExecutionPaylo func (m *MockEngine) ExpectNewPayload(payload *eth.ExecutionPayload, result *eth.PayloadStatusV1, err error) { m.Mock.On("NewPayload", payload).Once().Return(result, &err) } + +func (m *MockEngine) CachePayloadByHash(payload *eth.ExecutionPayload) bool { + return true +} diff --git a/op-node/testutils/mock_l2.go b/op-node/testutils/mock_l2.go index 41433dad74..18235698a9 100644 --- a/op-node/testutils/mock_l2.go +++ b/op-node/testutils/mock_l2.go @@ -43,3 +43,7 @@ func (m *MockL2Client) SystemConfigByL2Hash(ctx context.Context, hash common.Has func (m *MockL2Client) ExpectSystemConfigByL2Hash(hash common.Hash, cfg eth.SystemConfig, err error) { m.Mock.On("SystemConfigByL2Hash", hash).Once().Return(cfg, &err) } + +func (m *MockL2Client) CachePayloadByHash(payload *eth.ExecutionPayload) bool { + return true +} diff --git a/op-program/client/l2/engine.go b/op-program/client/l2/engine.go index b406f27e7f..ed3d97fdb1 100644 --- a/op-program/client/l2/engine.go +++ b/op-program/client/l2/engine.go @@ -112,3 +112,7 @@ func (o *OracleEngine) SystemConfigByL2Hash(ctx context.Context, hash common.Has } return derive.PayloadToSystemConfig(payload, o.rollupCfg) } + +func (o *OracleEngine) CachePayloadByHash(payload *eth.ExecutionPayload) bool { + return true +}