Skip to content

Commit

Permalink
Add Flashbots bundles to miner
Browse files Browse the repository at this point in the history
  • Loading branch information
jparyani committed Feb 19, 2021
1 parent 8104d5d commit c2b5b40
Showing 1 changed file with 214 additions and 13 deletions.
227 changes: 214 additions & 13 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type environment struct {
uncles mapset.Set // uncle set
tcount int // tx count in cycle
gasPool *core.GasPool // available gas used to pack transactions
profit *big.Int

header *types.Header
txs []*types.Transaction
Expand All @@ -99,6 +100,8 @@ type task struct {
state *state.StateDB
block *types.Block
createdAt time.Time

profit *big.Int
}

const (
Expand Down Expand Up @@ -533,6 +536,9 @@ func (w *worker) taskLoop() {
var (
stopCh chan struct{}
prev common.Hash

prevNumber *big.Int
prevProfit *big.Int
)

// interrupt aborts the in-flight sealing task.
Expand All @@ -553,10 +559,18 @@ func (w *worker) taskLoop() {
if sealHash == prev {
continue
}

// reject new tasks which don't profit
if prevNumber != nil && prevProfit != nil &&
task.block.Number().Cmp(prevNumber) == 0 && task.profit.Cmp(prevProfit) < 0 {
continue
}
prevNumber, prevProfit = task.block.Number(), task.profit

// Interrupt previous sealing operation
interrupt()
stopCh, prev = make(chan struct{}), sealHash

log.Info("Proposed miner block", "blockNumber", prevNumber, "profit", prevProfit, "sealhash", sealHash)
if w.skipSealHook != nil && w.skipSealHook(task) {
continue
}
Expand Down Expand Up @@ -640,11 +654,10 @@ func (w *worker) resultLoop() {
}
}

// makeCurrent creates a new environment for the current cycle.
func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
func (w *worker) generateEnv(parent *types.Block, header *types.Header) (*environment, error) {
state, err := w.chain.StateAt(parent.Root())
if err != nil {
return err
return nil, err
}
env := &environment{
signer: types.NewEIP155Signer(w.chainConfig.ChainID),
Expand All @@ -653,6 +666,7 @@ func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
family: mapset.NewSet(),
uncles: mapset.NewSet(),
header: header,
profit: new(big.Int),
}

// when 08 is processed ancestors contain 07 (quick block)
Expand All @@ -666,6 +680,16 @@ func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {

// Keep track of transactions which return errors so they can be removed
env.tcount = 0
env.gasPool = new(core.GasPool).AddGas(header.GasLimit)
return env, nil
}

// makeCurrent creates a new environment for the current cycle.
func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
env, err := w.generateEnv(parent, header)
if err != nil {
return err
}
w.current = env
return nil
}
Expand Down Expand Up @@ -723,8 +747,9 @@ func (w *worker) updateSnapshot() {
w.snapshotState = w.current.state.Copy()
}

func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) {
func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Address, trackProfit bool) ([]*types.Log, error) {
snap := w.current.state.Snapshot()
initialBalance := w.current.state.GetBalance(w.coinbase)

receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, *w.chain.GetVMConfig())
if err != nil {
Expand All @@ -734,17 +759,128 @@ func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Addres
w.current.txs = append(w.current.txs, tx)
w.current.receipts = append(w.current.receipts, receipt)

// coinbase balance difference already contains gas fee
if trackProfit {
finalBalance := w.current.state.GetBalance(w.coinbase)
w.current.profit.Add(w.current.profit, new(big.Int).Sub(finalBalance, initialBalance))
} else {
gasUsed := new(big.Int).SetUint64(receipt.GasUsed)
w.current.profit.Add(w.current.profit, gasUsed.Mul(gasUsed, tx.GasPrice()))
}

return receipt.Logs, nil
}

func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coinbase common.Address, interrupt *int32) bool {
func (w *worker) commitBundle(txs types.Transactions, coinbase common.Address, interrupt *int32) bool {
// Short circuit if current is nil
if w.current == nil {
return true
}

if w.current.gasPool == nil {
w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit)
var coalescedLogs []*types.Log

for _, tx := range txs {
// In the following three cases, we will interrupt the execution of the transaction.
// (1) new head block event arrival, the interrupt signal is 1
// (2) worker start or restart, the interrupt signal is 1
// (3) worker recreate the mining block with any newly arrived transactions, the interrupt signal is 2.
// For the first two cases, the semi-finished work will be discarded.
// For the third case, the semi-finished work will be submitted to the consensus engine.
if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
// Notify resubmit loop to increase resubmitting interval due to too frequent commits.
if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
ratio := float64(w.current.header.GasLimit-w.current.gasPool.Gas()) / float64(w.current.header.GasLimit)
if ratio < 0.1 {
ratio = 0.1
}
w.resubmitAdjustCh <- &intervalAdjust{
ratio: ratio,
inc: true,
}
}
return atomic.LoadInt32(interrupt) == commitInterruptNewHead
}
// If we don't have enough gas for any further transactions then we're done
if w.current.gasPool.Gas() < params.TxGas {
log.Trace("Not enough gas for further transactions", "have", w.current.gasPool, "want", params.TxGas)
break
}
if tx == nil {
log.Error("Unexpected nil transaction in bundle")
return true
}
// Error may be ignored here. The error has already been checked
// during transaction acceptance is the transaction pool.
//
// We use the eip155 signer regardless of the current hf.
from, _ := types.Sender(w.current.signer, tx)
// Check whether the tx is replay protected. If we're not in the EIP155 hf
// phase, start ignoring the sender until we do.
if tx.Protected() && !w.chainConfig.IsEIP155(w.current.header.Number) {
log.Debug("Unexpected protected transaction in bundle")
return true
}
// Start executing the transaction
w.current.state.Prepare(tx.Hash(), common.Hash{}, w.current.tcount)

logs, err := w.commitTransaction(tx, coinbase, true)
switch err {
case core.ErrGasLimitReached:
// Pop the current out-of-gas transaction without shifting in the next from the account
log.Error("Unexpected gas limit exceeded for current block in the bundle", "sender", from)
return true

case core.ErrNonceTooLow:
// New head notification data race between the transaction pool and miner, shift
log.Error("Transaction with low nonce in the bundle", "sender", from, "nonce", tx.Nonce())
return true

case core.ErrNonceTooHigh:
// Reorg notification data race between the transaction pool and miner, skip account =
log.Error("Account with high nonce in the bundle", "sender", from, "nonce", tx.Nonce())
return true

case nil:
// Everything ok, collect the logs and shift in the next transaction from the same account
coalescedLogs = append(coalescedLogs, logs...)
w.current.tcount++
continue

default:
// Strange error, discard the transaction and get the next in line (note, the
// nonce-too-high clause will prevent us from executing in vain).
log.Error("Transaction failed in the bundle", "hash", tx.Hash(), "err", err)
return true
}
}

if !w.isRunning() && len(coalescedLogs) > 0 {
// We don't push the pendingLogsEvent while we are mining. The reason is that
// when we are mining, the worker will regenerate a mining block every 3 seconds.
// In order to avoid pushing the repeated pendingLog, we disable the pending log pushing.

// make a copy, the state caches the logs and these logs get "upgraded" from pending to mined
// logs by filling in the block hash when the block was mined by the local miner. This can
// cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed.
cpy := make([]*types.Log, len(coalescedLogs))
for i, l := range coalescedLogs {
cpy[i] = new(types.Log)
*cpy[i] = *l
}
w.pendingLogsFeed.Send(cpy)
}
// Notify resubmit loop to decrease resubmitting interval if current interval is larger
// than the user-specified one.
if interrupt != nil {
w.resubmitAdjustCh <- &intervalAdjust{inc: false}
}
return false
}

