159 changes: 152 additions & 7 deletions core/txpool/blobpool/blobpool.go
@@ -101,6 +101,16 @@ const (
// the pool will still accept and convert legacy blob transactions. After this
// window, all legacy blob transactions will be rejected.
conversionTimeWindow = time.Hour * 2

// gappedLifetime is the approximate duration for which nonce-gapped transactions
// are kept before being dropped. Since the gapped set is only a reorder buffer
// and the original transactions are expected to have been inserted into the
// mempool in nonce order, the duration is kept short to avoid DoS vectors.
gappedLifetime = 1 * time.Minute

// maxGapped is the maximum number of gapped transactions kept overall.
// This is a safety limit to avoid DoS vectors.
maxGapped = 128
)

// blobTxMeta is the minimal subset of types.BlobTx necessary to validate and
@@ -337,6 +347,9 @@ type BlobPool struct {
stored uint64 // Useful data size of all transactions on disk
limbo *limbo // Persistent data store for the non-finalized blobs

gapped map[common.Address][]*gappedTx // Transactions that are currently gapped (nonce too high)
gappedSource map[common.Hash]common.Address // Source of gapped transactions to allow rechecking on inclusion

signer types.Signer // Transaction signer to use for sender recovery
chain BlockChain // Chain object to access the state through
cQueue *conversionQueue // The queue for performing legacy sidecar conversion (TODO: remove after Osaka)
@@ -356,6 +369,11 @@ type BlobPool struct {
lock sync.RWMutex // Mutex protecting the pool during reorg handling
}

type gappedTx struct {
tx *types.Transaction
timestamp time.Time
}

// New creates a new blob transaction pool to gather, sort and filter inbound
// blob transactions from the network.
func New(config Config, chain BlockChain, hasPendingAuth func(common.Address) bool) *BlobPool {
@@ -372,6 +390,8 @@ func New(config Config, chain BlockChain, hasPendingAuth func(common.Address) bo
lookup: newLookup(),
index: make(map[common.Address][]*blobTxMeta),
spent: make(map[common.Address]*uint256.Int),
gapped: make(map[common.Address][]*gappedTx),
gappedSource: make(map[common.Hash]common.Address),
}
}

@@ -841,6 +861,9 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
resettimeHist.Update(time.Since(start).Nanoseconds())
}(time.Now())

// Handle reorder buffer timeouts, evicting old gapped transactions
p.evictGapped()

statedb, err := p.chain.StateAt(newHead.Root)
if err != nil {
log.Error("Failed to reset blobpool state", "err", err)
@@ -1369,7 +1392,9 @@ func (p *BlobPool) validateTx(tx *types.Transaction) error {
State: p.state,

FirstNonceGap: func(addr common.Address) uint64 {
// Nonce gaps are not permitted in the blob pool, the first gap will
// Nonce gaps are permitted in the blob pool, but only as part of the
// in-memory 'gapped' buffer. We expose the gap here to validateTx,
// then handle the error by adding to the buffer. The first gap will
// be the next nonce shifted by however many transactions we already
// have pooled.
return p.state.GetNonce(addr) + uint64(len(p.index[addr]))
@@ -1448,7 +1473,9 @@ func (p *BlobPool) Has(hash common.Hash) bool {
p.lock.RLock()
defer p.lock.RUnlock()

return p.lookup.exists(hash)
poolHas := p.lookup.exists(hash)
_, gapped := p.gappedSource[hash]
return poolHas || gapped
}

func (p *BlobPool) getRLP(hash common.Hash) []byte {
@@ -1696,10 +1723,6 @@ func (p *BlobPool) Add(txs []*types.Transaction, sync bool) []error {
adds = append(adds, tx.WithoutBlobTxSidecar())
}
}
if len(adds) > 0 {
p.discoverFeed.Send(core.NewTxsEvent{Txs: adds})
p.insertFeed.Send(core.NewTxsEvent{Txs: adds})
}
return errs
}

@@ -1718,6 +1741,13 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
addtimeHist.Update(time.Since(start).Nanoseconds())
}(time.Now())

return p.addLocked(tx, true)
}

// addLocked inserts a new blob transaction into the pool if it passes validation (both
// consensus validity and pool restrictions). It must be called with the pool lock held.
// Only for internal use.
func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error) {
// Ensure the transaction is valid from all perspectives
if err := p.validateTx(tx); err != nil {
log.Trace("Transaction validation failed", "hash", tx.Hash(), "err", err)
@@ -1730,6 +1760,19 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
addStaleMeter.Mark(1)
case errors.Is(err, core.ErrNonceTooHigh):
@healthykim (Contributor) commented on Sep 25, 2025:

I have been thinking about what form the gapped blobpool should take in the evolution of the blobpool. In my view, we have two possible options. The first is to store a transaction in the gapped set only when “nonce too high” is the sole rejection reason. The second is to store a transaction in the gapped set when “nonce too high” is one of the rejection reasons (so there can be other reasons). This PR seems to be implementing the second option, and given the current state of the blobpool, I think this is reasonable.

If cells are delivered separately from transactions in the future, the first option can be beneficial in some cases in terms of bandwidth. If a transaction in the buffer cannot be included even after the gap is resolved, then by the time the client discovers it needs to be discarded (when all cells are collected and revalidation occurs through the add routine below), it will already have consumed bandwidth to fetch its cells. The first design would help reduce this overhead.

However, I am not certain the first option is necessary when we have this gapped buffer, since bandwidth can easily be wasted if someone submits gapped transactions that pass other validations and never fill the gap. I am also not sure how often this can happen or why a submitter would do such a thing.

Contributor:

https://github.com/ethereum/go-ethereum/compare/master...healthykim:go-ethereum:bs/cell-blobpool/sparse?expand=1

In my prototype, the buffer currently stores transactions that pass all validation checks but are simply missing their cells. I still need to do some debugging, but the overall structure will remain like this.
It also handles two replacement cases: one for the replacement of a buffered tx and the other for the replacement of a pooled tx.

Contributor Author:

I've selected option 2 (there can be other reasons) here, since that's the option that is a pure reorder buffer, acting before any real blobpool logic. So, at least in theory, that's the least invasive version.

However, I can imagine moving to a more complex evaluation before storing the gapped transaction, similar to what the queue in the legacypool does. I think we should check that as well before deciding on future evolution.

addGappedMeter.Mark(1)
// Store the tx in memory, and revalidate later
from, _ := types.Sender(p.signer, tx)
allowance := p.gappedAllowance(from)
if allowance >= 1 && len(p.gapped) < maxGapped {
// If maxGapped is reached, it is better to give the existing gapped
// transactions time by keeping the old ones and dropping this one
p.gapped[from] = append(p.gapped[from], &gappedTx{tx: tx, timestamp: time.Now()})
p.gappedSource[tx.Hash()] = from
log.Trace("blobpool:add added to Gapped blob queue", "allowance", allowance, "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "qlen", len(p.gapped[from]))
return nil
} else {
log.Trace("blobpool:add no Gapped blob queue allowance", "allowance", allowance, "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "qlen", len(p.gapped[from]))
}
case errors.Is(err, core.ErrInsufficientFunds):
addOverdraftedMeter.Mark(1)
case errors.Is(err, txpool.ErrAccountLimitExceeded):
@@ -1867,6 +1910,58 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
p.updateStorageMetrics()

addValidMeter.Mark(1)

// Notify all listeners of the new arrival
p.discoverFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx.WithoutBlobTxSidecar()}})
p.insertFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx.WithoutBlobTxSidecar()}})

// Check the gapped queue for this account and try to promote
if gtxs, ok := p.gapped[from]; checkGapped && ok && len(gtxs) > 0 {
// We have to add in nonce order, but we want a stable sort to cater for situations
// where transactions are replaced, keeping the original receive order for the same nonce
sort.SliceStable(gtxs, func(i, j int) bool {
return gtxs[i].tx.Nonce() < gtxs[j].tx.Nonce()
})
for len(gtxs) > 0 {
stateNonce := p.state.GetNonce(from)
firstgap := stateNonce + uint64(len(p.index[from]))

if gtxs[0].tx.Nonce() > firstgap {
// Anything beyond the first gap is not addable yet
break
}

// Drop any buffered transactions that became stale in the meantime (included in the chain or replaced).
// If we arrive at a transaction in the pending range (between the state nonce and the first gap), we
// try to add it now while removing it from the buffer.
tx := gtxs[0].tx
gtxs[0] = nil
gtxs = gtxs[1:]
delete(p.gappedSource, tx.Hash())

if tx.Nonce() < stateNonce {
// Stale, drop it. Eventually we could add to limbo here if hash matches.
log.Trace("Gapped blob transaction became stale", "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "state", stateNonce, "qlen", len(p.gapped[from]))
continue
}

if tx.Nonce() <= firstgap {
// If we hit the pending range, including the first gap, add it and continue to try to add more.
// We do not recurse here, but continue to loop instead.
// We are under lock, so we can add the transaction directly.
if err := p.addLocked(tx, false); err == nil {
log.Trace("Gapped blob transaction added to pool", "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "qlen", len(p.gapped[from]))
} else {
log.Trace("Gapped blob transaction not accepted", "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "err", err)
}
}
}
if len(gtxs) == 0 {
delete(p.gapped, from)
} else {
p.gapped[from] = gtxs
}
}
return nil
}

@@ -2098,6 +2193,50 @@ func (p *BlobPool) Nonce(addr common.Address) uint64 {
return p.state.GetNonce(addr)
}

// gappedAllowance returns the number of gapped transactions still
// allowed for the given account. Allowance is based on a slow-start
// logic, allowing more gaps (resource usage) to accounts with a
// higher nonce. Can also return negative values.
func (p *BlobPool) gappedAllowance(addr common.Address) int {
// Gaps happen, but we don't want to allow too many.
// Use log10(nonce+1) as the allowance, with a minimum of 0.
nonce := p.state.GetNonce(addr)
allowance := int(math.Log10(float64(nonce + 1)))
// Cap the allowance to the remaining pool space
return min(allowance, maxTxsPerAccount-len(p.index[addr])) - len(p.gapped[addr])
}

// evictGapped removes the old transactions from the gapped reorder buffer.
// Concurrency: The caller must hold the pool lock before calling this function.
func (p *BlobPool) evictGapped() {
cutoff := time.Now().Add(-gappedLifetime)
for from, txs := range p.gapped {
nonce := p.state.GetNonce(from)
// Reuse the original slice to avoid extra allocations.
// This is safe because we only keep references to the original gappedTx objects,
// and we overwrite the slice for this account after filtering.
keep := txs[:0]
for i, gtx := range txs {
if gtx.timestamp.Before(cutoff) || gtx.tx.Nonce() < nonce {
// Evict old or stale transactions
// Should we add stale to limbo here if it would belong?
delete(p.gappedSource, gtx.tx.Hash())
txs[i] = nil // Explicitly nil out evicted element
} else {
keep = append(keep, gtx)
}
}
if len(keep) < len(txs) {
log.Trace("Evicting old gapped blob transactions", "count", len(txs)-len(keep), "from", from)
}
if len(keep) == 0 {
delete(p.gapped, from)
} else {
p.gapped[from] = keep
}
}
}

// Stats retrieves the current pool stats, namely the number of pending and the
// number of queued (non-executable) transactions.
func (p *BlobPool) Stats() (int, int) {
@@ -2132,9 +2271,15 @@ func (p *BlobPool) ContentFrom(addr common.Address) ([]*types.Transaction, []*ty
// Status returns the known status (unknown/pending/queued) of a transaction
// identified by their hashes.
func (p *BlobPool) Status(hash common.Hash) txpool.TxStatus {
if p.Has(hash) {
p.lock.RLock()
defer p.lock.RUnlock()

if p.lookup.exists(hash) {
return txpool.TxStatusPending
}
if _, gapped := p.gappedSource[hash]; gapped {
return txpool.TxStatusQueued
}
return txpool.TxStatusUnknown
}

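For context, a minimal standalone sketch of the slow-start allowance that gappedAllowance computes above. It is not part of the diff: it assumes the pool's per-account cap (maxTxsPerAccount) is 16, and the helper name allowance and the sample nonces are illustrative only.

package main

import (
	"fmt"
	"math"
)

const maxTxsPerAccount = 16 // assumed per-account pool cap

// allowance mirrors the log10(nonce+1) slow start, capped by the remaining
// per-account pool space and reduced by the gapped txs already buffered.
func allowance(nonce uint64, pooled, gapped int) int {
	a := int(math.Log10(float64(nonce + 1)))
	return min(a, maxTxsPerAccount-pooled) - gapped
}

func main() {
	for _, n := range []uint64{0, 10, 500, 20000} {
		fmt.Printf("nonce %5d -> allowance %d\n", n, allowance(n, 0, 0))
	}
	// An account that already buffered more gapped txs than its allowance
	// yields a negative value, so no further gapped txs are accepted.
	fmt.Println("nonce 10, 2 already gapped ->", allowance(10, 0, 2))
}

Under these assumptions, a fresh account (nonce 0) gets no gapped allowance, while accounts at nonce 10, 500 and 20000 get 1, 2 and 4 slots respectively, before subtracting anything already buffered.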
25 changes: 19 additions & 6 deletions core/txpool/blobpool/blobpool_test.go
@@ -1375,6 +1375,7 @@ func TestAdd(t *testing.T) {
"bob": {balance: 21100 + blobSize, nonce: 1},
"claire": {balance: 21100 + blobSize},
"dave": {balance: 21100 + blobSize, nonce: 1},
"eve": {balance: 21100 + blobSize, nonce: 10}, // High nonce to test gapped acceptance
},
adds: []addtx{
{ // New account, no previous txs: accept nonce 0
@@ -1402,6 +1403,11 @@
tx: makeUnsignedTx(2, 1, 1, 1),
err: core.ErrNonceTooHigh,
},
{ // Old account, 10 txs in chain: 0 pending: accept nonce 11 as gapped
from: "eve",
tx: makeUnsignedTx(11, 1, 1, 1),
err: nil,
},
},
},
// Transactions from already pooled accounts should only be accepted if
@@ -1762,13 +1768,20 @@ func TestAdd(t *testing.T) {
t.Errorf("test %d, tx %d: adding transaction error mismatch: have %v, want %v", i, j, errs[0], add.err)
}
if add.err == nil {
size, exist := pool.lookup.sizeOfTx(signed.Hash())
if !exist {
t.Errorf("test %d, tx %d: failed to lookup transaction's size", i, j)
// First check that the tx is known to the pool (pooled or gapped)
if !pool.Has(signed.Hash()) {
t.Errorf("test %d, tx %d: added transaction not found in pool", i, j)
}
if size != signed.Size() {
t.Errorf("test %d, tx %d: transaction's size mismatches: have %v, want %v",
i, j, size, signed.Size())
// If it is pending, check that the size matches
if pool.Status(signed.Hash()) == txpool.TxStatusPending {
size, exist := pool.lookup.sizeOfTx(signed.Hash())
if !exist {
t.Errorf("test %d, tx %d: failed to lookup transaction's size", i, j)
}
if size != signed.Size() {
t.Errorf("test %d, tx %d: transaction's size mismatches: have %v, want %v",
i, j, size, signed.Size())
}
}
}
verifyPoolInternals(t, pool)