Skip to content

Commit

Permalink
fix: Send drop events from blobpool.
Browse files Browse the repository at this point in the history
  • Loading branch information
tyler-smith committed Jun 10, 2024
1 parent b39a270 commit 6d78f52
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 29 deletions.
66 changes: 66 additions & 0 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,11 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserve txpool.Addres
p.Close()
return err
}

p.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{blobIDToTransaction(id)},
Reason: txpool.DropUnexecutable,
})
}
}
// Sort the indexed transactions by nonce and delete anything gapped, create
Expand Down Expand Up @@ -566,6 +571,11 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
}

p.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{blobIDToTransaction(id)},
Reason: txpool.DropUnexecutable,
})
}
return
}
Expand Down Expand Up @@ -597,6 +607,11 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
}

p.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{blobIDToTransaction(id)},
Reason: txpool.DropLowNonce,
})
}
p.index[addr] = txs
}
Expand Down Expand Up @@ -646,6 +661,11 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
txs = append(txs[:i], txs[i+1:]...)
p.index[addr] = txs

p.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{blobIDToTransaction(id)},
Reason: txpool.DropReplaced,
})

i--
continue
}
Expand All @@ -671,6 +691,11 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
}

p.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{blobIDToTransaction(id)},
Reason: txpool.DropUnexecutable,
})
}
p.index[addr] = txs
break
Expand Down Expand Up @@ -717,6 +742,11 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
}

p.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{blobIDToTransaction(id)},
Reason: txpool.DropAccountCap,
})
}
}
// Sanity check that no account can have more queued transactions than the
Expand Down Expand Up @@ -749,6 +779,11 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
}

p.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{blobIDToTransaction(id)},
Reason: txpool.DropAccountCap,
})
}
}
// Included cheap transactions might have left the remaining ones better from
Expand Down Expand Up @@ -1075,6 +1110,11 @@ func (p *BlobPool) SetGasTip(tip *big.Int) {
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete dropped transaction", "id", id, "err", err)
}

p.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{blobIDToTransaction(id)},
Reason: txpool.DropGasPriceUpdated,
})
}
break
}
Expand Down Expand Up @@ -1343,6 +1383,12 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
delete(p.lookup, prev.hash)
p.lookup[meta.hash] = meta.id
p.stored += uint64(meta.size) - uint64(prev.size)

p.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{blobTxMetaToTransaction(prev)},
Reason: txpool.DropReplaced,
Replacement: tx,
})
} else {
// Transaction extends previously scheduled ones
p.index[from] = append(p.index[from], meta)
Expand Down Expand Up @@ -1464,6 +1510,11 @@ func (p *BlobPool) drop() {
if err := p.store.Delete(drop.id); err != nil {
log.Error("Failed to drop evicted transaction", "id", drop.id, "err", err)
}

p.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{blobTxMetaToTransaction(&blobTxMeta{id: drop.id})},
Reason: txpool.DropTruncating,
})
}

// Pending retrieves all currently processable transactions, grouped by origin
Expand Down Expand Up @@ -1712,3 +1763,18 @@ func (pool *BlobPool) SubscribeDropTxsEvent(ch chan<- core.DropTxsEvent) event.S
func (pool *BlobPool) SubscribeRejectedTxEvent(ch chan<- core.RejectedTxEvent) event.Subscription {
return pool.rejectTxFeed.Subscribe(ch)
}

func blobIDToTransaction(id uint64) *types.Transaction {
return blobTxMetaToTransaction(&blobTxMeta{id: id})
}

func blobTxMetaToTransaction(meta *blobTxMeta) *types.Transaction {
return types.NewTx(&types.BlobTx{
Gas: meta.execGas,
BlobFeeCap: meta.blobFeeCap,
Nonce: meta.nonce,
GasFeeCap: meta.execFeeCap,
GasTipCap: meta.execTipCap,
BlobHashes: meta.blobHashes,
})
}
14 changes: 14 additions & 0 deletions core/txpool/drop_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package txpool

