Skip to content

Commit

Permalink
Implement span batch derivation
Browse files Browse the repository at this point in the history
  • Loading branch information
ImTei committed Sep 18, 2023
1 parent ba71953 commit 077a95c
Show file tree
Hide file tree
Showing 13 changed files with 2,066 additions and 192 deletions.
4 changes: 2 additions & 2 deletions op-node/cmd/batch_decoder/reassemble/reassemble.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,14 @@ func processFrames(id derive.ChannelID, frames []FrameWithMetadata) ChannelWithM
var batches []derive.SingularBatch
invalidBatches := false
if ch.IsReady() {
br, err := derive.BatchReader(ch.Reader(), eth.L1BlockRef{})
br, err := derive.BatchReader(ch.Reader())
if err == nil {
for batch, err := br(); err != io.EOF; batch, err = br() {
if err != nil {
fmt.Printf("Error reading batch for channel %v. Err: %v\n", id.String(), err)
invalidBatches = true
} else {
batches = append(batches, batch.Batch.SingularBatch)
batches = append(batches, batch.SingularBatch)
}
}
} else {
Expand Down
4 changes: 2 additions & 2 deletions op-node/rollup/derive/attributes_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type AttributesQueue struct {
config *rollup.Config
builder AttributesBuilder
prev *BatchQueue
batch *BatchData
batch *SingularBatch
}

func NewAttributesQueue(log log.Logger, cfg *rollup.Config, builder AttributesBuilder, prev *BatchQueue) *AttributesQueue {
Expand Down Expand Up @@ -71,7 +71,7 @@ func (aq *AttributesQueue) NextAttributes(ctx context.Context, l2SafeHead eth.L2

// createNextAttributes transforms a batch into a payload attributes. This sets `NoTxPool` and appends the batched transactions
// to the attributes transaction list
func (aq *AttributesQueue) createNextAttributes(ctx context.Context, batch *BatchData, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
func (aq *AttributesQueue) createNextAttributes(ctx context.Context, batch *SingularBatch, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
// sanity check parent hash
if batch.ParentHash != l2SafeHead.Hash {
return nil, NewResetError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, l2SafeHead.Hash))
Expand Down
6 changes: 3 additions & 3 deletions op-node/rollup/derive/attributes_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ func TestAttributesQueue(t *testing.T) {
safeHead.L1Origin = l1Info.ID()
safeHead.Time = l1Info.InfoTime

batch := NewSingularBatchData(SingularBatch{
batch := SingularBatch{
ParentHash: safeHead.Hash,
EpochNum: rollup.Epoch(l1Info.InfoNum),
EpochHash: l1Info.InfoHash,
Timestamp: safeHead.Time + cfg.BlockTime,
Transactions: []eth.Data{eth.Data("foobar"), eth.Data("example")},
})
}

parentL1Cfg := eth.SystemConfig{
BatcherAddr: common.Address{42},
Expand Down Expand Up @@ -80,7 +80,7 @@ func TestAttributesQueue(t *testing.T) {

aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, attrBuilder, nil)

actual, err := aq.createNextAttributes(context.Background(), batch, safeHead)
actual, err := aq.createNextAttributes(context.Background(), &batch, safeHead)

require.NoError(t, err)
require.Equal(t, attrs, *actual)
Expand Down
137 changes: 91 additions & 46 deletions op-node/rollup/derive/batch_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ import (

type NextBatchProvider interface {
Origin() eth.L1BlockRef
NextBatch(ctx context.Context) (*BatchData, error)
NextBatch(ctx context.Context) (Batch, error)
}

type SafeBlockFetcher interface {
L2BlockRefByNumber(context.Context, uint64) (eth.L2BlockRef, error)
PayloadByNumber(context.Context, uint64) (*eth.ExecutionPayload, error)
}

// BatchQueue contains a set of batches for every L1 block.
Expand All @@ -42,24 +47,52 @@ type BatchQueue struct {

l1Blocks []eth.L1BlockRef

// batches in order of when we've first seen them, grouped by L2 timestamp
batches map[uint64][]*BatchWithL1InclusionBlock
// batches in order of when we've first seen them
batches []*BatchWithL1InclusionBlock

// nextSpan is cached SingularBatches derived from SpanBatch
nextSpan []*SingularBatch

l2 SafeBlockFetcher
}

// NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use.
func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider) *BatchQueue {
func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) *BatchQueue {
return &BatchQueue{
log: log,
config: cfg,
prev: prev,
l2: l2,
}
}

func (bq *BatchQueue) Origin() eth.L1BlockRef {
return bq.prev.Origin()
}

func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef) (*BatchData, error) {
func (bq *BatchQueue) popNextBatch(safeL2Head eth.L2BlockRef) *SingularBatch {
nextBatch := bq.nextSpan[0]
bq.nextSpan = bq.nextSpan[1:]
// Must set ParentHash before return. we can use safeL2Head because the parentCheck is verified in CheckBatch().
nextBatch.ParentHash = safeL2Head.Hash
return nextBatch
}

func (bq *BatchQueue) advanceEpoch(nextBatch *SingularBatch) {
if nextBatch.GetEpochNum() == rollup.Epoch(bq.l1Blocks[0].Number)+1 {
// Advance epoch if necessary
bq.l1Blocks = bq.l1Blocks[1:]
}
}

func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef) (*SingularBatch, error) {
if len(bq.nextSpan) > 0 {
// If there are cached singular batches, pop first one and return.
nextBatch := bq.popNextBatch(safeL2Head)
bq.advanceEpoch(nextBatch)
return nextBatch, nil
}

// Note: We use the origin that we will have to determine if it's behind. This is important
// because it's the future origin that gets saved into the l1Blocks array.
// We always update the origin of this stage if it is not the same so after the update code
Expand Down Expand Up @@ -89,7 +122,7 @@ func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef)
} else if err != nil {
return nil, err
} else if !originBehind {
bq.AddBatch(batch, safeL2Head)
bq.AddBatch(ctx, batch, safeL2Head)
}

// Skip adding data unless we are up to date with the origin, but do fully
Expand All @@ -111,43 +144,70 @@ func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef)
} else if err != nil {
return nil, err
}
return batch, nil

var nextBatch *SingularBatch
switch batch.GetBatchType() {
case SingularBatchType:
singularBatch, ok := batch.(*SingularBatch)
if !ok {
return nil, NewCriticalError(errors.New("failed type assertion to SingularBatch"))
}
nextBatch = singularBatch
case SpanBatchType:
spanBatch, ok := batch.(*SpanBatch)
if !ok {
return nil, NewCriticalError(errors.New("failed type assertion to SpanBatch"))
}
// If next batch is SpanBatch, convert it to SingularBatches.
singularBatches, err := spanBatch.GetSingularBatches(bq.l1Blocks, safeL2Head)
if err != nil {
return nil, NewCriticalError(err)
}
bq.nextSpan = singularBatches
nextBatch = bq.popNextBatch(safeL2Head)
default:
return nil, NewCriticalError(fmt.Errorf("unrecognized batch type: %d", batch.GetBatchType()))
}

bq.advanceEpoch(nextBatch)
return nextBatch, nil
}

func (bq *BatchQueue) Reset(ctx context.Context, base eth.L1BlockRef, _ eth.SystemConfig) error {
// Copy over the Origin from the next stage
// It is set in the engine queue (two stages away) such that the L2 Safe Head origin is the progress
bq.origin = base
bq.batches = make(map[uint64][]*BatchWithL1InclusionBlock)
bq.batches = []*BatchWithL1InclusionBlock{}
// Include the new origin as an origin to build on
// Note: This is only for the initialization case. During normal resets we will later
// throw out this block.
bq.l1Blocks = bq.l1Blocks[:0]
bq.l1Blocks = append(bq.l1Blocks, base)
bq.nextSpan = bq.nextSpan[:0]
return io.EOF
}

func (bq *BatchQueue) AddBatch(batch *BatchData, l2SafeHead eth.L2BlockRef) {
func (bq *BatchQueue) AddBatch(ctx context.Context, batch Batch, l2SafeHead eth.L2BlockRef) {
if len(bq.l1Blocks) == 0 {
panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp))
panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.GetTimestamp()))
}
data := BatchWithL1InclusionBlock{
L1InclusionBlock: bq.origin,
Batch: batch,
}
validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, l2SafeHead, &data)
validity := CheckBatch(ctx, bq.config, bq.log, bq.l1Blocks, l2SafeHead, &data, bq.l2)
if validity == BatchDrop {
return // if we do drop the batch, CheckBatch will log the drop reason with WARN level.
}
bq.log.Debug("Adding batch", "batch_timestamp", batch.Timestamp, "parent_hash", batch.ParentHash, "batch_epoch", batch.Epoch(), "txs", len(batch.Transactions))
bq.batches[batch.Timestamp] = append(bq.batches[batch.Timestamp], &data)
batch.LogContext(bq.log).Debug("Adding batch")
bq.batches = append(bq.batches, &data)
}