func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coinbase common.Address, interrupt *int32) bool {
// Short circuit if current is nil
if w.current == nil {
return true
}

var coalescedLogs []*types.Log
Expand Down Expand Up @@ -796,7 +932,7 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
// Start executing the transaction
w.current.state.Prepare(tx.Hash(), common.Hash{}, w.current.tcount)

logs, err := w.commitTransaction(tx, coinbase)
logs, err := w.commitTransaction(tx, coinbase, false)
switch {
case errors.Is(err, core.ErrGasLimitReached):
// Pop the current out-of-gas transaction without shifting in the next from the account
Expand Down Expand Up @@ -854,7 +990,6 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) {
w.mu.RLock()
defer w.mu.RUnlock()

tstart := time.Now()
parent := w.chain.CurrentBlock()

Expand Down Expand Up @@ -949,10 +1084,10 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
log.Error("Failed to fetch pending transactions", "err", err)
return
}
// Short circuit if there is no available pending transactions.
// Short circuit if there is no available pending transactions or bundles.
// But if we disable empty precommit already, ignore it. Since
// empty block is necessary to keep the liveness of the network.
if len(pending) == 0 && atomic.LoadUint32(&w.noempty) == 0 {
if len(pending) == 0 && atomic.LoadUint32(&w.noempty) == 0 && len(w.eth.TxPool().AllMevBundles()) == 0 {
w.updateSnapshot()
return
}
Expand All @@ -964,6 +1099,16 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
localTxs[account] = txs
}
}
bundles, err := w.eth.TxPool().MevBundles(header.Number, header.Time)
if err != nil {
log.Error("Failed to fetch pending transactions", "err", err)
return
}
maxBundle, bundlePrice, ethToCoinbase, gasUsed := w.findMostProfitableBundle(bundles, w.coinbase, parent, header)
log.Info("Flashbots bundle", "ethToCoinbase", ethToCoinbase, "gasUsed", gasUsed, "bundlePrice", bundlePrice, "bundleLength", len(maxBundle))
if w.commitBundle(maxBundle, w.coinbase, interrupt) {
return
}
if len(localTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs)
if w.commitTransactions(txs, w.coinbase, interrupt) {
Expand Down Expand Up @@ -994,7 +1139,7 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st
interval()
}
select {
case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}:
case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now(), profit: w.current.profit}:
w.unconfirmed.Shift(block.NumberU64() - 1)
log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()),
"uncles", len(uncles), "txs", w.current.tcount,
Expand All @@ -1011,6 +1156,62 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st
return nil
}

