From f696fac90984fb26af9e0ecc99cf8b6dcd8622f9 Mon Sep 17 00:00:00 2001 From: xgreenx Date: Mon, 4 Mar 2024 18:34:56 +0100 Subject: [PATCH 1/2] Notify about imported blocks from the off-chain worker --- crates/fuel-core/src/graphql_api/ports.rs | 10 + .../src/graphql_api/worker_service.rs | 287 ++++++++++-------- .../src/service/adapters/graphql_api.rs | 21 +- .../src/service/genesis/off_chain.rs | 4 +- crates/fuel-core/src/service/sub_services.rs | 1 + crates/services/txpool/src/service.rs | 22 +- crates/services/txpool/src/txpool.rs | 16 +- 7 files changed, 202 insertions(+), 159 deletions(-) diff --git a/crates/fuel-core/src/graphql_api/ports.rs b/crates/fuel-core/src/graphql_api/ports.rs index 720431c396..d90c90de24 100644 --- a/crates/fuel-core/src/graphql_api/ports.rs +++ b/crates/fuel-core/src/graphql_api/ports.rs @@ -260,4 +260,14 @@ pub mod worker { /// Returns a stream of imported block. fn block_events(&self) -> BoxStream; } + + pub trait TxPool: Send + Sync { + /// Sends the complete status of the transaction. + fn send_complete( + &self, + id: Bytes32, + block_height: &BlockHeight, + status: TransactionStatus, + ); + } } diff --git a/crates/fuel-core/src/graphql_api/worker_service.rs b/crates/fuel-core/src/graphql_api/worker_service.rs index 0fcd0dc627..734c48f38f 100644 --- a/crates/fuel-core/src/graphql_api/worker_service.rs +++ b/crates/fuel-core/src/graphql_api/worker_service.rs @@ -78,23 +78,25 @@ use std::{ /// The off-chain GraphQL API worker task processes the imported blocks /// and actualize the information used by the GraphQL service. -pub struct Task { +pub struct Task { + tx_pool: TxPool, block_importer: BoxStream, database: D, } -impl Task +impl Task where + TxPool: ports::worker::TxPool, D: ports::worker::OffChainDatabase, { fn process_block(&mut self, result: SharedImportResult) -> anyhow::Result<()> { let block = &result.sealed_block.entity; let mut transaction = self.database.transaction(); // save the status for every transaction using the finalized block id - self.persist_transaction_status(&result, transaction.as_mut())?; + persist_transaction_status(&result, transaction.as_mut())?; // save the associated owner for each transaction in the block - self.index_tx_owners_for_block(block, transaction.as_mut())?; + index_tx_owners_for_block(block, transaction.as_mut())?; let height = block.header().height(); let block_id = block.id(); @@ -108,7 +110,7 @@ where .increase_tx_count(block.transactions().len() as u64) .unwrap_or_default(); - Self::process_executor_events( + process_executor_events( result.events.iter().map(Cow::Borrowed), transaction.as_mut(), )?; @@ -129,161 +131,173 @@ where transaction.commit()?; + for status in result.tx_status.iter() { + let tx_id = status.id; + let status = from_executor_to_status(block, status.result.clone()); + self.tx_pool.send_complete(tx_id, height, status); + } + // update the importer metrics after the block is successfully committed graphql_metrics().total_txs_count.set(total_tx_count as i64); Ok(()) } +} - /// Process the executor events and update the indexes for the messages and coins. - pub fn process_executor_events<'a, Iter>( - events: Iter, - block_st_transaction: &mut D, - ) -> anyhow::Result<()> - where - Iter: Iterator>, - { - for event in events { - match event.deref() { - Event::MessageImported(message) => { - block_st_transaction - .storage_as_mut::() - .insert( - &OwnedMessageKey::new(message.recipient(), message.nonce()), - &(), - )?; - } - Event::MessageConsumed(message) => { - block_st_transaction - .storage_as_mut::() - .remove(&OwnedMessageKey::new( - message.recipient(), - message.nonce(), - ))?; - } - Event::CoinCreated(coin) => { - let coin_by_owner = owner_coin_id_key(&coin.owner, &coin.utxo_id); - block_st_transaction - .storage_as_mut::() - .insert(&coin_by_owner, &())?; - } - Event::CoinConsumed(coin) => { - let key = owner_coin_id_key(&coin.owner, &coin.utxo_id); - block_st_transaction - .storage_as_mut::() - .remove(&key)?; - } +/// Process the executor events and update the indexes for the messages and coins. +pub fn process_executor_events<'a, D, Iter>( + events: Iter, + block_st_transaction: &mut D, +) -> anyhow::Result<()> +where + D: ports::worker::OffChainDatabase, + Iter: Iterator>, +{ + for event in events { + match event.deref() { + Event::MessageImported(message) => { + block_st_transaction + .storage_as_mut::() + .insert( + &OwnedMessageKey::new(message.recipient(), message.nonce()), + &(), + )?; + } + Event::MessageConsumed(message) => { + block_st_transaction + .storage_as_mut::() + .remove(&OwnedMessageKey::new( + message.recipient(), + message.nonce(), + ))?; + } + Event::CoinCreated(coin) => { + let coin_by_owner = owner_coin_id_key(&coin.owner, &coin.utxo_id); + block_st_transaction + .storage_as_mut::() + .insert(&coin_by_owner, &())?; + } + Event::CoinConsumed(coin) => { + let key = owner_coin_id_key(&coin.owner, &coin.utxo_id); + block_st_transaction + .storage_as_mut::() + .remove(&key)?; } } - Ok(()) } + Ok(()) +} - /// Associate all transactions within a block to their respective UTXO owners - fn index_tx_owners_for_block( - &self, - block: &Block, - block_st_transaction: &mut D, - ) -> anyhow::Result<()> { - for (tx_idx, tx) in block.transactions().iter().enumerate() { - let block_height = *block.header().height(); - let inputs; - let outputs; - let tx_idx = u16::try_from(tx_idx).map_err(|e| { - anyhow::anyhow!("The block has more than `u16::MAX` transactions, {}", e) - })?; - let tx_id = tx.cached_id().expect( - "The imported block should contains only transactions with cached id", - ); - match tx { - Transaction::Script(tx) => { - inputs = tx.inputs().as_slice(); - outputs = tx.outputs().as_slice(); - } - Transaction::Create(tx) => { - inputs = tx.inputs().as_slice(); - outputs = tx.outputs().as_slice(); - } - Transaction::Mint(_) => continue, +/// Associate all transactions within a block to their respective UTXO owners +fn index_tx_owners_for_block( + block: &Block, + block_st_transaction: &mut D, +) -> anyhow::Result<()> +where + D: ports::worker::OffChainDatabase, +{ + for (tx_idx, tx) in block.transactions().iter().enumerate() { + let block_height = *block.header().height(); + let inputs; + let outputs; + let tx_idx = u16::try_from(tx_idx).map_err(|e| { + anyhow::anyhow!("The block has more than `u16::MAX` transactions, {}", e) + })?; + let tx_id = tx.cached_id().expect( + "The imported block should contains only transactions with cached id", + ); + match tx { + Transaction::Script(tx) => { + inputs = tx.inputs().as_slice(); + outputs = tx.outputs().as_slice(); } - self.persist_owners_index( - block_height, - inputs, - outputs, - &tx_id, - tx_idx, - block_st_transaction, - )?; + Transaction::Create(tx) => { + inputs = tx.inputs().as_slice(); + outputs = tx.outputs().as_slice(); + } + Transaction::Mint(_) => continue, } - Ok(()) + persist_owners_index( + block_height, + inputs, + outputs, + &tx_id, + tx_idx, + block_st_transaction, + )?; } + Ok(()) +} - /// Index the tx id by owner for all of the inputs and outputs - fn persist_owners_index( - &self, - block_height: BlockHeight, - inputs: &[Input], - outputs: &[Output], - tx_id: &Bytes32, - tx_idx: u16, - db: &mut D, - ) -> StorageResult<()> { - let mut owners = vec![]; - for input in inputs { - if let Input::CoinSigned(CoinSigned { owner, .. }) - | Input::CoinPredicate(CoinPredicate { owner, .. }) = input - { - owners.push(owner); - } +/// Index the tx id by owner for all of the inputs and outputs +fn persist_owners_index( + block_height: BlockHeight, + inputs: &[Input], + outputs: &[Output], + tx_id: &Bytes32, + tx_idx: u16, + db: &mut D, +) -> StorageResult<()> +where + D: ports::worker::OffChainDatabase, +{ + let mut owners = vec![]; + for input in inputs { + if let Input::CoinSigned(CoinSigned { owner, .. }) + | Input::CoinPredicate(CoinPredicate { owner, .. }) = input + { + owners.push(owner); } + } - for output in outputs { - match output { - Output::Coin { to, .. } - | Output::Change { to, .. } - | Output::Variable { to, .. } => { - owners.push(to); - } - Output::Contract(_) | Output::ContractCreated { .. } => {} + for output in outputs { + match output { + Output::Coin { to, .. } + | Output::Change { to, .. } + | Output::Variable { to, .. } => { + owners.push(to); } + Output::Contract(_) | Output::ContractCreated { .. } => {} } + } - // dedupe owners from inputs and outputs prior to indexing - owners.sort(); - owners.dedup(); + // dedupe owners from inputs and outputs prior to indexing + owners.sort(); + owners.dedup(); - for owner in owners { - db.record_tx_id_owner(owner, block_height, tx_idx, tx_id)?; - } - - Ok(()) + for owner in owners { + db.record_tx_id_owner(owner, block_height, tx_idx, tx_id)?; } - fn persist_transaction_status( - &self, - import_result: &ImportResult, - db: &mut D, - ) -> StorageResult<()> { - for TransactionExecutionStatus { id, result } in import_result.tx_status.iter() { - let status = from_executor_to_status( - &import_result.sealed_block.entity, - result.clone(), - ); + Ok(()) +} - if db.update_tx_status(id, status)?.is_some() { - return Err(anyhow::anyhow!( - "Transaction status already exists for tx {}", - id - ) - .into()); - } +fn persist_transaction_status( + import_result: &ImportResult, + db: &mut D, +) -> StorageResult<()> +where + D: ports::worker::OffChainDatabase, +{ + for TransactionExecutionStatus { id, result } in import_result.tx_status.iter() { + let status = + from_executor_to_status(&import_result.sealed_block.entity, result.clone()); + + if db.update_tx_status(id, status)?.is_some() { + return Err(anyhow::anyhow!( + "Transaction status already exists for tx {}", + id + ) + .into()); } - Ok(()) } + Ok(()) } #[async_trait::async_trait] -impl RunnableService for Task +impl RunnableService for Task where + TxPool: ports::worker::TxPool, D: ports::worker::OffChainDatabase, { const NAME: &'static str = "GraphQL_Off_Chain_Worker"; @@ -315,8 +329,9 @@ where } #[async_trait::async_trait] -impl RunnableTask for Task +impl RunnableTask for Task where + TxPool: ports::worker::TxPool, D: ports::worker::OffChainDatabase, { async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { @@ -356,13 +371,19 @@ where } } -pub fn new_service(block_importer: I, database: D) -> ServiceRunner> +pub fn new_service( + tx_pool: TxPool, + block_importer: I, + database: D, +) -> ServiceRunner> where + TxPool: ports::worker::TxPool, I: ports::worker::BlockImporter, D: ports::worker::OffChainDatabase, { let block_importer = block_importer.block_events(); ServiceRunner::new(Task { + tx_pool, block_importer, database, }) diff --git a/crates/fuel-core/src/service/adapters/graphql_api.rs b/crates/fuel-core/src/service/adapters/graphql_api.rs index 1e41083f40..b3ef02fc74 100644 --- a/crates/fuel-core/src/service/adapters/graphql_api.rs +++ b/crates/fuel-core/src/service/adapters/graphql_api.rs @@ -25,13 +25,19 @@ use fuel_core_txpool::{ }; use fuel_core_types::{ entities::message::MerkleProof, - fuel_tx::Transaction, + fuel_tx::{ + Bytes32, + Transaction, + }, fuel_types::BlockHeight, services::{ block_importer::SharedImportResult, executor::TransactionExecutionStatus, p2p::PeerInfo, - txpool::InsertionResult, + txpool::{ + InsertionResult, + TransactionStatus, + }, }, tai64::Tai64, }; @@ -144,3 +150,14 @@ impl worker::BlockImporter for BlockImporterAdapter { self.events() } } + +impl worker::TxPool for TxPoolAdapter { + fn send_complete( + &self, + id: Bytes32, + block_height: &BlockHeight, + status: TransactionStatus, + ) { + self.service.send_complete(id, block_height, status) + } +} diff --git a/crates/fuel-core/src/service/genesis/off_chain.rs b/crates/fuel-core/src/service/genesis/off_chain.rs index 90d283d5f8..f96b324799 100644 --- a/crates/fuel-core/src/service/genesis/off_chain.rs +++ b/crates/fuel-core/src/service/genesis/off_chain.rs @@ -34,7 +34,7 @@ pub fn execute_genesis_block( Cow::Owned(Event::MessageImported(message)) }); - worker_service::Task::process_executor_events( + worker_service::process_executor_events( messages_events, database_transaction.as_mut(), )?; @@ -47,7 +47,7 @@ pub fn execute_genesis_block( Cow::Owned(Event::CoinCreated(coin)) }); - worker_service::Task::process_executor_events( + worker_service::process_executor_events( coin_events, database_transaction.as_mut(), )?; diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs index 8adfafef88..b132209a7e 100644 --- a/crates/fuel-core/src/service/sub_services.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -193,6 +193,7 @@ pub fn init_sub_services( .data(database.on_chain().clone()); let graphql_worker = fuel_core_graphql_api::worker_service::new_service( + tx_pool_adapter.clone(), importer_adapter.clone(), database.off_chain().clone(), ); diff --git a/crates/services/txpool/src/service.rs b/crates/services/txpool/src/service.rs index 7c6b76e130..9758cfaee1 100644 --- a/crates/services/txpool/src/service.rs +++ b/crates/services/txpool/src/service.rs @@ -94,10 +94,10 @@ impl TxStatusChange { &self, id: Bytes32, block_height: &BlockHeight, - message: impl Into, + message: TxStatusMessage, ) { tracing::info!("Transaction {id} successfully included in block {block_height}"); - self.update_sender.send(TxUpdate::new(id, message.into())); + self.update_sender.send(TxUpdate::new(id, message)); } pub fn send_submitted(&self, id: Bytes32, time: Tai64) { @@ -208,14 +208,9 @@ where .sealed_block .entity.header().height(); - let block = &result - .sealed_block - .entity; { let mut lock = self.shared.txpool.lock(); lock.block_update( - &self.shared.tx_status_sender, - block, &result.tx_status, ); *self.shared.current_height.lock() = new_height; @@ -344,6 +339,19 @@ impl SharedState { .try_subscribe::(tx_id) .ok_or(anyhow!("Maximum number of subscriptions reached")) } + + pub fn send_complete( + &self, + id: Bytes32, + block_height: &BlockHeight, + status: TransactionStatus, + ) { + self.tx_status_sender.send_complete( + id, + block_height, + TxStatusMessage::Status(status), + ) + } } impl SharedState diff --git a/crates/services/txpool/src/txpool.rs b/crates/services/txpool/src/txpool.rs index 1cbfa46d8b..6929bf8ad0 100644 --- a/crates/services/txpool/src/txpool.rs +++ b/crates/services/txpool/src/txpool.rs @@ -32,16 +32,11 @@ use fuel_core_types::{ tai64::Tai64, }; -use crate::service::TxStatusMessage; use fuel_core_metrics::txpool_metrics::txpool_metrics; use fuel_core_storage::transactional::AtomicView; use fuel_core_types::{ - blockchain::block::Block, fuel_vm::checked_transaction::CheckPredicateParams, - services::{ - executor::TransactionExecutionStatus, - txpool::from_executor_to_status, - }, + services::executor::TransactionExecutionStatus, }; use std::{ cmp::Reverse, @@ -184,20 +179,11 @@ impl TxPool { /// When block is updated we need to receive all spend outputs and remove them from txpool. pub fn block_update( &mut self, - tx_status_sender: &TxStatusChange, - block: &Block, tx_status: &[TransactionExecutionStatus], // spend_outputs: [Input], added_outputs: [AddedOutputs] ) { - let height = block.header().height(); for status in tx_status { let tx_id = status.id; - let status = from_executor_to_status(block, status.result.clone()); - tx_status_sender.send_complete( - tx_id, - height, - TxStatusMessage::Status(status), - ); self.remove_committed_tx(&tx_id); } } From a34013a5d9bbbb38691246775ddd912ebf177b46 Mon Sep 17 00:00:00 2001 From: xgreenx Date: Mon, 4 Mar 2024 18:36:02 +0100 Subject: [PATCH 2/2] Updated CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1741353d77..27a4b59e62 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Changed +- [#1723](https://github.com/FuelLabs/fuel-core/pull/1723): Notify about imported blocks from the off-chain worker. - [#1717](https://github.com/FuelLabs/fuel-core/pull/1717): The fix for the [#1657](https://github.com/FuelLabs/fuel-core/pull/1657) to include the contract into `ContractsInfo` table. - [#1657](https://github.com/FuelLabs/fuel-core/pull/1657): Upgrade to `fuel-vm` 0.46.0. - [#1671](https://github.com/FuelLabs/fuel-core/pull/1671): The logic related to the `FuelBlockIdsToHeights` is moved to the off-chain worker.