// deriveNextBatch derives the next batch to apply on top of the current L2 safe head,
// following the validity rules imposed on consecutive batches,
// based on currently available buffered batch and L1 origin information.
// If no batch can be derived yet, then (nil, io.EOF) is returned.
func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, l2SafeHead eth.L2BlockRef) (*BatchData, error) {
func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, l2SafeHead eth.L2BlockRef) (Batch, error) {
if len(bq.l1Blocks) == 0 {
return nil, NewCriticalError(errors.New("cannot derive next batch, no origin was prepared"))
}
Expand All @@ -170,19 +230,15 @@ func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, l2Saf
// Go over all batches, in order of inclusion, and find the first batch we can accept.
// We filter in-place by only remembering the batches that may be processed in the future, or those we are undecided on.
var remaining []*BatchWithL1InclusionBlock
candidates := bq.batches[nextTimestamp]
batchLoop:
for i, batch := range candidates {
validity := CheckBatch(bq.config, bq.log.New("batch_index", i), bq.l1Blocks, l2SafeHead, batch)
for i, batch := range bq.batches {
validity := CheckBatch(ctx, bq.config, bq.log.New("batch_index", i), bq.l1Blocks, l2SafeHead, batch, bq.l2)
switch validity {
case BatchFuture:
return nil, NewCriticalError(fmt.Errorf("found batch with timestamp %d marked as future batch, but expected timestamp %d", batch.Batch.Timestamp, nextTimestamp))
remaining = append(remaining, batch)
continue
case BatchDrop:
bq.log.Warn("dropping batch",
"batch_timestamp", batch.Batch.Timestamp,
"parent_hash", batch.Batch.ParentHash,
"batch_epoch", batch.Batch.Epoch(),
"txs", len(batch.Batch.Transactions),
batch.Batch.LogContext(bq.log).Warn("dropping batch",
"l2_safe_head", l2SafeHead.ID(),
"l2_safe_head_time", l2SafeHead.Time,
)
Expand All @@ -191,29 +247,20 @@ batchLoop:
nextBatch = batch
// don't keep the current batch in the remaining items since we are processing it now,
// but retain every batch we didn't get to yet.
remaining = append(remaining, candidates[i+1:]...)
remaining = append(remaining, bq.batches[i+1:]...)
break batchLoop
case BatchUndecided:
remaining = append(remaining, batch)
bq.batches[nextTimestamp] = remaining
remaining = append(remaining, bq.batches[i:]...)
bq.batches = remaining
return nil, io.EOF
default:
return nil, NewCriticalError(fmt.Errorf("unknown batch validity type: %d", validity))
}
}
// clean up if we remove the final batch for this timestamp
if len(remaining) == 0 {
delete(bq.batches, nextTimestamp)
} else {
bq.batches[nextTimestamp] = remaining
}
bq.batches = remaining

if nextBatch != nil {
// advance epoch if necessary
if nextBatch.Batch.EpochNum == rollup.Epoch(epoch.Number)+1 {
bq.l1Blocks = bq.l1Blocks[1:]
}
bq.log.Info("Found next batch", "epoch", epoch, "batch_epoch", nextBatch.Batch.EpochNum, "batch_timestamp", nextBatch.Batch.Timestamp)
nextBatch.Batch.LogContext(bq.log).Info("Found next batch")
return nextBatch.Batch, nil
}

Expand Down Expand Up @@ -243,15 +290,13 @@ batchLoop:
// batch to ensure that we at least have one batch per epoch.
if nextTimestamp < nextEpoch.Time || firstOfEpoch {
bq.log.Info("Generating next batch", "epoch", epoch, "timestamp", nextTimestamp)
return NewSingularBatchData(
SingularBatch{
ParentHash: l2SafeHead.Hash,
EpochNum: rollup.Epoch(epoch.Number),
EpochHash: epoch.Hash,
Timestamp: nextTimestamp,
Transactions: nil,
},
), nil
return &SingularBatch{
ParentHash: l2SafeHead.Hash,
EpochNum: rollup.Epoch(epoch.Number),
EpochHash: epoch.Hash,
Timestamp: nextTimestamp,
Transactions: nil,
}, nil
}

// At this point we have auto generated every batch for the current epoch
Expand Down
Loading

0 comments on commit 077a95c

Please sign in to comment.