Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notify about imported blocks from the off-chain worker #1723

Merged
merged 3 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Changed

- [#1723](https://github.com/FuelLabs/fuel-core/pull/1723): Notify about imported blocks from the off-chain worker.
- [#1717](https://github.com/FuelLabs/fuel-core/pull/1717): The fix for the [#1657](https://github.com/FuelLabs/fuel-core/pull/1657) to include the contract into `ContractsInfo` table.
- [#1657](https://github.com/FuelLabs/fuel-core/pull/1657): Upgrade to `fuel-vm` 0.46.0.
- [#1671](https://github.com/FuelLabs/fuel-core/pull/1671): The logic related to the `FuelBlockIdsToHeights` is moved to the off-chain worker.
Expand Down
10 changes: 10 additions & 0 deletions crates/fuel-core/src/graphql_api/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,4 +260,14 @@ pub mod worker {
/// Returns a stream of imported block.
fn block_events(&self) -> BoxStream<SharedImportResult>;
}

pub trait TxPool: Send + Sync {
/// Sends the complete status of the transaction.
fn send_complete(
&self,
id: Bytes32,
block_height: &BlockHeight,
status: TransactionStatus,
);
}
}
287 changes: 154 additions & 133 deletions crates/fuel-core/src/graphql_api/worker_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,23 +78,25 @@ use std::{

/// The off-chain GraphQL API worker task processes the imported blocks
/// and actualize the information used by the GraphQL service.
pub struct Task<D> {
pub struct Task<TxPool, D> {
tx_pool: TxPool,
block_importer: BoxStream<SharedImportResult>,
database: D,
}

impl<D> Task<D>
impl<TxPool, D> Task<TxPool, D>
where
TxPool: ports::worker::TxPool,
D: ports::worker::OffChainDatabase,
{
fn process_block(&mut self, result: SharedImportResult) -> anyhow::Result<()> {
let block = &result.sealed_block.entity;
let mut transaction = self.database.transaction();
// save the status for every transaction using the finalized block id
self.persist_transaction_status(&result, transaction.as_mut())?;
persist_transaction_status(&result, transaction.as_mut())?;

// save the associated owner for each transaction in the block
self.index_tx_owners_for_block(block, transaction.as_mut())?;
index_tx_owners_for_block(block, transaction.as_mut())?;

let height = block.header().height();
let block_id = block.id();
Expand All @@ -108,7 +110,7 @@ where
.increase_tx_count(block.transactions().len() as u64)
.unwrap_or_default();

Self::process_executor_events(
process_executor_events(
result.events.iter().map(Cow::Borrowed),
transaction.as_mut(),
)?;
Expand All @@ -129,161 +131,173 @@ where

transaction.commit()?;

for status in result.tx_status.iter() {
let tx_id = status.id;
let status = from_executor_to_status(block, status.result.clone());
self.tx_pool.send_complete(tx_id, height, status);
}

// update the importer metrics after the block is successfully committed
graphql_metrics().total_txs_count.set(total_tx_count as i64);

Ok(())
}
}

/// Process the executor events and update the indexes for the messages and coins.
pub fn process_executor_events<'a, Iter>(
events: Iter,
block_st_transaction: &mut D,
) -> anyhow::Result<()>
where
Iter: Iterator<Item = Cow<'a, Event>>,
{
for event in events {
match event.deref() {
Event::MessageImported(message) => {
block_st_transaction
.storage_as_mut::<OwnedMessageIds>()
.insert(
&OwnedMessageKey::new(message.recipient(), message.nonce()),
&(),
)?;
}
Event::MessageConsumed(message) => {
block_st_transaction
.storage_as_mut::<OwnedMessageIds>()
.remove(&OwnedMessageKey::new(
message.recipient(),
message.nonce(),
))?;
}
Event::CoinCreated(coin) => {
let coin_by_owner = owner_coin_id_key(&coin.owner, &coin.utxo_id);
block_st_transaction
.storage_as_mut::<OwnedCoins>()
.insert(&coin_by_owner, &())?;
}
Event::CoinConsumed(coin) => {
let key = owner_coin_id_key(&coin.owner, &coin.utxo_id);
block_st_transaction
.storage_as_mut::<OwnedCoins>()
.remove(&key)?;
}
/// Process the executor events and update the indexes for the messages and coins.
pub fn process_executor_events<'a, D, Iter>(
events: Iter,
block_st_transaction: &mut D,
) -> anyhow::Result<()>
where
D: ports::worker::OffChainDatabase,
Iter: Iterator<Item = Cow<'a, Event>>,
{
for event in events {
match event.deref() {
Event::MessageImported(message) => {
block_st_transaction
.storage_as_mut::<OwnedMessageIds>()
.insert(
&OwnedMessageKey::new(message.recipient(), message.nonce()),
&(),
)?;
}
Event::MessageConsumed(message) => {
block_st_transaction
.storage_as_mut::<OwnedMessageIds>()
.remove(&OwnedMessageKey::new(
message.recipient(),
message.nonce(),
))?;
}
Event::CoinCreated(coin) => {
let coin_by_owner = owner_coin_id_key(&coin.owner, &coin.utxo_id);
block_st_transaction
.storage_as_mut::<OwnedCoins>()
.insert(&coin_by_owner, &())?;
}
Event::CoinConsumed(coin) => {
let key = owner_coin_id_key(&coin.owner, &coin.utxo_id);
block_st_transaction
.storage_as_mut::<OwnedCoins>()
.remove(&key)?;
}
}
Ok(())
}
Ok(())
}

/// Associate all transactions within a block to their respective UTXO owners
fn index_tx_owners_for_block(
&self,
block: &Block,
block_st_transaction: &mut D,
) -> anyhow::Result<()> {
for (tx_idx, tx) in block.transactions().iter().enumerate() {
let block_height = *block.header().height();
let inputs;
let outputs;
let tx_idx = u16::try_from(tx_idx).map_err(|e| {
anyhow::anyhow!("The block has more than `u16::MAX` transactions, {}", e)
})?;
let tx_id = tx.cached_id().expect(
"The imported block should contains only transactions with cached id",
);
match tx {
Transaction::Script(tx) => {
inputs = tx.inputs().as_slice();
outputs = tx.outputs().as_slice();
}
Transaction::Create(tx) => {
inputs = tx.inputs().as_slice();
outputs = tx.outputs().as_slice();
}
Transaction::Mint(_) => continue,
/// Associate all transactions within a block to their respective UTXO owners
fn index_tx_owners_for_block<D>(
block: &Block,
block_st_transaction: &mut D,
) -> anyhow::Result<()>
where
D: ports::worker::OffChainDatabase,
{
for (tx_idx, tx) in block.transactions().iter().enumerate() {
let block_height = *block.header().height();
let inputs;
let outputs;
let tx_idx = u16::try_from(tx_idx).map_err(|e| {
anyhow::anyhow!("The block has more than `u16::MAX` transactions, {}", e)
})?;
let tx_id = tx.cached_id().expect(
"The imported block should contains only transactions with cached id",
);
match tx {
Transaction::Script(tx) => {
inputs = tx.inputs().as_slice();
outputs = tx.outputs().as_slice();
}
self.persist_owners_index(
block_height,
inputs,
outputs,
&tx_id,
tx_idx,
block_st_transaction,
)?;
Transaction::Create(tx) => {
inputs = tx.inputs().as_slice();
outputs = tx.outputs().as_slice();
}
Transaction::Mint(_) => continue,
}
Ok(())
persist_owners_index(
block_height,
inputs,
outputs,
&tx_id,
tx_idx,
block_st_transaction,
)?;
}
Ok(())
}

/// Index the tx id by owner for all of the inputs and outputs
fn persist_owners_index(
&self,
block_height: BlockHeight,
inputs: &[Input],
outputs: &[Output],
tx_id: &Bytes32,
tx_idx: u16,
db: &mut D,
) -> StorageResult<()> {
let mut owners = vec![];
for input in inputs {
if let Input::CoinSigned(CoinSigned { owner, .. })
| Input::CoinPredicate(CoinPredicate { owner, .. }) = input
{
owners.push(owner);
}
/// Index the tx id by owner for all of the inputs and outputs
fn persist_owners_index<D>(
block_height: BlockHeight,
inputs: &[Input],
outputs: &[Output],
tx_id: &Bytes32,
tx_idx: u16,
db: &mut D,
) -> StorageResult<()>
where
D: ports::worker::OffChainDatabase,
{
let mut owners = vec![];
for input in inputs {
if let Input::CoinSigned(CoinSigned { owner, .. })
| Input::CoinPredicate(CoinPredicate { owner, .. }) = input
{
owners.push(owner);
}
}

for output in outputs {
match output {
Output::Coin { to, .. }
| Output::Change { to, .. }
| Output::Variable { to, .. } => {
owners.push(to);
}
Output::Contract(_) | Output::ContractCreated { .. } => {}
for output in outputs {
match output {
Output::Coin { to, .. }
| Output::Change { to, .. }
| Output::Variable { to, .. } => {
owners.push(to);
}
Output::Contract(_) | Output::ContractCreated { .. } => {}
}
}

// dedupe owners from inputs and outputs prior to indexing
owners.sort();
owners.dedup();
// dedupe owners from inputs and outputs prior to indexing
owners.sort();
owners.dedup();

for owner in owners {
db.record_tx_id_owner(owner, block_height, tx_idx, tx_id)?;
}

Ok(())
for owner in owners {
db.record_tx_id_owner(owner, block_height, tx_idx, tx_id)?;
}

fn persist_transaction_status(
&self,
import_result: &ImportResult,
db: &mut D,
) -> StorageResult<()> {
for TransactionExecutionStatus { id, result } in import_result.tx_status.iter() {
let status = from_executor_to_status(
&import_result.sealed_block.entity,
result.clone(),
);
Ok(())
}

if db.update_tx_status(id, status)?.is_some() {
return Err(anyhow::anyhow!(
"Transaction status already exists for tx {}",
id
)
.into());
}
fn persist_transaction_status<D>(
import_result: &ImportResult,
db: &mut D,
) -> StorageResult<()>
where
D: ports::worker::OffChainDatabase,
{
for TransactionExecutionStatus { id, result } in import_result.tx_status.iter() {
let status =
from_executor_to_status(&import_result.sealed_block.entity, result.clone());

if db.update_tx_status(id, status)?.is_some() {
return Err(anyhow::anyhow!(
"Transaction status already exists for tx {}",
id
)
.into());
}
Ok(())
}
Ok(())
}

#[async_trait::async_trait]
impl<D> RunnableService for Task<D>
impl<TxPool, D> RunnableService for Task<TxPool, D>
where
TxPool: ports::worker::TxPool,
D: ports::worker::OffChainDatabase,
{
const NAME: &'static str = "GraphQL_Off_Chain_Worker";
Expand Down Expand Up @@ -315,8 +329,9 @@ where
}

#[async_trait::async_trait]
impl<D> RunnableTask for Task<D>
impl<TxPool, D> RunnableTask for Task<TxPool, D>
where
TxPool: ports::worker::TxPool,
D: ports::worker::OffChainDatabase,
{
async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
Expand Down Expand Up @@ -356,13 +371,19 @@ where
}
}

pub fn new_service<I, D>(block_importer: I, database: D) -> ServiceRunner<Task<D>>
pub fn new_service<TxPool, I, D>(
tx_pool: TxPool,
block_importer: I,
database: D,
) -> ServiceRunner<Task<TxPool, D>>
where
TxPool: ports::worker::TxPool,
I: ports::worker::BlockImporter,
D: ports::worker::OffChainDatabase,
{
let block_importer = block_importer.block_events();
ServiceRunner::new(Task {
tx_pool,
block_importer,
database,
})
Expand Down