diff --git a/op-batcher/batcher/channel_builder.go b/op-batcher/batcher/channel_builder.go index e95fdc0c793c..e364570d48b0 100644 --- a/op-batcher/batcher/channel_builder.go +++ b/op-batcher/batcher/channel_builder.go @@ -78,18 +78,20 @@ type ChannelBuilder struct { // newChannelBuilder creates a new channel builder or returns an error if the // channel out could not be created. +// it acts as a factory for either a span or singular channel out func NewChannelBuilder(cfg ChannelConfig, rollupCfg rollup.Config, latestL1OriginBlockNum uint64) (*ChannelBuilder, error) { c, err := cfg.CompressorConfig.NewCompressor() if err != nil { return nil, err } - var spanBatch *derive.SpanBatch + var co derive.ChannelOut if cfg.BatchType == derive.SpanBatchType { - spanBatch = derive.NewSpanBatch(rollupCfg.Genesis.L2Time, rollupCfg.L2ChainID) + co, err = derive.NewSpanChannelOut(rollupCfg.Genesis.L2Time, rollupCfg.L2ChainID, cfg.CompressorConfig.TargetOutputSize) + } else { + co, err = derive.NewSingularChannelOut(c) } - co, err := derive.NewChannelOut(cfg.BatchType, c, spanBatch) if err != nil { - return nil, err + return nil, fmt.Errorf("creating channel out: %w", err) } cb := &ChannelBuilder{ @@ -154,7 +156,7 @@ func (c *ChannelBuilder) AddBlock(block *types.Block) (*derive.L1BlockInfo, erro return l1info, fmt.Errorf("converting block to batch: %w", err) } - if _, err = c.co.AddSingularBatch(batch, l1info.SequenceNumber); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.ErrCompressorFull) { + if err = c.co.AddSingularBatch(batch, l1info.SequenceNumber); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.ErrCompressorFull) { c.setFullErr(err) return l1info, c.FullErr() } else if err != nil { diff --git a/op-batcher/batcher/channel_builder_test.go b/op-batcher/batcher/channel_builder_test.go index a402c203fc85..1d220fe239e5 100644 --- a/op-batcher/batcher/channel_builder_test.go +++ b/op-batcher/batcher/channel_builder_test.go @@ -297,6 +297,7 
@@ func TestChannelBuilderBatchType(t *testing.T) { {"ChannelBuilder_PendingFrames_TotalFrames", ChannelBuilder_PendingFrames_TotalFrames}, {"ChannelBuilder_InputBytes", ChannelBuilder_InputBytes}, {"ChannelBuilder_OutputBytes", ChannelBuilder_OutputBytes}, + {"ChannelBuilder_OutputWrongFramePanic", ChannelBuilder_OutputWrongFramePanic}, } for _, test := range tests { test := test @@ -354,8 +355,9 @@ func TestChannelBuilder_NextFrame(t *testing.T) { } // TestChannelBuilder_OutputWrongFramePanic tests that a panic is thrown when a frame is pushed with an invalid frame id -func TestChannelBuilder_OutputWrongFramePanic(t *testing.T) { +func ChannelBuilder_OutputWrongFramePanic(t *testing.T, batchType uint) { channelConfig := defaultTestChannelConfig() + channelConfig.BatchType = batchType // Construct a channel builder cb, err := NewChannelBuilder(channelConfig, defaultTestRollupConfig, latestL1BlockOrigin) @@ -363,9 +365,10 @@ func TestChannelBuilder_OutputWrongFramePanic(t *testing.T) { // Mock the internals of `ChannelBuilder.outputFrame` // to construct a single frame + // the type of batch does not matter here because we are using it to construct a broken frame c, err := channelConfig.CompressorConfig.NewCompressor() require.NoError(t, err) - co, err := derive.NewChannelOut(derive.SingularBatchType, c, nil) + co, err := derive.NewSingularChannelOut(c) require.NoError(t, err) var buf bytes.Buffer fn, err := co.OutputFrame(&buf, channelConfig.MaxFrameSize) diff --git a/op-batcher/compressor/blind_compressor.go b/op-batcher/compressor/blind_compressor.go deleted file mode 100644 index b7005f0bf5ad..000000000000 --- a/op-batcher/compressor/blind_compressor.go +++ /dev/null @@ -1,73 +0,0 @@ -package compressor - -import ( - "bytes" - "compress/zlib" - - "github.com/ethereum-optimism/optimism/op-node/rollup/derive" -) - -// BlindCompressor is a simple compressor that blindly compresses data -// the only way to know if the target size has been reached is to first flush 
the buffer -// and then check the length of the compressed data -type BlindCompressor struct { - config Config - - inputBytes int - buf bytes.Buffer - compress *zlib.Writer -} - -// NewBlindCompressor creates a new derive.Compressor implementation that compresses -func NewBlindCompressor(config Config) (derive.Compressor, error) { - c := &BlindCompressor{ - config: config, - } - - compress, err := zlib.NewWriterLevel(&c.buf, zlib.BestCompression) - if err != nil { - return nil, err - } - c.compress = compress - - return c, nil -} - -func (t *BlindCompressor) Write(p []byte) (int, error) { - if err := t.FullErr(); err != nil { - return 0, err - } - t.inputBytes += len(p) - return t.compress.Write(p) -} - -func (t *BlindCompressor) Close() error { - return t.compress.Close() -} - -func (t *BlindCompressor) Read(p []byte) (int, error) { - return t.buf.Read(p) -} - -func (t *BlindCompressor) Reset() { - t.buf.Reset() - t.compress.Reset(&t.buf) - t.inputBytes = 0 -} - -func (t *BlindCompressor) Len() int { - return t.buf.Len() -} - -func (t *BlindCompressor) Flush() error { - return t.compress.Flush() -} - -// FullErr returns an error if the target output size has been reached. 
-// Flush *must* be called before this method to ensure the buffer is up to date -func (t *BlindCompressor) FullErr() error { - if uint64(t.Len()) >= t.config.TargetOutputSize { - return derive.ErrCompressorFull - } - return nil -} diff --git a/op-batcher/compressor/blind_compressor_test.go b/op-batcher/compressor/blind_compressor_test.go deleted file mode 100644 index 49c1c0a5bbf1..000000000000 --- a/op-batcher/compressor/blind_compressor_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package compressor_test - -import ( - "testing" - - "github.com/ethereum-optimism/optimism/op-batcher/compressor" - "github.com/stretchr/testify/require" -) - -func TestBlindCompressorLimit(t *testing.T) { - bc, err := compressor.NewBlindCompressor(compressor.Config{ - TargetOutputSize: 10, - }) - require.NoError(t, err) - - // write far too much data to the compressor, but never flush - for i := 0; i < 100; i++ { - _, err := bc.Write([]byte("hello")) - require.NoError(t, err) - require.NoError(t, bc.FullErr()) - } - - // finally flush the compressor and see that it is full - bc.Flush() - require.Error(t, bc.FullErr()) - - // write a little more data to the compressor and see that it is still full - _, err = bc.Write([]byte("hello")) - require.Error(t, err) -} diff --git a/op-batcher/compressor/compressors.go b/op-batcher/compressor/compressors.go index d6d24ea804e3..1a6d64da13e6 100644 --- a/op-batcher/compressor/compressors.go +++ b/op-batcher/compressor/compressors.go @@ -10,7 +10,6 @@ const ( RatioKind = "ratio" ShadowKind = "shadow" NoneKind = "none" - BlindKind = "blind" // CloseOverheadZlib is the number of final bytes a [zlib.Writer] call writes // to the output buffer. 
@@ -21,7 +20,6 @@ var Kinds = map[string]FactoryFunc{ RatioKind: NewRatioCompressor, ShadowKind: NewShadowCompressor, NoneKind: NewNonCompressor, - BlindKind: NewBlindCompressor, } var KindKeys []string diff --git a/op-e2e/actions/garbage_channel_out.go b/op-e2e/actions/garbage_channel_out.go index 41a290570ea9..50413e6f9a82 100644 --- a/op-e2e/actions/garbage_channel_out.go +++ b/op-e2e/actions/garbage_channel_out.go @@ -54,7 +54,7 @@ type Writer interface { type ChannelOutIface interface { ID() derive.ChannelID Reset() error - AddBlock(rollupCfg *rollup.Config, block *types.Block) (uint64, error) + AddBlock(rollupCfg *rollup.Config, block *types.Block) error ReadyBytes() int Flush() error Close() error @@ -138,19 +138,19 @@ func (co *GarbageChannelOut) Reset() error { // error that it returns is ErrTooManyRLPBytes. If this error // is returned, the channel should be closed and a new one // should be made. -func (co *GarbageChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) (uint64, error) { +func (co *GarbageChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) error { if co.closed { - return 0, errors.New("already closed") + return errors.New("already closed") } batch, err := blockToBatch(rollupCfg, block) if err != nil { - return 0, err + return err } // We encode to a temporary buffer to determine the encoded length to // ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL var buf bytes.Buffer if err := rlp.Encode(&buf, batch); err != nil { - return 0, err + return err } if co.cfg.malformRLP { // Malform the RLP by incrementing the length prefix by 1. @@ -160,13 +160,13 @@ func (co *GarbageChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Blo buf.Write(bufBytes) } if co.rlpLength+buf.Len() > derive.MaxRLPBytesPerChannel { - return 0, fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. 
err: %w", + return fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w", buf.Len(), co.rlpLength, derive.MaxRLPBytesPerChannel, derive.ErrTooManyRLPBytes) } co.rlpLength += buf.Len() - written, err := io.Copy(co.compress, &buf) - return uint64(written), err + _, err = io.Copy(co.compress, &buf) + return err } // ReadyBytes returns the number of bytes that the channel out can immediately output into a frame. diff --git a/op-e2e/actions/l2_batcher.go b/op-e2e/actions/l2_batcher.go index 949aa176b77e..46216d56c6af 100644 --- a/op-e2e/actions/l2_batcher.go +++ b/op-e2e/actions/l2_batcher.go @@ -189,29 +189,28 @@ func (s *L2Batcher) Buffer(t Testing) error { if s.l2BatcherCfg.GarbageCfg != nil { ch, err = NewGarbageChannelOut(s.l2BatcherCfg.GarbageCfg) } else { - c, e := compressor.NewBlindCompressor(compressor.Config{ - TargetOutputSize: batcher.MaxDataSize(1, s.l2BatcherCfg.MaxL1TxSize), + target := batcher.MaxDataSize(1, s.l2BatcherCfg.MaxL1TxSize) + c, e := compressor.NewShadowCompressor(compressor.Config{ + TargetOutputSize: target, }) require.NoError(t, e, "failed to create compressor") - var batchType uint = derive.SingularBatchType - var spanBatch *derive.SpanBatch - if s.l2BatcherCfg.ForceSubmitSingularBatch && s.l2BatcherCfg.ForceSubmitSpanBatch { t.Fatalf("ForceSubmitSingularBatch and ForceSubmitSpanBatch cannot be set to true at the same time") - } else if s.l2BatcherCfg.ForceSubmitSingularBatch { - // use SingularBatchType - } else if s.l2BatcherCfg.ForceSubmitSpanBatch || s.rollupCfg.IsDelta(block.Time()) { - // If both ForceSubmitSingularBatch and ForceSubmitSpanbatch are false, use SpanBatch automatically if Delta HF is activated. 
- batchType = derive.SpanBatchType - spanBatch = derive.NewSpanBatch(s.rollupCfg.Genesis.L2Time, s.rollupCfg.L2ChainID) + } else { + // use span batch if we're forcing it or if we're at/beyond delta + if s.l2BatcherCfg.ForceSubmitSpanBatch || s.rollupCfg.IsDelta(block.Time()) { + ch, err = derive.NewSpanChannelOut(s.rollupCfg.Genesis.L2Time, s.rollupCfg.L2ChainID, target) + // use singular batches in all other cases + } else { + ch, err = derive.NewSingularChannelOut(c) + } } - ch, err = derive.NewChannelOut(batchType, c, spanBatch) } require.NoError(t, err, "failed to create channel") s.l2ChannelOut = ch } - if _, err := s.l2ChannelOut.AddBlock(s.rollupCfg, block); err != nil { // should always succeed + if err := s.l2ChannelOut.AddBlock(s.rollupCfg, block); err != nil { return err } ref, err := s.engCl.L2BlockRefByHash(t.Ctx(), block.Hash()) diff --git a/op-e2e/actions/sync_test.go b/op-e2e/actions/sync_test.go index fb20e6565b23..759f51613e0e 100644 --- a/op-e2e/actions/sync_test.go +++ b/op-e2e/actions/sync_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/ethereum-optimism/optimism/op-batcher/compressor" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/sync" @@ -27,12 +26,7 @@ import ( ) func newSpanChannelOut(t StatefulTesting, e e2eutils.SetupData) derive.ChannelOut { - c, err := compressor.NewBlindCompressor(compressor.Config{ - TargetOutputSize: 128_000, - }) - require.NoError(t, err) - spanBatch := derive.NewSpanBatch(e.RollupCfg.Genesis.L2Time, e.RollupCfg.L2ChainID) - channelOut, err := derive.NewChannelOut(derive.SpanBatchType, c, spanBatch) + channelOut, err := derive.NewSpanChannelOut(e.RollupCfg.Genesis.L2Time, e.RollupCfg.L2ChainID, 128_000) require.NoError(t, err) return channelOut } @@ -249,7 +243,7 @@ func TestBackupUnsafe(gt *testing.T) { block = block.WithBody([]*types.Transaction{block.Transactions()[0], 
invalidTx}, []*types.Header{}) } // Add A1, B2, B3, B4, B5 into the channel - _, err = channelOut.AddBlock(sd.RollupCfg, block) + err = channelOut.AddBlock(sd.RollupCfg, block) require.NoError(t, err) } @@ -412,7 +406,7 @@ func TestBackupUnsafeReorgForkChoiceInputError(gt *testing.T) { block = block.WithBody([]*types.Transaction{block.Transactions()[0], invalidTx}, []*types.Header{}) } // Add A1, B2, B3, B4, B5 into the channel - _, err = channelOut.AddBlock(sd.RollupCfg, block) + err = channelOut.AddBlock(sd.RollupCfg, block) require.NoError(t, err) } @@ -551,7 +545,7 @@ func TestBackupUnsafeReorgForkChoiceNotInputError(gt *testing.T) { block = block.WithBody([]*types.Transaction{block.Transactions()[0], invalidTx}, []*types.Header{}) } // Add A1, B2, B3, B4, B5 into the channel - _, err = channelOut.AddBlock(sd.RollupCfg, block) + err = channelOut.AddBlock(sd.RollupCfg, block) require.NoError(t, err) } @@ -870,7 +864,7 @@ func TestInvalidPayloadInSpanBatch(gt *testing.T) { block = block.WithBody([]*types.Transaction{block.Transactions()[0], invalidTx}, []*types.Header{}) } // Add A1 ~ A12 into the channel - _, err = channelOut.AddBlock(sd.RollupCfg, block) + err = channelOut.AddBlock(sd.RollupCfg, block) require.NoError(t, err) } @@ -919,7 +913,7 @@ func TestInvalidPayloadInSpanBatch(gt *testing.T) { block = block.WithBody([]*types.Transaction{block.Transactions()[0], tx}, []*types.Header{}) } // Add B1, A2 ~ A12 into the channel - _, err = channelOut.AddBlock(sd.RollupCfg, block) + err = channelOut.AddBlock(sd.RollupCfg, block) require.NoError(t, err) } // Submit span batch(B1, A2, ... 
A12) diff --git a/op-node/benchmarks/batchbuilding_test.go b/op-node/benchmarks/batchbuilding_test.go index e89748439ef4..52d6d5721fd8 100644 --- a/op-node/benchmarks/batchbuilding_test.go +++ b/op-node/benchmarks/batchbuilding_test.go @@ -12,30 +12,37 @@ import ( "github.com/stretchr/testify/require" ) -var ( +const ( + // a really large target output size to ensure that the compressors are never full + targetOutput_huge = uint64(100_000_000_000) + // this target size was determined by the devnet sepolia batcher's configuration + targetOuput_real = uint64(780120) +) +var ( // compressors used in the benchmark rc, _ = compressor.NewRatioCompressor(compressor.Config{ - TargetOutputSize: 100_000_000_000, + TargetOutputSize: targetOutput_huge, ApproxComprRatio: 0.4, }) sc, _ = compressor.NewShadowCompressor(compressor.Config{ - TargetOutputSize: 100_000_000_000, + TargetOutputSize: targetOutput_huge, }) nc, _ = compressor.NewNonCompressor(compressor.Config{ - TargetOutputSize: 100_000_000_000, + TargetOutputSize: targetOutput_huge, }) - bc, _ = compressor.NewBlindCompressor(compressor.Config{ - TargetOutputSize: 100_000_000_000, + realsc, _ = compressor.NewShadowCompressor(compressor.Config{ + TargetOutputSize: targetOuput_real, }) - compressors = map[string]derive.Compressor{ - "BlindCompressor": bc, - "NonCompressor": nc, - "RatioCompressor": rc, - "ShadowCompressor": sc, + // compressors used in the benchmark mapped by their name + // they come paired with a target output size so span batches can use the target size directly + compressors = map[string]compressorAndTarget{ + "NonCompressor": {nc, targetOutput_huge}, + "RatioCompressor": {rc, targetOutput_huge}, + "ShadowCompressor": {sc, targetOutput_huge}, + "RealShadowCompressor": {realsc, targetOuput_real}, } - // batch types used in the benchmark batchTypes = []uint{ derive.SpanBatchType, @@ -45,6 +52,23 @@ var ( } ) +type compressorAndTarget struct { + compressor derive.Compressor + targetOutput uint64 +} + +// 
channelOutByType returns a channel out of the given type as a helper for the benchmarks +func channelOutByType(batchType uint, compKey string) (derive.ChannelOut, error) { + chainID := big.NewInt(333) + if batchType == derive.SingularBatchType { + return derive.NewSingularChannelOut(compressors[compKey].compressor) + } + if batchType == derive.SpanBatchType { + return derive.NewSpanChannelOut(0, chainID, compressors[compKey].targetOutput) + } + return nil, fmt.Errorf("unsupported batch type: %d", batchType) +} + // a test case for the benchmark controls the number of batches and transactions per batch, // as well as the batch type and compressor used type BatchingBenchmarkTC struct { @@ -110,24 +134,70 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) { for bn := 0; bn < b.N; bn++ { // don't measure the setup time b.StopTimer() - compressors[tc.compKey].Reset() - spanBatch := derive.NewSpanBatch(uint64(0), chainID) - cout, _ := derive.NewChannelOut(tc.BatchType, compressors[tc.compKey], spanBatch) + compressors[tc.compKey].compressor.Reset() + cout, _ := channelOutByType(tc.BatchType, tc.compKey) // add all but the final batch to the channel out for i := 0; i < tc.BatchCount-1; i++ { - _, err := cout.AddSingularBatch(batches[i], 0) + err := cout.AddSingularBatch(batches[i], 0) require.NoError(b, err) } // measure the time to add the final batch b.StartTimer() // add the final batch to the channel out - _, err := cout.AddSingularBatch(batches[tc.BatchCount-1], 0) + err := cout.AddSingularBatch(batches[tc.BatchCount-1], 0) require.NoError(b, err) } }) } } +// BenchmarkIncremental fills a channel out incrementally with batches +// each increment is counted as its own benchmark +// Hint: use -benchtime=1x to run the benchmarks for a single iteration +// it is not currently designed to use b.N +func BenchmarkIncremental(b *testing.B) { + chainID := big.NewInt(333) + rng := rand.New(rand.NewSource(0x543331)) + // use the real compressor for this benchmark + // use 
batchCount as the number of batches to add in each benchmark iteration + // and use txPerBatch as the number of transactions per batch + tcs := []BatchingBenchmarkTC{ + {derive.SpanBatchType, 5, 1, "RealBlindCompressor"}, + //{derive.SingularBatchType, 100, 1, "RealShadowCompressor"}, + } + for _, tc := range tcs { + cout, err := channelOutByType(tc.BatchType, tc.compKey) + if err != nil { + b.Fatal(err) + } + done := false + for base := 0; !done; base += tc.BatchCount { + rangeName := fmt.Sprintf("Incremental %s: %d-%d", tc.String(), base, base+tc.BatchCount) + b.Run(rangeName, func(b *testing.B) { + b.StopTimer() + // prepare the batches + t := time.Now() + batches := make([]*derive.SingularBatch, tc.BatchCount) + for i := 0; i < tc.BatchCount; i++ { + t := t.Add(time.Second) + batches[i] = derive.RandomSingularBatch(rng, tc.txPerBatch, chainID) + // set the timestamp to increase with each batch + // to leverage optimizations in the Batch Linked List + batches[i].Timestamp = uint64(t.Unix()) + } + b.StartTimer() + for i := 0; i < tc.BatchCount; i++ { + err := cout.AddSingularBatch(batches[i], 0) + if err != nil { + done = true + return + } + } + }) + } + } +} + // BenchmarkAllBatchesChannelOut benchmarks the performance of adding singular batches to a channel out // this exercises the compression and batching logic, as well as any batch-building logic // Every Compressor in the compressor map is benchmarked for each test case @@ -173,13 +243,12 @@ func BenchmarkAllBatchesChannelOut(b *testing.B) { for bn := 0; bn < b.N; bn++ { // don't measure the setup time b.StopTimer() - compressors[tc.compKey].Reset() - spanBatch := derive.NewSpanBatch(0, chainID) - cout, _ := derive.NewChannelOut(tc.BatchType, compressors[tc.compKey], spanBatch) + compressors[tc.compKey].compressor.Reset() + cout, _ := channelOutByType(tc.BatchType, tc.compKey) b.StartTimer() // add all batches to the channel out for i := 0; i < tc.BatchCount; i++ { - _, err := 
cout.AddSingularBatch(batches[i], 0) + err := cout.AddSingularBatch(batches[i], 0) require.NoError(b, err) } } diff --git a/op-node/rollup/derive/channel_out.go b/op-node/rollup/derive/channel_out.go index 1d9ab6b0e5b7..ba707a16ec44 100644 --- a/op-node/rollup/derive/channel_out.go +++ b/op-node/rollup/derive/channel_out.go @@ -53,8 +53,8 @@ type Compressor interface { type ChannelOut interface { ID() ChannelID Reset() error - AddBlock(*rollup.Config, *types.Block) (uint64, error) - AddSingularBatch(*SingularBatch, uint64) (uint64, error) + AddBlock(*rollup.Config, *types.Block) error + AddSingularBatch(*SingularBatch, uint64) error InputBytes() int ReadyBytes() int Flush() error @@ -63,17 +63,6 @@ type ChannelOut interface { OutputFrame(*bytes.Buffer, uint64) (uint16, error) } -func NewChannelOut(batchType uint, compress Compressor, spanBatch *SpanBatch) (ChannelOut, error) { - switch batchType { - case SingularBatchType: - return NewSingularChannelOut(compress) - case SpanBatchType: - return NewSpanChannelOut(compress, spanBatch) - default: - return nil, fmt.Errorf("unrecognized batch type: %d", batchType) - } -} - type SingularChannelOut struct { id ChannelID // Frame ID of the next frame to emit. Increment after emitting @@ -119,14 +108,14 @@ func (co *SingularChannelOut) Reset() error { // and an error if there is a problem adding the block. The only sentinel error // that it returns is ErrTooManyRLPBytes. If this error is returned, the channel // should be closed and a new one should be made. 
-func (co *SingularChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) (uint64, error) { +func (co *SingularChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) error { if co.closed { - return 0, ErrChannelOutAlreadyClosed + return ErrChannelOutAlreadyClosed } batch, l1Info, err := BlockToSingularBatch(rollupCfg, block) if err != nil { - return 0, err + return err } return co.AddSingularBatch(batch, l1Info.SequenceNumber) } @@ -139,26 +128,26 @@ func (co *SingularChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Bl // AddSingularBatch should be used together with BlockToBatch if you need to access the // BatchData before adding a block to the channel. It isn't possible to access // the batch data with AddBlock. -func (co *SingularChannelOut) AddSingularBatch(batch *SingularBatch, _ uint64) (uint64, error) { +func (co *SingularChannelOut) AddSingularBatch(batch *SingularBatch, _ uint64) error { if co.closed { - return 0, ErrChannelOutAlreadyClosed + return ErrChannelOutAlreadyClosed } // We encode to a temporary buffer to determine the encoded length to // ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL var buf bytes.Buffer if err := rlp.Encode(&buf, NewBatchData(batch)); err != nil { - return 0, err + return err } if co.rlpLength+buf.Len() > MaxRLPBytesPerChannel { - return 0, fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w", + return fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w", buf.Len(), co.rlpLength, MaxRLPBytesPerChannel, ErrTooManyRLPBytes) } co.rlpLength += buf.Len() // avoid using io.Copy here, because we need all or nothing - written, err := co.compress.Write(buf.Bytes()) - return uint64(written), err + _, err := co.compress.Write(buf.Bytes()) + return err } // InputBytes returns the total amount of RLP-encoded input bytes. 
diff --git a/op-node/rollup/derive/channel_out_test.go b/op-node/rollup/derive/channel_out_test.go index 7e6bc04cb06c..7c4ae92204d0 100644 --- a/op-node/rollup/derive/channel_out_test.go +++ b/op-node/rollup/derive/channel_out_test.go @@ -36,59 +36,94 @@ func (s *nonCompressor) FullErr() error { return nil } -func TestChannelOutAddBlock(t *testing.T) { - cout, err := NewChannelOut(SingularBatchType, &nonCompressor{}, nil) - require.NoError(t, err) +// channelTypes allows tests to run against different channel types +var channelTypes = []struct { + ChannelOut func(t *testing.T) ChannelOut + Name string +}{ + { + Name: "Singular", + ChannelOut: func(t *testing.T) ChannelOut { + cout, err := NewSingularChannelOut(&nonCompressor{}) + require.NoError(t, err) + return cout + }, + }, + { + Name: "Span", + ChannelOut: func(t *testing.T) ChannelOut { + cout, err := NewSpanChannelOut(0, big.NewInt(0), 128_000) + require.NoError(t, err) + return cout + }, + }, +} - t.Run("returns err if first tx is not an l1info tx", func(t *testing.T) { - header := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} - block := types.NewBlockWithHeader(header).WithBody( - []*types.Transaction{ - types.NewTx(&types.DynamicFeeTx{}), - }, - nil, - ) - _, err := cout.AddBlock(&rollupCfg, block) - require.Error(t, err) - require.Equal(t, ErrNotDepositTx, err) - }) +func TestChannelOutAddBlock(t *testing.T) { + for _, tcase := range channelTypes { + t.Run(tcase.Name, func(t *testing.T) { + cout := tcase.ChannelOut(t) + header := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} + block := types.NewBlockWithHeader(header).WithBody( + []*types.Transaction{ + types.NewTx(&types.DynamicFeeTx{}), + }, + nil, + ) + err := cout.AddBlock(&rollupCfg, block) + require.Error(t, err) + require.Equal(t, ErrNotDepositTx, err) + }) + } } // TestOutputFrameSmallMaxSize tests that calling [OutputFrame] with a small -// max size that is below the fixed frame size overhead of 23, will 
return -// an error. +// max size that is below the fixed frame size overhead of FrameV0OverHeadSize (23), +// will return an error. func TestOutputFrameSmallMaxSize(t *testing.T) { - cout, err := NewChannelOut(SingularBatchType, &nonCompressor{}, nil) - require.NoError(t, err) - - // Call OutputFrame with the range of small max size values that err - var w bytes.Buffer - for i := 0; i < 23; i++ { - fid, err := cout.OutputFrame(&w, uint64(i)) - require.ErrorIs(t, err, ErrMaxFrameSizeTooSmall) - require.Zero(t, fid) + for _, tcase := range channelTypes { + t.Run(tcase.Name, func(t *testing.T) { + cout := tcase.ChannelOut(t) + // Call OutputFrame with the range of small max size values that err + var w bytes.Buffer + for i := 0; i < FrameV0OverHeadSize; i++ { + fid, err := cout.OutputFrame(&w, uint64(i)) + require.ErrorIs(t, err, ErrMaxFrameSizeTooSmall) + require.Zero(t, fid) + } + }) } } func TestOutputFrameNoEmptyLastFrame(t *testing.T) { - cout, err := NewChannelOut(SingularBatchType, &nonCompressor{}, nil) - require.NoError(t, err) + for _, tcase := range channelTypes { + t.Run(tcase.Name, func(t *testing.T) { + cout := tcase.ChannelOut(t) - rng := rand.New(rand.NewSource(0x543331)) - chainID := big.NewInt(rng.Int63n(1000)) - txCount := 1 - singularBatch := RandomSingularBatch(rng, txCount, chainID) + rng := rand.New(rand.NewSource(0x543331)) + chainID := big.NewInt(0) + txCount := 1 + singularBatch := RandomSingularBatch(rng, txCount, chainID) - written, err := cout.AddSingularBatch(singularBatch, 0) - require.NoError(t, err) + err := cout.AddSingularBatch(singularBatch, 0) + var written uint64 + require.NoError(t, err) - require.NoError(t, cout.Close()) + require.NoError(t, cout.Close()) - var buf bytes.Buffer - // Output a frame which needs exactly `written` bytes. This frame is expected to be the last frame. 
- _, err = cout.OutputFrame(&buf, written+FrameV0OverHeadSize) - require.ErrorIs(t, err, io.EOF) + // depending on the channel type, determine the size of the written data + if span, ok := cout.(*SpanChannelOut); ok { + written = uint64(span.compressed.Len()) + } else if singular, ok := cout.(*SingularChannelOut); ok { + written = uint64(singular.compress.Len()) + } + var buf bytes.Buffer + // Output a frame which needs exactly `written` bytes. This frame is expected to be the last frame. + _, err = cout.OutputFrame(&buf, written+FrameV0OverHeadSize) + require.ErrorIs(t, err, io.EOF) + }) + } } // TestRLPByteLimit ensures that stream encoder is properly limiting the length. @@ -184,3 +219,82 @@ func TestBlockToBatchValidity(t *testing.T) { _, _, err := BlockToSingularBatch(&rollupCfg, block) require.ErrorContains(t, err, "has no transactions") } + +func SpanChannelAndBatches(t *testing.T, target uint64, len int) (*SpanChannelOut, []*SingularBatch) { + // target is larger than one batch, but smaller than two batches + rng := rand.New(rand.NewSource(0x543331)) + chainID := big.NewInt(rng.Int63n(1000)) + txCount := 1 + cout, err := NewSpanChannelOut(0, chainID, target) + require.NoError(t, err) + batches := make([]*SingularBatch, len) + // adding the first batch should not cause an error + for i := 0; i < len; i++ { + singularBatch := RandomSingularBatch(rng, txCount, chainID) + batches[i] = singularBatch + } + + return cout, batches +} + +// TestSpanChannelOutCompressionOnlyOneBatch tests that the SpanChannelOut compression works as expected when there is only one batch +// and it is larger than the target size. 
The single batch should be compressed, and the channel should now be full +func TestSpanChannelOutCompressionOnlyOneBatch(t *testing.T) { + cout, singularBatches := SpanChannelAndBatches(t, 300, 2) + + err := cout.AddSingularBatch(singularBatches[0], 0) + // confirm compression was not skipped + require.Greater(t, cout.compressed.Len(), 0) + require.NoError(t, err) + + // confirm the channel is full + require.ErrorIs(t, cout.FullErr(), ErrCompressorFull) + + // confirm adding another batch would cause the same full error + err = cout.AddSingularBatch(singularBatches[1], 0) + require.ErrorIs(t, err, ErrCompressorFull) +} + +// TestSpanChannelOutCompressionUndo tests that the SpanChannelOut compression rejects a batch that would cause the channel to be overfull +func TestSpanChannelOutCompressionUndo(t *testing.T) { + // target is larger than one batch, but smaller than two batches + cout, singularBatches := SpanChannelAndBatches(t, 750, 2) + + err := cout.AddSingularBatch(singularBatches[0], 0) + require.NoError(t, err) + // confirm that the first compression was skipped + require.Equal(t, 0, cout.compressed.Len()) + // record the RLP length to confirm it doesn't change when adding a rejected batch + rlp1 := cout.activeRLP().Len() + + err = cout.AddSingularBatch(singularBatches[1], 0) + require.ErrorIs(t, err, ErrCompressorFull) + // confirm that the second compression was not skipped + require.Greater(t, cout.compressed.Len(), 0) + + // confirm that the second rlp is the same size as the first (because the second batch was not added) + require.Equal(t, rlp1, cout.activeRLP().Len()) +} + +// TestSpanChannelOutClose tests that the SpanChannelOut compression works as expected when the channel is closed. 
+// it should compress the batch even if it is smaller than the target size because the channel is closing +func TestSpanChannelOutClose(t *testing.T) { + target := uint64(600) + cout, singularBatches := SpanChannelAndBatches(t, target, 1) + + err := cout.AddSingularBatch(singularBatches[0], 0) + require.NoError(t, err) + // confirm no compression has happened yet + require.Equal(t, 0, cout.compressed.Len()) + + // confirm the RLP length is less than the target + rlpLen := cout.activeRLP().Len() + require.Less(t, uint64(rlpLen), target) + + // close the channel + require.NoError(t, cout.Close()) + + // confirm that the only batch was compressed, and that the RLP did not change + require.Greater(t, cout.compressed.Len(), 0) + require.Equal(t, rlpLen, cout.activeRLP().Len()) +} diff --git a/op-node/rollup/derive/span_channel_out.go b/op-node/rollup/derive/span_channel_out.go index 11030d70790a..e549e862e6bf 100644 --- a/op-node/rollup/derive/span_channel_out.go +++ b/op-node/rollup/derive/span_channel_out.go @@ -2,9 +2,11 @@ package derive import ( "bytes" + "compress/zlib" "crypto/rand" "fmt" "io" + "math/big" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rlp" @@ -16,176 +18,220 @@ type SpanChannelOut struct { id ChannelID // Frame ID of the next frame to emit. Increment after emitting frame uint64 - // rlpLength is the uncompressed size of the channel. Must be less than MAX_RLP_BYTES_PER_CHANNEL - rlpLength int - - // Compressor stage. Write input data to it - compress Compressor + // rlp is the encoded, uncompressed data of the channel. 
length must be less than MAX_RLP_BYTES_PER_CHANNEL + // it is a double buffer to allow us to "undo" the last change to the RLP structure when the target size is exceeded + rlp [2]*bytes.Buffer + // rlpIndex is the index of the current rlp buffer + rlpIndex int + // lastCompressedRLPSize tracks the *uncompressed* size of the last RLP buffer that was compressed + // it is used to measure the growth of the RLP buffer when adding a new batch to optimize compression + lastCompressedRLPSize int + // compressed contains compressed data for making output frames + compressed *bytes.Buffer + // compressor is the zlib writer for the channel + compressor *zlib.Writer + // target is the target size of the compressed data + target uint64 // closed indicates if the channel is closed closed bool - // spanBatch is the batch being built + // full indicates if the channel is full + full error + // spanBatch is the batch being built, which immutably holds genesis timestamp and chain ID, but otherwise can be reset spanBatch *SpanBatch - // reader contains compressed data for making output frames - reader *bytes.Buffer } func (co *SpanChannelOut) ID() ChannelID { return co.id } -func NewSpanChannelOut(compress Compressor, spanBatch *SpanBatch) (*SpanChannelOut, error) { +func (co *SpanChannelOut) setRandomID() error { + _, err := rand.Read(co.id[:]) + return err +} + +func NewSpanChannelOut(genesisTimestamp uint64, chainID *big.Int, targetOutputSize uint64) (*SpanChannelOut, error) { c := &SpanChannelOut{ - id: ChannelID{}, - frame: 0, - rlpLength: 0, - compress: compress, - spanBatch: spanBatch, - reader: &bytes.Buffer{}, - } - _, err := rand.Read(c.id[:]) - if err != nil { + id: ChannelID{}, + frame: 0, + spanBatch: NewSpanBatch(genesisTimestamp, chainID), + rlp: [2]*bytes.Buffer{{}, {}}, + compressed: &bytes.Buffer{}, + target: targetOutputSize, + } + var err error + if err = c.setRandomID(); err != nil { + return nil, err + } + if c.compressor, err = zlib.NewWriterLevel(c.compressed,
zlib.BestCompression); err != nil { return nil, err } - return c, nil } func (co *SpanChannelOut) Reset() error { - co.frame = 0 - co.rlpLength = 0 - co.compress.Reset() - co.reader.Reset() co.closed = false + co.full = nil + co.frame = 0 + co.rlp[0].Reset() + co.rlp[1].Reset() + co.lastCompressedRLPSize = 0 + co.compressed.Reset() + co.compressor.Reset(co.compressed) co.spanBatch = NewSpanBatch(co.spanBatch.GenesisTimestamp, co.spanBatch.ChainID) - _, err := rand.Read(co.id[:]) - return err + // setting the new randomID is the only part of the reset that can fail + return co.setRandomID() } -// AddBlock adds a block to the channel. It returns the RLP encoded byte size -// and an error if there is a problem adding the block. The only sentinel error +// activeRLP returns the active RLP buffer using the current rlpIndex +func (co *SpanChannelOut) activeRLP() *bytes.Buffer { + return co.rlp[co.rlpIndex] +} + +// inactiveRLP returns the inactive RLP buffer using the current rlpIndex +func (co *SpanChannelOut) inactiveRLP() *bytes.Buffer { + return co.rlp[(co.rlpIndex+1)%2] +} + +// swapRLP switches the active and inactive RLP buffers by modifying the rlpIndex +func (co *SpanChannelOut) swapRLP() { + co.rlpIndex = (co.rlpIndex + 1) % 2 +} + +// AddBlock adds a block to the channel. +// returns an error if there is a problem adding the block. The only sentinel error // that it returns is ErrTooManyRLPBytes. If this error is returned, the channel // should be closed and a new one should be made. 
-func (co *SpanChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) (uint64, error) { +func (co *SpanChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) error { if co.closed { - return 0, ErrChannelOutAlreadyClosed + return ErrChannelOutAlreadyClosed } batch, l1Info, err := BlockToSingularBatch(rollupCfg, block) if err != nil { - return 0, err + return err } return co.AddSingularBatch(batch, l1Info.SequenceNumber) } -// AddSingularBatch adds a batch to the channel. It returns the RLP encoded byte size -// and an error if there is a problem adding the batch. The only sentinel error -// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel -// should be closed and a new one should be made. -// -// AddSingularBatch should be used together with BlockToSingularBatch if you need to access the -// BatchData before adding a block to the channel. It isn't possible to access -// the batch data with AddBlock. -// -// SingularBatch is appended to the channel's SpanBatch. -// A channel can have only one SpanBatch. And compressed results should not be accessible until the channel is closed, since the prefix and payload can be changed. -// So it resets channel contents and rewrites the entire SpanBatch each time, and compressed results are copied to reader after the channel is closed. -// It makes we can only get frames once the channel is full or closed, in the case of SpanBatch. -func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64) (uint64, error) { +// AddSingularBatch adds a SingularBatch to the channel, compressing the data if necessary. 
+// if the new batch would make the channel exceed the target size, the last batch is reverted, +// and the compression happens on the previous RLP buffer instead +// if the input is too small to need compression, data is accumulated but not compressed +func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64) error { + // sentinel error for closed or full channel if co.closed { - return 0, ErrChannelOutAlreadyClosed + return ErrChannelOutAlreadyClosed } - if co.FullErr() != nil { - // channel is already full - return 0, co.FullErr() + if err := co.FullErr(); err != nil { + return err } - var buf bytes.Buffer - // Append Singular batch to its span batch builder + + // update the SpanBatch with the SingularBatch if err := co.spanBatch.AppendSingularBatch(batch, seqNum); err != nil { - return 0, fmt.Errorf("failed to append SingularBatch to SpanBatch: %w", err) + return fmt.Errorf("failed to append SingularBatch to SpanBatch: %w", err) } - // Convert Span batch to RawSpanBatch + // convert Span batch to RawSpanBatch rawSpanBatch, err := co.spanBatch.ToRawSpanBatch() if err != nil { - return 0, fmt.Errorf("failed to convert SpanBatch into RawSpanBatch: %w", err) - } - // Encode RawSpanBatch into bytes - if err = rlp.Encode(&buf, NewBatchData(rawSpanBatch)); err != nil { - return 0, fmt.Errorf("failed to encode RawSpanBatch into bytes: %w", err) - } - // Ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL - if buf.Len() > MaxRLPBytesPerChannel { - return 0, fmt.Errorf("could not take %d bytes as replacement of channel of %d bytes, max is %d. err: %w", - buf.Len(), co.rlpLength, MaxRLPBytesPerChannel, ErrTooManyRLPBytes) - } - co.rlpLength = buf.Len() - - // If the channel is full after this block is appended, we should use preserved data. 
- // so copy the compressed data to reader - if len(co.spanBatch.Batches) > 1 { - _, err = io.Copy(co.reader, co.compress) - if err != nil { - // Must reset reader to avoid partial output - co.reader.Reset() - return 0, fmt.Errorf("failed to copy compressed data to reader: %w", err) - } + return fmt.Errorf("failed to convert SpanBatch into RawSpanBatch: %w", err) + } + + // switch to the other buffer and reset it for new use + // (the RLP buffer which is being made inactive holds the RLP encoded span batch just before the new batch was added) + co.swapRLP() + co.activeRLP().Reset() + if err = rlp.Encode(co.activeRLP(), NewBatchData(rawSpanBatch)); err != nil { + return fmt.Errorf("failed to encode RawSpanBatch into bytes: %w", err) + } + + // check the RLP length against the max + if co.activeRLP().Len() > MaxRLPBytesPerChannel { + return fmt.Errorf("could not take %d bytes as replacement of channel of %d bytes, max is %d. err: %w", + co.activeRLP().Len(), co.inactiveRLP().Len(), MaxRLPBytesPerChannel, ErrTooManyRLPBytes) + } + + // if the compressed data *plus* the new rlp data is under the target size, return early + // this optimizes out cases where the compressor will obviously come in under the target size + rlpGrowth := co.activeRLP().Len() - co.lastCompressedRLPSize + if uint64(co.compressed.Len()+rlpGrowth) < co.target { + return nil } - // Reset compressor to rewrite the entire span batch - co.compress.Reset() - // Avoid using io.Copy here, because we need all or nothing - written, err := co.compress.Write(buf.Bytes()) - // Always flush (for BlindCompressor to check if it's full) - if err := co.compress.Flush(); err != nil { - return 0, fmt.Errorf("failed to flush compressor: %w", err) + // we must compress the data to check if we've met or exceeded the target size + if err = co.compress(); err != nil { + return err } - if co.compress.FullErr() != nil { - err = co.compress.FullErr() + co.lastCompressedRLPSize = co.activeRLP().Len() + + // if the channel is 
now full, either return the compressed data, or the compressed previous data + if err := co.FullErr(); err != nil { + // if there is only one batch in the channel, it *must* be returned if len(co.spanBatch.Batches) == 1 { - // Do not return ErrCompressorFull for the first block in the batch - // In this case, reader must be empty. then the contents of compressor will be copied to reader when the channel is closed. - err = nil + return nil + } + + // if there is more than one batch in the channel, we revert the last batch + // by switching the RLP buffer and doing a fresh compression + co.swapRLP() + if err := co.compress(); err != nil { + return err } - // If there are more than one blocks in the channel, reader should have data that preserves previous compression result before adding this block. - // So, as a result, this block is not added to the channel and the channel will be closed. - return uint64(written), err + // return the full error + return err } - // If compressor is not full yet, reader must be reset to avoid submitting invalid frames - co.reader.Reset() - return uint64(written), err + return nil +} + +// compress compresses the active RLP buffer and checks if the compressed data is over the target size. +// it resets all the compression buffers because Span Batches aren't meant to be compressed incrementally. +func (co *SpanChannelOut) compress() error { + co.compressed.Reset() + co.compressor.Reset(co.compressed) + if _, err := co.compressor.Write(co.activeRLP().Bytes()); err != nil { + return err + } + if err := co.compressor.Close(); err != nil { + return err + } + co.checkFull() + return nil } // InputBytes returns the total amount of RLP-encoded input bytes. func (co *SpanChannelOut) InputBytes() int { - return co.rlpLength + return co.activeRLP().Len() } -// ReadyBytes returns the number of bytes that the channel out can immediately output into a frame. 
-// Use `Flush` or `Close` to move data from the compression buffer into the ready buffer if more bytes -// are needed. Add blocks may add to the ready buffer, but it is not guaranteed due to the compression stage. +// ReadyBytes returns the total amount of compressed bytes that are ready to be output. +// Span Channel Out does not provide early output, so this will always be 0 until the channel is closed or full func (co *SpanChannelOut) ReadyBytes() int { - return co.reader.Len() + if co.closed || co.FullErr() != nil { + return co.compressed.Len() + } + return 0 } -// Flush flushes the internal compression stage to the ready buffer. It enables pulling a larger & more -// complete frame. It reduces the compression efficiency. +// Flush implements the Channel Out +// Span Channel Out manages the flushing of the compressor internally, so this is a no-op func (co *SpanChannelOut) Flush() error { - if err := co.compress.Flush(); err != nil { - return err + return nil +} + +// checkFull sets the full error if the compressed data is over the target size. 
+// the error is only set once, and the channel is considered full from that point on +func (co *SpanChannelOut) checkFull() { + // if the channel is already full, don't update further + if co.full != nil { + return } - if co.closed && co.ReadyBytes() == 0 && co.compress.Len() > 0 { - _, err := io.Copy(co.reader, co.compress) - if err != nil { - // Must reset reader to avoid partial output - co.reader.Reset() - return fmt.Errorf("failed to flush compressed data to reader: %w", err) - } + if uint64(co.compressed.Len()) >= co.target { + co.full = ErrCompressorFull } - return nil } func (co *SpanChannelOut) FullErr() error { - return co.compress.FullErr() + return co.full } func (co *SpanChannelOut) Close() error { @@ -193,10 +239,14 @@ func (co *SpanChannelOut) Close() error { return ErrChannelOutAlreadyClosed } co.closed = true - if err := co.Flush(); err != nil { - return err + // if the channel was already full, + // the compressor is already flushed and closed + if co.FullErr() != nil { + return nil } - return co.compress.Close() + // if this channel is not full, we need to compress the last batch + // this also flushes/closes the compressor + return co.compress() } // OutputFrame writes a frame to w with a given max size and returns the frame @@ -214,7 +264,7 @@ func (co *SpanChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, f := createEmptyFrame(co.id, co.frame, co.ReadyBytes(), co.closed, maxSize) - if _, err := io.ReadFull(co.reader, f.Data); err != nil { + if _, err := io.ReadFull(co.compressed, f.Data); err != nil { return 0, err }