func (w *worker) findMostProfitableBundle(bundles []types.Transactions, coinbase common.Address, parent *types.Block, header *types.Header) (types.Transactions, *big.Int, *big.Int, uint64) {
maxBundlePrice := new(big.Int)
maxTotalEth := new(big.Int)
var maxTotalGasUsed uint64
maxBundle := types.Transactions{}
for _, bundle := range bundles {
if len(bundle) == 0 {
continue
}
totalEth, totalGasUsed, err := w.computeBundleGas(bundle, parent, header)

if err != nil {
log.Warn("Error computing gas for a bundle", "error", err)
continue
}

mevGasPrice := new(big.Int).Div(totalEth, new(big.Int).SetUint64(totalGasUsed))
if mevGasPrice.Cmp(maxBundlePrice) > 0 {
maxBundle = bundle
maxBundlePrice = mevGasPrice
maxTotalEth = totalEth
maxTotalGasUsed = totalGasUsed
}
}

return maxBundle, maxBundlePrice, maxTotalEth, maxTotalGasUsed
}

// Compute the adjusted gas price for a whole bundle
// Done by calculating all gas spent, adding transfers to the coinbase, and then dividing by gas used
func (w *worker) computeBundleGas(bundle types.Transactions, parent *types.Block, header *types.Header) (*big.Int, uint64, error) {
env, err := w.generateEnv(parent, header)
if err != nil {
return nil, 0, err
}

var totalGasUsed uint64 = 0
var tempGasUsed uint64

coinbaseBalanceBefore := env.state.GetBalance(w.coinbase)

for _, tx := range bundle {
receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &w.coinbase, env.gasPool, env.state, env.header, tx, &tempGasUsed, *w.chain.GetVMConfig())
if err != nil {
return nil, 0, err
}
totalGasUsed += receipt.GasUsed
}
coinbaseBalanceAfter := env.state.GetBalance(w.coinbase)
coinbaseDiff := new(big.Int).Sub(coinbaseBalanceAfter, coinbaseBalanceBefore)
totalEth := new(big.Int)
totalEth.Add(totalEth, coinbaseDiff)

return totalEth, totalGasUsed, nil
}

// copyReceipts makes a deep copy of the given receipts.
func copyReceipts(receipts []*types.Receipt) []*types.Receipt {
result := make([]*types.Receipt, len(receipts))
Expand Down

0 comments on commit c2b5b40

Please sign in to comment.