Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unified writer interface for Data Availability providers [NIT-1249] #2157

Merged
merged 18 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 12 additions & 20 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ import (
"github.com/offchainlabs/nitro/arbnode/redislock"
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/arbstate/daprovider"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/cmd/chaininfo"
"github.com/offchainlabs/nitro/cmd/genericconf"
"github.com/offchainlabs/nitro/das"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/solgen/go/bridgegen"
"github.com/offchainlabs/nitro/util"
Expand Down Expand Up @@ -98,7 +98,7 @@ type BatchPoster struct {
bridgeAddr common.Address
gasRefunderAddr common.Address
building *buildingBatch
daWriter das.DataAvailabilityServiceWriter
dapWriter daprovider.Writer
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
messagesPerBatch *arbmath.MovingAverage[uint64]
Expand Down Expand Up @@ -129,7 +129,7 @@ const (

type BatchPosterConfig struct {
Enable bool `koanf:"enable"`
DisableDasFallbackStoreDataOnChain bool `koanf:"disable-das-fallback-store-data-on-chain" reload:"hot"`
DisableDapFallbackStoreDataOnChain bool `koanf:"disable-dap-fallback-store-data-on-chain" reload:"hot"`
// Max batch size.
MaxSize int `koanf:"max-size" reload:"hot"`
// Maximum 4844 blob enabled batch size.
Expand Down Expand Up @@ -189,7 +189,7 @@ type BatchPosterConfigFetcher func() *BatchPosterConfig

func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable", DefaultBatchPosterConfig.Enable, "enable posting batches to l1")
f.Bool(prefix+".disable-das-fallback-store-data-on-chain", DefaultBatchPosterConfig.DisableDasFallbackStoreDataOnChain, "If unable to batch to DAS, disable fallback storing data on chain")
f.Bool(prefix+".disable-dap-fallback-store-data-on-chain", DefaultBatchPosterConfig.DisableDapFallbackStoreDataOnChain, "If unable to batch to DA provider, disable fallback storing data on chain")
f.Int(prefix+".max-size", DefaultBatchPosterConfig.MaxSize, "maximum batch size")
f.Int(prefix+".max-4844-batch-size", DefaultBatchPosterConfig.Max4844BatchSize, "maximum 4844 blob enabled batch size")
f.Duration(prefix+".max-delay", DefaultBatchPosterConfig.MaxDelay, "maximum batch posting delay")
Expand All @@ -214,7 +214,7 @@ func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {

var DefaultBatchPosterConfig = BatchPosterConfig{
Enable: false,
DisableDasFallbackStoreDataOnChain: false,
DisableDapFallbackStoreDataOnChain: false,
// This default is overridden for L3 chains in applyChainParameters in cmd/nitro/nitro.go
MaxSize: 100000,
Max4844BatchSize: blobs.BlobEncodableData*(params.MaxBlobGasPerBlock/params.BlobTxBlobGasPerBlob) - 2000,
Expand Down Expand Up @@ -277,7 +277,7 @@ type BatchPosterOpts struct {
Config BatchPosterConfigFetcher
DeployInfo *chaininfo.RollupAddresses
TransactOpts *bind.TransactOpts
DAWriter das.DataAvailabilityServiceWriter
DAPWriter daprovider.Writer
ParentChainID *big.Int
}

Expand Down Expand Up @@ -323,7 +323,7 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e
seqInboxAddr: opts.DeployInfo.SequencerInbox,
gasRefunderAddr: opts.Config().gasRefunder,
bridgeAddr: opts.DeployInfo.Bridge,
daWriter: opts.DAWriter,
dapWriter: opts.DAPWriter,
redisLock: redisLock,
}
b.messagesPerBatch, err = arbmath.NewMovingAverage[uint64](20)
Expand Down Expand Up @@ -883,7 +883,7 @@ func (s *batchSegments) CloseAndGetBytes() ([]byte, error) {
}
compressedBytes := s.compressedBuffer.Bytes()
fullMsg := make([]byte, 1, len(compressedBytes)+1)
fullMsg[0] = arbstate.BrotliMessageHeaderByte
fullMsg[0] = daprovider.BrotliMessageHeaderByte
fullMsg = append(fullMsg, compressedBytes...)
return fullMsg, nil
}
Expand Down Expand Up @@ -1063,7 +1063,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
}
var use4844 bool
config := b.config()
if config.Post4844Blobs && b.daWriter == nil && latestHeader.ExcessBlobGas != nil && latestHeader.BlobGasUsed != nil {
if config.Post4844Blobs && b.dapWriter == nil && latestHeader.ExcessBlobGas != nil && latestHeader.BlobGasUsed != nil {
arbOSVersion, err := b.arbOSVersionGetter.ArbOSVersionForMessageNumber(arbutil.MessageIndex(arbmath.SaturatingUSub(uint64(batchPosition.MessageCount), 1)))
if err != nil {
return false, err
Expand Down Expand Up @@ -1246,7 +1246,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
return false, nil
}

if b.daWriter != nil {
if b.dapWriter != nil {
if !b.redisLock.AttemptLock(ctx) {
return false, errAttemptLockFailed
}
Expand All @@ -1258,17 +1258,9 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
if nonce != gotNonce || !bytes.Equal(batchPositionBytes, gotMeta) {
return false, fmt.Errorf("%w: nonce changed from %d to %d while creating batch", storage.ErrStorageRace, nonce, gotNonce)
}

cert, err := b.daWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}) // b.daWriter will append signature if enabled
if errors.Is(err, das.BatchToDasFailed) {
if config.DisableDasFallbackStoreDataOnChain {
return false, errors.New("unable to batch to DAS and fallback storing data on chain is disabled")
}
log.Warn("Falling back to storing data on chain", "err", err)
} else if err != nil {
sequencerMsg, err = b.dapWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}, config.DisableDapFallbackStoreDataOnChain)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems unbalanced that daprovider.Writer is used only to write to the DAS and daprovider.Reader is used to read both DAS and blobs. I'm not sure how you'd structure it to be symmetric, though, since on one hand posting a batch to the DAS is a separate operation to posting the batch itself to L1, and with blobs both get posted together with the L1 tx.

Additionally, for blob batch posting there is some unique logic around deciding whether or not to post as a 4844 tx (#2158), which is different from the logic around whether to post a DAS batch (it falls back on error only), so that also would make it hard to unify.

This is just an observation, we should seek more feedback about this from others on the team.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, I could not find a way to wrap blob posting to follow a common writer interface that other DAs implement.

if err != nil {
return false, err
} else {
sequencerMsg = das.Serialize(cert)
}
}

Expand Down
2 changes: 1 addition & 1 deletion arbnode/delayed_seq_reorg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) {
defer cancel()

exec, streamer, db, _ := NewTransactionStreamerForTest(t, common.Address{})
tracker, err := NewInboxTracker(db, streamer, nil, nil)
tracker, err := NewInboxTracker(db, streamer, nil)
Require(t, err)

err = streamer.Start(ctx)
Expand Down
22 changes: 5 additions & 17 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/arbstate/daprovider"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcaster"
m "github.com/offchainlabs/nitro/broadcaster/message"
Expand All @@ -37,23 +38,17 @@ type InboxTracker struct {
txStreamer *TransactionStreamer
mutex sync.Mutex
validator *staker.BlockValidator
das arbstate.DataAvailabilityReader
blobReader arbstate.BlobReader
dapReaders []daprovider.Reader

batchMetaMutex sync.Mutex
batchMeta *containers.LruCache[uint64, BatchMetadata]
}

func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arbstate.DataAvailabilityReader, blobReader arbstate.BlobReader) (*InboxTracker, error) {
// We support a nil txStreamer for the pruning code
if txStreamer != nil && txStreamer.chainConfig.ArbitrumChainParams.DataAvailabilityCommittee && das == nil {
return nil, errors.New("data availability service required but unconfigured")
}
func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, dapReaders []daprovider.Reader) (*InboxTracker, error) {
tracker := &InboxTracker{
db: db,
txStreamer: txStreamer,
das: das,
blobReader: blobReader,
dapReaders: dapReaders,
batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000),
}
return tracker, nil
Expand Down Expand Up @@ -659,14 +654,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L
ctx: ctx,
client: client,
}
var daProviders []arbstate.DataAvailabilityProvider
if t.das != nil {
daProviders = append(daProviders, arbstate.NewDAProviderDAS(t.das))
}
if t.blobReader != nil {
daProviders = append(daProviders, arbstate.NewDAProviderBlobReader(t.blobReader))
}
multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, daProviders, arbstate.KeysetValidate)
multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, t.dapReaders, daprovider.KeysetValidate)
batchMessageCounts := make(map[uint64]arbutil.MessageIndex)
currentpos := prevbatchmeta.MessageCount + 1
for {
Expand Down
30 changes: 22 additions & 8 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/offchainlabs/nitro/arbnode/resourcemanager"
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/arbstate/daprovider"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcastclient"
"github.com/offchainlabs/nitro/broadcastclients"
Expand Down Expand Up @@ -254,7 +254,7 @@ type Node struct {
L1Reader *headerreader.HeaderReader
TxStreamer *TransactionStreamer
DeployInfo *chaininfo.RollupAddresses
BlobReader arbstate.BlobReader
BlobReader daprovider.BlobReader
InboxReader *InboxReader
InboxTracker *InboxTracker
DelayedSequencer *DelayedSequencer
Expand Down Expand Up @@ -372,7 +372,7 @@ func createNodeImpl(
dataSigner signature.DataSignerFunc,
fatalErrChan chan error,
parentChainID *big.Int,
blobReader arbstate.BlobReader,
blobReader daprovider.BlobReader,
) (*Node, error) {
config := configFetcher.Get()

Expand Down Expand Up @@ -529,7 +529,18 @@ func createNodeImpl(
return nil, errors.New("a data availability service is required for this chain, but it was not configured")
}

inboxTracker, err := NewInboxTracker(arbDb, txStreamer, daReader, blobReader)
// We support a nil txStreamer for the pruning code
if txStreamer != nil && txStreamer.chainConfig.ArbitrumChainParams.DataAvailabilityCommittee && daReader == nil {
return nil, errors.New("data availability service required but unconfigured")
}
var dapReaders []daprovider.Reader
if daReader != nil {
dapReaders = append(dapReaders, daprovider.NewReaderForDAS(daReader))
}
if blobReader != nil {
dapReaders = append(dapReaders, daprovider.NewReaderForBlobReader(blobReader))
}
inboxTracker, err := NewInboxTracker(arbDb, txStreamer, dapReaders)
if err != nil {
return nil, err
}
Expand All @@ -547,8 +558,7 @@ func createNodeImpl(
txStreamer,
exec,
rawdb.NewTable(arbDb, storage.BlockValidatorPrefix),
daReader,
blobReader,
dapReaders,
func() *staker.BlockValidatorConfig { return &configFetcher.Get().BlockValidator },
stack,
)
Expand Down Expand Up @@ -655,6 +665,10 @@ func createNodeImpl(
if txOptsBatchPoster == nil && config.BatchPoster.DataPoster.ExternalSigner.URL == "" {
return nil, errors.New("batchposter, but no TxOpts")
}
var dapWriter daprovider.Writer
if daWriter != nil {
dapWriter = daprovider.NewWriterForDAS(daWriter)
}
batchPoster, err = NewBatchPoster(ctx, &BatchPosterOpts{
DataPosterDB: rawdb.NewTable(arbDb, storage.BatchPosterPrefix),
L1Reader: l1Reader,
Expand All @@ -665,7 +679,7 @@ func createNodeImpl(
Config: func() *BatchPosterConfig { return &configFetcher.Get().BatchPoster },
DeployInfo: deployInfo,
TransactOpts: txOptsBatchPoster,
DAWriter: daWriter,
DAPWriter: dapWriter,
ParentChainID: parentChainID,
})
if err != nil {
Expand Down Expand Up @@ -725,7 +739,7 @@ func CreateNode(
dataSigner signature.DataSignerFunc,
fatalErrChan chan error,
parentChainID *big.Int,
blobReader arbstate.BlobReader,
blobReader daprovider.BlobReader,
) (*Node, error) {
currentNode, err := createNodeImpl(ctx, stack, exec, arbDb, configFetcher, l2Config, l1client, deployInfo, txOptsValidator, txOptsBatchPoster, dataSigner, fatalErrChan, parentChainID, blobReader)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions arbnode/sequencer_inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/arbstate/daprovider"
"github.com/offchainlabs/nitro/arbutil"

"github.com/offchainlabs/nitro/solgen/go/bridgegen"
Expand Down Expand Up @@ -159,7 +159,7 @@ func (m *SequencerInboxBatch) getSequencerData(ctx context.Context, client arbut
if len(tx.BlobHashes()) == 0 {
return nil, fmt.Errorf("blob batch transaction %v has no blobs", tx.Hash())
}
data := []byte{arbstate.BlobHashesHeaderFlag}
data := []byte{daprovider.BlobHashesHeaderFlag}
for _, h := range tx.BlobHashes() {
data = append(data, h[:]...)
}
Expand Down
104 changes: 104 additions & 0 deletions arbstate/daprovider/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2021-2022, Offchain Labs, Inc.
// For license information, see https://github.com/nitro/blob/master/LICENSE

package daprovider

import (
"context"
"fmt"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/util/blobs"
)

// Reader is the unified read-side interface for Data Availability providers.
// Implementations recognize their own batch header byte and recover the
// batch payload that the sequencer message points to.
type Reader interface {
	// IsValidHeaderByte returns true if the given headerByte has bits corresponding to the DA provider
	IsValidHeaderByte(headerByte byte) bool

	// RecoverPayloadFromBatch fetches the underlying payload from the DA provider given the batch header information
	RecoverPayloadFromBatch(
		ctx context.Context,
		batchNum uint64,
		batchBlockHash common.Hash,
		sequencerMsg []byte,
		preimageRecorder PreimageRecorder, // optional; when non-nil, recovered preimages are recorded through it
		validateSeqMsg bool,
	) ([]byte, error)
}

// NewReaderForDAS wraps a DASReader in the unified Reader interface.
// It is generally meant to be used only by nitro itself; external DA
// providers should implement the Reader interface methods independently.
func NewReaderForDAS(dasReader DASReader) *readerForDAS {
	reader := &readerForDAS{dasReader: dasReader}
	return reader
}

// readerForDAS adapts a DASReader to the daprovider.Reader interface.
type readerForDAS struct {
	dasReader DASReader
}

// IsValidHeaderByte reports whether headerByte identifies a DAS batch.
func (d *readerForDAS) IsValidHeaderByte(headerByte byte) bool {
	isDAS := IsDASMessageHeaderByte(headerByte)
	return isDAS
}

// RecoverPayloadFromBatch recovers the batch payload from the DAS by
// delegating to RecoverPayloadFromDasBatch. Note that batchBlockHash is
// not needed for DAS recovery and is accepted only to satisfy the Reader
// interface.
func (d *readerForDAS) RecoverPayloadFromBatch(
	ctx context.Context,
	batchNum uint64,
	batchBlockHash common.Hash,
	sequencerMsg []byte,
	preimageRecorder PreimageRecorder,
	validateSeqMsg bool,
) ([]byte, error) {
	payload, err := RecoverPayloadFromDasBatch(ctx, batchNum, sequencerMsg, d.dasReader, preimageRecorder, validateSeqMsg)
	return payload, err
}

// NewReaderForBlobReader wraps a BlobReader in the unified Reader interface.
// It is generally meant to be used only by nitro itself; external DA
// providers should implement the Reader interface methods independently.
func NewReaderForBlobReader(blobReader BlobReader) *readerForBlobReader {
	reader := &readerForBlobReader{blobReader: blobReader}
	return reader
}

// readerForBlobReader adapts a BlobReader (EIP-4844 blob fetcher) to the
// daprovider.Reader interface.
type readerForBlobReader struct {
	blobReader BlobReader
}

// IsValidHeaderByte reports whether headerByte identifies a blob-hashes batch.
func (b *readerForBlobReader) IsValidHeaderByte(headerByte byte) bool {
	isBlobBatch := IsBlobHashesHeaderByte(headerByte)
	return isBlobBatch
}

// RecoverPayloadFromBatch fetches the KZG blobs referenced by the versioned
// hashes embedded in sequencerMsg, optionally records them as preimages, and
// decodes them into the batch payload.
//
// The versioned hashes are packed back-to-back starting at byte 41 of the
// sequencer message (byte 0 is the header byte; bytes 1-40 are presumably
// batch metadata — TODO confirm against the serializer).
func (b *readerForBlobReader) RecoverPayloadFromBatch(
	ctx context.Context,
	batchNum uint64,
	batchBlockHash common.Hash,
	sequencerMsg []byte,
	preimageRecorder PreimageRecorder,
	validateSeqMsg bool,
) ([]byte, error) {
	// Guard against a truncated message so the slice below cannot panic.
	if len(sequencerMsg) < 41 {
		return nil, fmt.Errorf("sequencer message too small to contain blob hashes: got %d bytes, need at least 41", len(sequencerMsg))
	}
	blobHashes := sequencerMsg[41:]
	if len(blobHashes)%len(common.Hash{}) != 0 {
		return nil, ErrInvalidBlobDataFormat
	}
	versionedHashes := make([]common.Hash, len(blobHashes)/len(common.Hash{}))
	for i := 0; i*32 < len(blobHashes); i += 1 {
		copy(versionedHashes[i][:], blobHashes[i*32:(i+1)*32])
	}
	kzgBlobs, err := b.blobReader.GetBlobs(ctx, batchBlockHash, versionedHashes)
	if err != nil {
		return nil, fmt.Errorf("failed to get blobs: %w", err)
	}
	if preimageRecorder != nil {
		for i, blob := range kzgBlobs {
			// Prevent aliasing `blob` when slicing it, as for range loops overwrite the same variable
			// Won't be necessary after Go 1.22 with https://go.dev/blog/loopvar-preview
			b := blob
			preimageRecorder(versionedHashes[i], b[:], arbutil.EthVersionedHashPreimageType)
		}
	}
	payload, err := blobs.DecodeBlobs(kzgBlobs)
	if err != nil {
		// NOTE(review): decode failures are swallowed — the caller receives
		// (nil, nil), effectively treating the batch as empty rather than
		// failing; confirm this fallback is intentional.
		log.Warn("Failed to decode blobs", "batchBlockHash", batchBlockHash, "versionedHashes", versionedHashes, "err", err)
		return nil, nil
	}
	return payload, nil
}
Loading
Loading