Skip to content

Commit

Permalink
feat: work on transaction pool
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse committed Mar 26, 2022
1 parent 58a6b5a commit 243c504
Show file tree
Hide file tree
Showing 9 changed files with 536 additions and 63 deletions.
34 changes: 16 additions & 18 deletions node/src/eth/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ use ethers::{
},
};
use forge_node_core::{
error::Error,
eth::EthRequest,
request::RequestParams,
response::RpcResponse,
types::{Index, Work},
};
Expand All @@ -31,7 +29,7 @@ pub struct EthApi {

impl EthApi {
/// Executes the [EthRequest] and returns an RPC [RpcResponse]
pub async fn execute(&self, request: EthRequest) -> RpcResponse {
pub async fn execute(&self, _request: EthRequest) -> RpcResponse {
todo!()
}

Expand Down Expand Up @@ -96,7 +94,7 @@ impl EthApi {
/// Returns balance of the given account.
///
/// Handler for ETH RPC call: `eth_getBalance`
pub async fn balance(&self, address: H160, number: Option<BlockNumber>) -> Result<U256> {
pub async fn balance(&self, _address: H160, _number: Option<BlockNumber>) -> Result<U256> {
todo!()
}

Expand All @@ -105,17 +103,17 @@ impl EthApi {
/// Handler for ETH RPC call: `eth_getStorageAt`
pub async fn storage_at(
&self,
address: H160,
index: U256,
number: Option<BlockNumber>,
_address: H160,
_index: U256,
_number: Option<BlockNumber>,
) -> Result<H256> {
todo!()
}

/// Returns block with given hash.
///
/// Handler for ETH RPC call: `eth_getBlockByHash`
pub async fn block_by_hash(&self, hash: H256, full: bool) -> Result<Option<Block<TxHash>>> {
pub async fn block_by_hash(&self, _hash: H256, _full: bool) -> Result<Option<Block<TxHash>>> {
todo!()
}

Expand All @@ -129,7 +127,7 @@ impl EthApi {
/// Returns the number of transactions sent from given address at given time (block number).
///
/// Handler for ETH RPC call: `eth_getTransactionCount`
pub async fn transaction_count(&self, address: H160, _: Option<BlockNumber>) -> Result<U256> {
pub async fn transaction_count(&self, _address: H160, _: Option<BlockNumber>) -> Result<U256> {
todo!()
}

Expand Down Expand Up @@ -164,7 +162,7 @@ impl EthApi {
/// Returns the code at given address at given time (block number).
///
/// Handler for ETH RPC call: `eth_getCode`
pub async fn code_at(&self, address: H160, _: Option<BlockNumber>) -> Result<Bytes> {
pub async fn code_at(&self, _address: H160, _: Option<BlockNumber>) -> Result<Bytes> {
todo!()
}

Expand All @@ -188,8 +186,8 @@ impl EthApi {
/// Handler for ETH RPC call: `eth_call`
pub async fn call(
&self,
request: TypedTransaction,
number: Option<BlockNumber>,
_request: TypedTransaction,
_number: Option<BlockNumber>,
) -> Result<Bytes> {
todo!()
}
Expand All @@ -199,8 +197,8 @@ impl EthApi {
/// Handler for ETH RPC call: `eth_estimateGas`
pub async fn estimate_gas(
&self,
request: TypedTransaction,
number: Option<BlockNumber>,
_request: TypedTransaction,
_number: Option<BlockNumber>,
) -> Result<U256> {
todo!()
}
Expand Down Expand Up @@ -237,7 +235,7 @@ impl EthApi {
/// Returns transaction receipt by transaction hash.
///
/// Handler for ETH RPC call: `eth_getTransactionReceipt`
pub async fn transaction_receipt(&self, hash: H256) -> Result<Option<TransactionReceipt>> {
pub async fn transaction_receipt(&self, _hash: H256) -> Result<Option<TransactionReceipt>> {
todo!()
}

Expand Down Expand Up @@ -285,9 +283,9 @@ impl EthApi {
/// Handler for ETH RPC call: `eth_feeHistory`
pub async fn fee_history(
&self,
block_count: U256,
newest_block: BlockNumber,
reward_percentiles: Option<Vec<f64>>,
_block_count: U256,
_newest_block: BlockNumber,
_reward_percentiles: Option<Vec<f64>>,
) -> Result<FeeHistory> {
todo!()
}
Expand Down
8 changes: 7 additions & 1 deletion node/src/eth/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Aggregated error type for this module

use crate::eth::pool::transactions::PoolTransaction;

pub(crate) type Result<T> = std::result::Result<T, BlockchainError>;

#[derive(thiserror::Error, Debug)]
Expand All @@ -11,6 +13,10 @@ pub enum BlockchainError {
/// Errors that can occur in the transaction pool
#[derive(thiserror::Error, Debug)]
pub enum PoolError {
#[error("Invalid transaction validity")]
#[error("Transaction with cyclic dependent transactions")]
CyclicTransaction,
#[error("Invalid transaction")]
InvalidTransaction(),
#[error("Tx: [{0:?}] already imported")]
AlreadyImported(Box<PoolTransaction>),
}
2 changes: 2 additions & 0 deletions node/src/eth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ pub mod pool;
pub mod executor;

pub mod miner;

pub mod util;
125 changes: 110 additions & 15 deletions node/src/eth/pool/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use crate::eth::{
error::{BlockchainError, PoolError},
pool::transactions::PendingTransactions,
pool::transactions::{
PendingTransaction, PendingTransactions, PoolTransaction, ReadyTransactions,
},
};
use ethers::types::{Transaction, TxHash};
use foundry_evm::revm;
use futures::channel::mpsc::{channel, Sender};
use parking_lot::{Mutex, RwLock};
use std::collections::VecDeque;

use parking_lot::RwLock;
use std::sync::Arc;
use tracing::{debug, trace};

mod transactions;
pub mod transactions;

/// Transaction pool that performs validation.
pub struct Pool {
Expand All @@ -19,28 +22,120 @@ pub struct Pool {
/// A Transaction Pool
///
/// Contains all transactions that are ready to be executed
#[derive(Debug, Default)]
struct PoolInner {
ready_transactions: ReadyTransactions,
pending_transactions: PendingTransactions,
// TODO needs access to DB to fetch current nonce
}

// == impl PoolInner ==

impl PoolInner {
/// Returns true if this pool already contains the transaction
pub fn contains(&self, tx_hash: &TxHash) -> bool {
self.pending_transactions.contains(tx_hash) || self.ready_transactions.contains(tx_hash)
}

/// Adds a new transaction to the pool
pub fn submit(&mut self, tx: Transaction) -> Result<SubmittedTransaction, PoolError> {
todo!()
pub fn add_transaction(&mut self, tx: PoolTransaction) -> Result<AddedTransaction, PoolError> {
if self.contains(tx.hash()) {
return Err(PoolError::AlreadyImported(Box::new(tx)))
}
let tx = PendingTransaction::new(tx, self.ready_transactions.provided_markers());
trace!(target: "txpool", "[{:?}] {:?}", tx.transaction.hash(), tx);

// If all markers are not satisfied import to future
if !tx.is_ready() {
let hash = *tx.transaction.hash();
self.pending_transactions.add_transaction(tx);
return Ok(AddedTransaction::Pending { hash })
}
self.add_ready_transaction(tx)
}

/// Adds transaction to ready queue
fn add_ready_transaction(
&mut self,
tx: PendingTransaction,
) -> Result<AddedTransaction, PoolError> {
let hash = *tx.transaction.hash();
let mut ready = ReadyTransaction::new(hash);

let mut tx_queue = VecDeque::from([tx]);
// tracks whether we're processing the given `tx`
let mut is_new_tx = true;

// take first transaction from the list
while let Some(current_tx) = tx_queue.pop_front() {
// also add the transaction that the current transaction unlocks
tx_queue.extend(
self.pending_transactions.mark_and_unlock(&current_tx.transaction.provides),
);

let current_hash = *current_tx.transaction.hash();
// try to add the transaction to the ready pool
match self.ready_transactions.add_transaction(current_tx) {
Ok(mut replaced_transactions) => {
if !is_new_tx {
ready.promoted.push(current_hash);
}
// tx removed from ready pool
ready.removed.extend(replaced_transactions);
}
Err(err) => {
// failed to add transaction
if is_new_tx {
debug!(target: "txpool", "[{:?}] Failed to add tx: {:?}", current_hash,
err);
return Err(err)
} else {
ready.discarded.push(current_hash);
}
}
}
is_new_tx = false;
}

// check for a cycle where importing a transaction resulted in pending transactions to be
// added while removing current transaction. in which case we move this transaction back to
// the pending queue
if ready.removed.iter().any(|tx| *tx.hash() == hash) {
self.ready_transactions.clear_transactions(&ready.promoted);
return Err(PoolError::CyclicTransaction)
}

Ok(AddedTransaction::Ready(ready))
}
}

#[derive(Debug, Clone)]
pub enum SubmittedTransaction {
/// Transaction successfully submitted and being processed
Ready {
/// the hash of the submitted transaction
hash: TxHash,
},
/// Transaction successfully submitted but not yet queued for processing
pub struct ReadyTransaction {
/// the hash of the submitted transaction
hash: TxHash,
/// transactions promoted to the ready queue
promoted: Vec<TxHash>,
/// transaction that failed and became discarded
discarded: Vec<TxHash>,
/// Transactions removed from the Ready pool
removed: Vec<Arc<PoolTransaction>>,
}

impl ReadyTransaction {
pub fn new(hash: TxHash) -> Self {
Self {
hash,
promoted: Default::default(),
discarded: Default::default(),
removed: Default::default(),
}
}
}

#[derive(Debug, Clone)]
pub enum AddedTransaction {
/// transaction was successfully added and being processed
Ready(ReadyTransaction),
/// Transaction was successfully added but not yet queued for processing
Pending {
/// the hash of the submitted transaction
hash: TxHash,
Expand Down
Loading

0 comments on commit 243c504

Please sign in to comment.