const (
DropUnderpriced = "underpriced-txs"
DropLowNonce = "low-nonce-txs"
DropUnpayable = "unpayable-txs"

DropAccountCap = "account-cap-txs" // Accounts exceeding txpool.accountslots transactions
DropReplaced = "replaced-txs"
DropUnexecutable = "unexecutable-txs"
DropTruncating = "truncating-txs"
DropOld = "old-txs"
DropGasPriceUpdated = "updated-gas-price"
)
44 changes: 15 additions & 29 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func (pool *LegacyPool) loop() {
}
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: list,
Reason: dropOld,
Reason: txpool.DropOld,
})
queuedEvictionMeter.Mark(int64(len(list)))
}
Expand Down Expand Up @@ -468,7 +468,7 @@ func (pool *LegacyPool) SetGasTip(tip *big.Int) {
pool.priced.Removed(len(drop))
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: drop,
Reason: dropGasPriceUpdated,
Reason: txpool.DropGasPriceUpdated,
})
}
log.Info("Legacy pool tip threshold updated", "tip", newTip)
Expand Down Expand Up @@ -794,7 +794,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
pool.changesSinceReorg += dropped
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: drop,
Reason: dropUnderpriced,
Reason: txpool.DropUnderpriced,
})
}
}
Expand All @@ -814,7 +814,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
pendingReplaceMeter.Mark(1)
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{old},
Reason: dropReplaced,
Reason: txpool.DropReplaced,
Replacement: tx,
})
}
Expand Down Expand Up @@ -894,7 +894,7 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local
queuedReplaceMeter.Mark(1)
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{old},
Reason: dropReplaced,
Reason: txpool.DropReplaced,
})
} else {
// Nothing was replaced, bump the queued counter
Expand Down Expand Up @@ -954,7 +954,7 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ
pendingReplaceMeter.Mark(1)
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{old},
Reason: dropReplaced,
Reason: txpool.DropReplaced,
})
} else {
// Nothing was replaced, bump the pending counter
Expand Down Expand Up @@ -1185,7 +1185,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
pendingGauge.Dec(int64(1 + len(invalids)))
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: invalids,
Reason: dropUnexecutable,
Reason: txpool.DropUnexecutable,
})
return 1 + len(invalids)
}
Expand Down Expand Up @@ -1509,7 +1509,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
log.Trace("Removed old queued transactions", "count", len(forwards))
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: forwards,
Reason: dropLowNonce,
Reason: txpool.DropLowNonce,
})
// Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(pool.currentState.GetBalance(addr), gasLimit)
Expand All @@ -1521,7 +1521,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
queuedNofundsMeter.Mark(int64(len(drops)))
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: drops,
Reason: dropUnpayable,
Reason: txpool.DropUnpayable,
})

// Gather all executable transactions and promote them
Expand All @@ -1547,7 +1547,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
queuedRateLimitMeter.Mark(int64(len(caps)))
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: caps,
Reason: dropAccountCap,
Reason: txpool.DropAccountCap,
})
}
// Mark all the items dropped as removed
Expand Down Expand Up @@ -1618,7 +1618,7 @@ func (pool *LegacyPool) truncatePending() {
}
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: caps,
Reason: dropAccountCap,
Reason: txpool.DropAccountCap,
})
pool.priced.Removed(len(caps))
pendingGauge.Dec(int64(len(caps)))
Expand Down Expand Up @@ -1693,7 +1693,7 @@ func (pool *LegacyPool) truncateQueue() {
}
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: txs,
Reason: dropTruncating,
Reason: txpool.DropTruncating,
})
drop -= size
queuedRateLimitMeter.Mark(int64(size))
Expand All @@ -1707,7 +1707,7 @@ func (pool *LegacyPool) truncateQueue() {
queuedRateLimitMeter.Mark(1)
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: []*types.Transaction{txs[i]},
Reason: dropTruncating,
Reason: txpool.DropTruncating,
})
}
}
Expand Down Expand Up @@ -1735,7 +1735,7 @@ func (pool *LegacyPool) demoteUnexecutables() {
}
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: olds,
Reason: dropLowNonce,
Reason: txpool.DropLowNonce,
})
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), gasLimit)
Expand All @@ -1746,7 +1746,7 @@ func (pool *LegacyPool) demoteUnexecutables() {
}
pool.dropTxFeed.Send(core.DropTxsEvent{
Txs: drops,
Reason: dropUnpayable,
Reason: txpool.DropUnpayable,
})
pool.priced.Removed(len(olds) + len(drops))
pendingNofundsMeter.Mark(int64(len(drops)))
Expand Down Expand Up @@ -2043,17 +2043,3 @@ func (t *lookup) RemotesBelowTip(threshold *big.Int) types.Transactions {
func numSlots(tx *types.Transaction) int {
return int((tx.Size() + txSlotSize - 1) / txSlotSize)
}


const (
dropUnderpriced = "underpriced-txs"
dropLowNonce = "low-nonce-txs"
dropUnpayable = "unpayable-txs"

dropAccountCap = "account-cap-txs" // Accounts exceeding txpool.accountslots transactions
dropReplaced = "replaced-txs"
dropUnexecutable = "unexecutable-txs"
dropTruncating = "truncating-txs"
dropOld = "old-txs"
dropGasPriceUpdated = "updated-gas-price"
)

0 comments on commit 6d78f52

Please sign in to comment.