Skip to content

Commit

Permalink
Notify about imported blocks from the off-chain worker (#1723)
Browse files Browse the repository at this point in the history
Closes #1659
  • Loading branch information
xgreenx committed Mar 4, 2024
1 parent e126931 commit 799d800
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 159 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Changed

- [#1723](https://github.com/FuelLabs/fuel-core/pull/1723): Notify about imported blocks from the off-chain worker.
- [#1717](https://github.com/FuelLabs/fuel-core/pull/1717): The fix for the [#1657](https://github.com/FuelLabs/fuel-core/pull/1657) to include the contract into `ContractsInfo` table.
- [#1657](https://github.com/FuelLabs/fuel-core/pull/1657): Upgrade to `fuel-vm` 0.46.0.
- [#1671](https://github.com/FuelLabs/fuel-core/pull/1671): The logic related to the `FuelBlockIdsToHeights` is moved to the off-chain worker.
Expand Down
10 changes: 10 additions & 0 deletions crates/fuel-core/src/graphql_api/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,4 +260,14 @@ pub mod worker {
/// Returns a stream of imported block.
fn block_events(&self) -> BoxStream<SharedImportResult>;
}

pub trait TxPool: Send + Sync {
/// Sends the complete status of the transaction.
fn send_complete(
&self,
id: Bytes32,
block_height: &BlockHeight,
status: TransactionStatus,
);
}
}
287 changes: 154 additions & 133 deletions crates/fuel-core/src/graphql_api/worker_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,23 +78,25 @@ use std::{

/// The off-chain GraphQL API worker task processes the imported blocks
/// and actualize the information used by the GraphQL service.
pub struct Task<D> {
pub struct Task<TxPool, D> {
tx_pool: TxPool,
block_importer: BoxStream<SharedImportResult>,
database: D,
}

impl<D> Task<D>
impl<TxPool, D> Task<TxPool, D>
where
TxPool: ports::worker::TxPool,
D: ports::worker::OffChainDatabase,
{
fn process_block(&mut self, result: SharedImportResult) -> anyhow::Result<()> {
let block = &result.sealed_block.entity;
let mut transaction = self.database.transaction();
// save the status for every transaction using the finalized block id
self.persist_transaction_status(&result, transaction.as_mut())?;
persist_transaction_status(&result, transaction.as_mut())?;

// save the associated owner for each transaction in the block
self.index_tx_owners_for_block(block, transaction.as_mut())?;
index_tx_owners_for_block(block, transaction.as_mut())?;

let height = block.header().height();
let block_id = block.id();
Expand All @@ -108,7 +110,7 @@ where
.increase_tx_count(block.transactions().len() as u64)
.unwrap_or_default();

Self::process_executor_events(
process_executor_events(
result.events.iter().map(Cow::Borrowed),
transaction.as_mut(),
)?;
Expand All @@ -129,161 +131,173 @@ where

transaction.commit()?;

for status in result.tx_status.iter() {
let tx_id = status.id;
let status = from_executor_to_status(block, status.result.clone());
self.tx_pool.send_complete(tx_id, height, status);
}

// update the importer metrics after the block is successfully committed
graphql_metrics().total_txs_count.set(total_tx_count as i64);

Ok(())
}
}

/// Process the executor events and update the indexes for the messages and coins.
pub fn process_executor_events<'a, Iter>(
events: Iter,
block_st_transaction: &mut D,
) -> anyhow::Result<()>
where
Iter: Iterator<Item = Cow<'a, Event>>,
{
for event in events {
match event.deref() {
Event::MessageImported(message) => {
block_st_transaction
.storage_as_mut::<OwnedMessageIds>()
.insert(
&OwnedMessageKey::new(message.recipient(), message.nonce()),
&(),
)?;
}
Event::MessageConsumed(message) => {
block_st_transaction
.storage_as_mut::<OwnedMessageIds>()
.remove(&OwnedMessageKey::new(
message.recipient(),
message.nonce(),
))?;
}
Event::CoinCreated(coin) => {
let coin_by_owner = owner_coin_id_key(&coin.owner, &coin.utxo_id);
block_st_transaction
.storage_as_mut::<OwnedCoins>()
.insert(&coin_by_owner, &())?;
}
Event::CoinConsumed(coin) => {
let key = owner_coin_id_key(&coin.owner, &coin.utxo_id);
block_st_transaction
.storage_as_mut::<OwnedCoins>()
.remove(&key)?;
}
/// Process the executor events and update the indexes for the messages and coins.
pub fn process_executor_events<'a, D, Iter>(
events: Iter,
block_st_transaction: &mut D,
) -> anyhow::Result<()>
where
D: ports::worker::OffChainDatabase,
Iter: Iterator<Item = Cow<'a, Event>>,
{
for event in events {
match event.deref() {
Event::MessageImported(message) => {
block_st_transaction
.storage_as_mut::<OwnedMessageIds>()
.insert(
&OwnedMessageKey::new(message.recipient(), message.nonce()),
&(),
)?;
}
Event::MessageConsumed(message) => {
block_st_transaction
.storage_as_mut::<OwnedMessageIds>()
.remove(&OwnedMessageKey::new(
message.recipient(),
message.nonce(),
))?;
}
Event::CoinCreated(coin) => {
let coin_by_owner = owner_coin_id_key(&coin.owner, &coin.utxo_id);
block_st_transaction
.storage_as_mut::<OwnedCoins>()
.insert(&coin_by_owner, &())?;
}
Event::CoinConsumed(coin) => {
let key = owner_coin_id_key(&coin.owner, &coin.utxo_id);
block_st_transaction
.storage_as_mut::<OwnedCoins>()
.remove(&key)?;
}
}
Ok(())
}
Ok(())
}

/// Associate all transactions within a block to their respective UTXO owners
fn index_tx_owners_for_block(
&self,
block: &Block,
block_st_transaction: &mut D,
) -> anyhow::Result<()> {
for (tx_idx, tx) in block.transactions().iter().enumerate() {
let block_height = *block.header().height();
let inputs;
let outputs;
let tx_idx = u16::try_from(tx_idx).map_err(|e| {
anyhow::anyhow!("The block has more than `u16::MAX` transactions, {}", e)
})?;
let tx_id = tx.cached_id().expect(
"The imported block should contains only transactions with cached id",
);
match tx {
Transaction::Script(tx) => {
inputs = tx.inputs().as_slice();
outputs = tx.outputs().as_slice();
}
Transaction::Create(tx) => {
inputs = tx.inputs().as_slice();
outputs = tx.outputs().as_slice();
}
Transaction::Mint(_) => continue,
/// Associate all transactions within a block to their respective UTXO owners
fn index_tx_owners_for_block<D>(
block: &Block,
block_st_transaction: &mut D,
) -> anyhow::Result<()>
where
D: ports::worker::OffChainDatabase,
{
for (tx_idx, tx) in block.transactions().iter().enumerate() {
let block_height = *block.header().height();
let inputs;
let outputs;
let tx_idx = u16::try_from(tx_idx).map_err(|e| {
anyhow::anyhow!("The block has more than `u16::MAX` transactions, {}", e)
})?;
let tx_id = tx.cached_id().expect(
"The imported block should contains only transactions with cached id",
);
match tx {
Transaction::Script(tx) => {
inputs = tx.inputs().as_slice();
outputs = tx.outputs().as_slice();
}
self.persist_owners_index(
block_height,
inputs,
outputs,
&tx_id,
tx_idx,
block_st_transaction,
)?;
Transaction::Create(tx) => {
inputs = tx.inputs().as_slice();
outputs = tx.outputs().as_slice();
}
Transaction::Mint(_) => continue,
}
Ok(())
persist_owners_index(
block_height,
inputs,
outputs,
&tx_id,
tx_idx,
block_st_transaction,
)?;
}
Ok(())
}

/// Index the tx id by owner for all of the inputs and outputs
fn persist_owners_index(
&self,
block_height: BlockHeight,
inputs: &[Input],
outputs: &[Output],
tx_id: &Bytes32,
tx_idx: u16,
db: &mut D,
) -> StorageResult<()> {
let mut owners = vec![];
for input in inputs {
if let Input::CoinSigned(CoinSigned { owner, .. })
| Input::CoinPredicate(CoinPredicate { owner, .. }) = input
{
owners.push(owner);
}
/// Index the tx id by owner for all of the inputs and outputs
fn persist_owners_index<D>(
block_height: BlockHeight,
inputs: &[Input],
outputs: &[Output],
tx_id: &Bytes32,
tx_idx: u16,
db: &mut D,
) -> StorageResult<()>
where
D: ports::worker::OffChainDatabase,
{
let mut owners = vec![];
for input in inputs {
if let Input::CoinSigned(CoinSigned { owner, .. })
| Input::CoinPredicate(CoinPredicate { owner, .. }) = input
{
owners.push(owner);
}
}

for output in outputs {
match output {
Output::Coin { to, .. }
| Output::Change { to, .. }
| Output::Variable { to, .. } => {
owners.push(to);
}
Output::Contract(_) | Output::ContractCreated { .. } => {}
for output in outputs {
match output {
Output::Coin { to, .. }
| Output::Change { to, .. }
| Output::Variable { to, .. } => {
owners.push(to);
}
Output::Contract(_) | Output::ContractCreated { .. } => {}
}
}

// dedupe owners from inputs and outputs prior to indexing
owners.sort();
owners.dedup();
// dedupe owners from inputs and outputs prior to indexing
owners.sort();
owners.dedup();

for owner in owners {
db.record_tx_id_owner(owner, block_height, tx_idx, tx_id)?;
}

Ok(())
for owner in owners {
db.record_tx_id_owner(owner, block_height, tx_idx, tx_id)?;
}

fn persist_transaction_status(
&self,
import_result: &ImportResult,
db: &mut D,
) -> StorageResult<()> {
for TransactionExecutionStatus { id, result } in import_result.tx_status.iter() {
let status = from_executor_to_status(
&import_result.sealed_block.entity,
result.clone(),
);
Ok(())
}

if db.update_tx_status(id, status)?.is_some() {
return Err(anyhow::anyhow!(
"Transaction status already exists for tx {}",
id
)
.into());
}
fn persist_transaction_status<D>(
import_result: &ImportResult,
db: &mut D,
) -> StorageResult<()>
where
D: ports::worker::OffChainDatabase,
{
for TransactionExecutionStatus { id, result } in import_result.tx_status.iter() {
let status =
from_executor_to_status(&import_result.sealed_block.entity, result.clone());

if db.update_tx_status(id, status)?.is_some() {
return Err(anyhow::anyhow!(
"Transaction status already exists for tx {}",
id
)
.into());
}
Ok(())
}
Ok(())
}

#[async_trait::async_trait]
impl<D> RunnableService for Task<D>
impl<TxPool, D> RunnableService for Task<TxPool, D>
where
TxPool: ports::worker::TxPool,
D: ports::worker::OffChainDatabase,
{
const NAME: &'static str = "GraphQL_Off_Chain_Worker";
Expand Down Expand Up @@ -315,8 +329,9 @@ where
}

#[async_trait::async_trait]
impl<D> RunnableTask for Task<D>
impl<TxPool, D> RunnableTask for Task<TxPool, D>
where
TxPool: ports::worker::TxPool,
D: ports::worker::OffChainDatabase,
{
async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
Expand Down Expand Up @@ -356,13 +371,19 @@ where
}
}

pub fn new_service<I, D>(block_importer: I, database: D) -> ServiceRunner<Task<D>>
pub fn new_service<TxPool, I, D>(
tx_pool: TxPool,
block_importer: I,
database: D,
) -> ServiceRunner<Task<TxPool, D>>
where
TxPool: ports::worker::TxPool,
I: ports::worker::BlockImporter,
D: ports::worker::OffChainDatabase,
{
let block_importer = block_importer.block_events();
ServiceRunner::new(Task {
tx_pool,
block_importer,
database,
})
Expand Down
Loading

0 comments on commit 799d800

Please sign in to comment.