Skip to content

Commit

Permalink
perf: optimize sequencer performance (#25)
Browse files Browse the repository at this point in the history
* feat: cache payload after sequencer successfully built

* feat: adjust sealingDuration to start building immediately after sealing

* feat: sequencer step takes precedence over derivate step

* chore: add some comments

* refactor: make eventLoop more plain

* fix: rm l2geth

* fix(op-e2e): fallback to not use bsc specific method eth_getFinalizedBlock in e2e-test

* fix unit test

* fix op-e2e

* fix op-e2e

* fix: seqStep halt if disconnect from L1 for a while

* feat: add flag to enable/disable sequencer priority feature

---------

Co-authored-by: Owen <103096885+owen-reorg@users.noreply.github.com>
  • Loading branch information
bnoieh and owen-reorg committed Oct 18, 2023
1 parent 4e8ed54 commit 457645f
Show file tree
Hide file tree
Showing 12 changed files with 85 additions and 15 deletions.
7 changes: 7 additions & 0 deletions op-node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ var (
Required: false,
Value: 0,
}
SequencerPriorityFlag = cli.BoolFlag{
Name: "sequencer.priority",
Usage: "Enable sequencer step takes precedence over other steps.",
EnvVar: prefixEnvVar("SEQUENCER_PRIORITY"),
Required: false,
}
SequencerL1Confs = cli.Uint64Flag{
Name: "sequencer.l1-confs",
Usage: "Number of L1 blocks to keep distance from the L1 head as a sequencer for picking an L1 origin.",
Expand Down Expand Up @@ -230,6 +236,7 @@ var optionalFlags = []cli.Flag{
SequencerEnabledFlag,
SequencerStoppedFlag,
SequencerMaxSafeLagFlag,
SequencerPriorityFlag,
SequencerL1Confs,
L1EpochPollIntervalFlag,
RPCEnableAdmin,
Expand Down
5 changes: 5 additions & 0 deletions op-node/rollup/derive/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type L1ReceiptsFetcher interface {

type SystemConfigL2Fetcher interface {
SystemConfigByL2Hash(ctx context.Context, hash common.Hash) (eth.SystemConfig, error)
CachePayloadByHash(payload *eth.ExecutionPayload) bool
}

// FetchingAttributesBuilder fetches inputs for the building of L2 payload attributes on the fly.
Expand Down Expand Up @@ -130,3 +131,7 @@ func (ba *FetchingAttributesBuilder) PreparePayloadAttributes(ctx context.Contex
GasLimit: (*eth.Uint64Quantity)(&sysConfig.GasLimit),
}, nil
}

func (ba *FetchingAttributesBuilder) CachePayloadByHash(payload *eth.ExecutionPayload) bool {
return ba.l2.CachePayloadByHash(payload)
}
1 change: 1 addition & 0 deletions op-node/rollup/derive/attributes_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

type AttributesBuilder interface {
PreparePayloadAttributes(ctx context.Context, l2Parent eth.L2BlockRef, epoch eth.BlockID) (attrs *eth.PayloadAttributes, err error)
CachePayloadByHash(payload *eth.ExecutionPayload) bool
}

type AttributesQueue struct {
Expand Down
3 changes: 3 additions & 0 deletions op-node/rollup/driver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,7 @@ type Config struct {
// SequencerMaxSafeLag is the maximum number of L2 blocks for restricting the distance between L2 safe and unsafe.
// Disabled if 0.
SequencerMaxSafeLag uint64 `json:"sequencer_max_safe_lag"`

// SequencerPriority is true when sequencer step takes precedence over other steps.
SequencerPriority bool `json:"sequencer_priority"`
}
1 change: 1 addition & 0 deletions op-node/rollup/driver/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ func (d *Sequencer) RunNextSequencerAction(ctx context.Context) (*eth.ExecutionP
}
return nil, nil
} else {
d.attrBuilder.CachePayloadByHash(payload)
d.log.Info("sequencer successfully built a new block", "block", payload.ID(), "time", uint64(payload.Timestamp), "txs", len(payload.Transactions))
return payload, nil
}
Expand Down
4 changes: 4 additions & 0 deletions op-node/rollup/driver/sequencer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ func (fn testAttrBuilderFn) PreparePayloadAttributes(ctx context.Context, l2Pare
return fn(ctx, l2Parent, epoch)
}

func (fn testAttrBuilderFn) CachePayloadByHash(payload *eth.ExecutionPayload) bool {
return true
}

var _ derive.AttributesBuilder = (testAttrBuilderFn)(nil)

type testOriginSelectorFn func(ctx context.Context, l2Head eth.L2BlockRef) (eth.L1BlockRef, error)
Expand Down
62 changes: 47 additions & 15 deletions op-node/rollup/driver/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
type SyncStatus = eth.SyncStatus

// sealingDuration defines the expected time it takes to seal the block
const sealingDuration = time.Millisecond * 50
const sealingDuration = time.Millisecond * 1

type Driver struct {
l1State L1StateIface
Expand Down Expand Up @@ -199,6 +199,24 @@ func (s *Driver) eventLoop() {
sequencerTimer.Reset(delay)
}

sequencerStep := func() error {
payload, err := s.sequencer.RunNextSequencerAction(ctx)
if err != nil {
s.log.Error("Sequencer critical error", "err", err)
return err
}
if s.network != nil && payload != nil {
// Publishing of unsafe data via p2p is optional.
// Errors are not severe enough to change/halt sequencing but should be logged and metered.
if err := s.network.PublishL2Payload(ctx, payload); err != nil {
s.log.Warn("failed to publish newly created block", "id", payload.ID(), "err", err)
s.metrics.RecordPublishingError()
}
}
planSequencerAction() // schedule the next sequencer action to keep the sequencing looping
return nil
}

// Create a ticker to check if there is a gap in the engine queue. Whenever
// there is, we send requests to sync source to retrieve the missing payloads.
syncCheckInterval := time.Duration(s.config.BlockTime) * time.Second * 2
Expand Down Expand Up @@ -228,7 +246,17 @@ func (s *Driver) eventLoop() {
// This may adjust at any time based on fork-choice changes or previous errors.
//
// update sequencer time if the head changed
planSequencerAction()
delay := s.sequencer.PlanNextSequencerAction()
if delay == 0 {
// immediately do sequencerStep if time is ready
if err := sequencerStep(); err != nil {
return
}
// sequencerStep was already done, so we continue to next round
continue
} else {
planSequencerAction()
}
}
} else {
sequencerCh = nil
Expand All @@ -241,22 +269,27 @@ func (s *Driver) eventLoop() {
altSyncTicker.Reset(syncCheckInterval)
}

if s.driverConfig.SequencerPriority {
// help sequencerStep not interrupt by other steps
select {
case <-sequencerCh:
if err := sequencerStep(); err != nil {
return
}
continue
case newL1Head := <-s.l1HeadSig: // sequencerStep may depend on this when FindL1Origin
s.l1State.HandleNewL1HeadBlock(newL1Head)
reqStep() // a new L1 head may mean we have the data to not get an EOF again.
continue
default:
}
}

select {
case <-sequencerCh:
payload, err := s.sequencer.RunNextSequencerAction(ctx)
if err != nil {
s.log.Error("Sequencer critical error", "err", err)
if err := sequencerStep(); err != nil {
return
}
if s.network != nil && payload != nil {
// Publishing of unsafe data via p2p is optional.
// Errors are not severe enough to change/halt sequencing but should be logged and metered.
if err := s.network.PublishL2Payload(ctx, payload); err != nil {
s.log.Warn("failed to publish newly created block", "id", payload.ID(), "err", err)
s.metrics.RecordPublishingError()
}
}
planSequencerAction() // schedule the next sequencer action to keep the sequencing looping
case <-altSyncTicker.C:
// Check if there is a gap in the current unsafe payload queue.
ctx, cancel := context.WithTimeout(ctx, time.Second*2)
Expand All @@ -271,7 +304,6 @@ func (s *Driver) eventLoop() {
s.derivation.AddUnsafePayload(payload)
s.metrics.RecordReceivedUnsafePayload(payload)
reqStep()

case newL1Head := <-s.l1HeadSig:
s.l1State.HandleNewL1HeadBlock(newL1Head)
reqStep() // a new L1 head may mean we have the data to not get an EOF again.
Expand Down
1 change: 1 addition & 0 deletions op-node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func NewDriverConfig(ctx *cli.Context) *driver.Config {
SequencerEnabled: ctx.GlobalBool(flags.SequencerEnabledFlag.Name),
SequencerStopped: ctx.GlobalBool(flags.SequencerStoppedFlag.Name),
SequencerMaxSafeLag: ctx.GlobalUint64(flags.SequencerMaxSafeLagFlag.Name),
SequencerPriority: ctx.GlobalBool(flags.SequencerPriorityFlag.Name),
}
}

Expand Down
4 changes: 4 additions & 0 deletions op-node/sources/eth_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,10 @@ func (s *EthClient) PayloadByHash(ctx context.Context, hash common.Hash) (*eth.E
return s.payloadCall(ctx, "eth_getBlockByHash", hashID(hash))
}

func (s *EthClient) CachePayloadByHash(payload *eth.ExecutionPayload) bool {
return s.payloadsCache.Add(payload.BlockHash, payload)
}

func (s *EthClient) PayloadByNumber(ctx context.Context, number uint64) (*eth.ExecutionPayload, error) {
return s.payloadCall(ctx, "eth_getBlockByNumber", numberID(number))
}
Expand Down
4 changes: 4 additions & 0 deletions op-node/testutils/mock_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,7 @@ func (m *MockEngine) NewPayload(ctx context.Context, payload *eth.ExecutionPaylo
func (m *MockEngine) ExpectNewPayload(payload *eth.ExecutionPayload, result *eth.PayloadStatusV1, err error) {
m.Mock.On("NewPayload", payload).Once().Return(result, &err)
}

func (m *MockEngine) CachePayloadByHash(payload *eth.ExecutionPayload) bool {
return true
}
4 changes: 4 additions & 0 deletions op-node/testutils/mock_l2.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,7 @@ func (m *MockL2Client) SystemConfigByL2Hash(ctx context.Context, hash common.Has
func (m *MockL2Client) ExpectSystemConfigByL2Hash(hash common.Hash, cfg eth.SystemConfig, err error) {
m.Mock.On("SystemConfigByL2Hash", hash).Once().Return(cfg, &err)
}

func (m *MockL2Client) CachePayloadByHash(payload *eth.ExecutionPayload) bool {
return true
}
4 changes: 4 additions & 0 deletions op-program/client/l2/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,7 @@ func (o *OracleEngine) SystemConfigByL2Hash(ctx context.Context, hash common.Has
}
return derive.PayloadToSystemConfig(payload, o.rollupCfg)
}

func (o *OracleEngine) CachePayloadByHash(payload *eth.ExecutionPayload) bool {
return true
}

0 comments on commit 457645f

Please sign in to comment.