Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract off chain logic from the executor #1579

Merged
merged 17 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Description of the upcoming release here.
### Changed

- [#1585](https://github.com/FuelLabs/fuel-core/pull/1585): Let `NetworkBehaviour` macro generate `FuelBehaviorEvent` in p2p
- [#1579](https://github.com/FuelLabs/fuel-core/pull/1579): The change extracts the off-chain-related logic from the executor and moves it to the GraphQL off-chain worker. It creates two new concepts - Off-chain and On-chain databases where the GraphQL worker has exclusive ownership of the database and may modify it without intersecting with the On-chain database.
- [#1577](https://github.com/FuelLabs/fuel-core/pull/1577): Moved insertion of sealed blocks into the `BlockImporter` instead of the executor.

#### Breaking
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 32 additions & 23 deletions crates/fuel-core/src/coins_query.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
fuel_core_graphql_api::service::Database,
fuel_core_graphql_api::database::ReadView,
query::asset_query::{
AssetQuery,
AssetSpendTarget,
Expand Down Expand Up @@ -95,7 +95,7 @@ impl SpendQuery {
}

/// Return [`AssetQuery`]s.
pub fn asset_queries<'a>(&'a self, db: &'a Database) -> Vec<AssetQuery<'a>> {
pub fn asset_queries<'a>(&'a self, db: &'a ReadView) -> Vec<AssetQuery<'a>> {
self.query_per_asset
.iter()
.map(|asset| {
Expand Down Expand Up @@ -159,7 +159,7 @@ pub fn largest_first(query: &AssetQuery) -> Result<Vec<CoinType>, CoinsQueryErro

// An implementation of the method described on: https://iohk.io/en/blog/posts/2018/07/03/self-organisation-in-coin-selection/
pub fn random_improve(
db: &Database,
db: &ReadView,
spend_query: &SpendQuery,
) -> Result<Vec<Vec<CoinType>>, CoinsQueryError> {
let mut coins_per_asset = vec![];
Expand Down Expand Up @@ -229,7 +229,7 @@ mod tests {
SpendQuery,
},
database::Database,
fuel_core_graphql_api::service::Database as ServiceDatabase,
fuel_core_graphql_api::api_service::ReadDatabase as ServiceDatabase,
query::asset_query::{
AssetQuery,
AssetSpendTarget,
Expand Down Expand Up @@ -323,15 +323,19 @@ mod tests {
let result: Vec<_> = spend_query
.iter()
.map(|asset| {
largest_first(&AssetQuery::new(owner, asset, base_asset_id, None, db))
.map(|coins| {
coins
.iter()
.map(|coin| {
(*coin.asset_id(base_asset_id), coin.amount())
})
.collect()
})
largest_first(&AssetQuery::new(
owner,
asset,
base_asset_id,
None,
&db.view(),
))
.map(|coins| {
coins
.iter()
.map(|coin| (*coin.asset_id(base_asset_id), coin.amount()))
.collect()
})
})
.try_collect()?;
Ok(result)
Expand Down Expand Up @@ -484,7 +488,7 @@ mod tests {
db: &ServiceDatabase,
) -> Result<Vec<(AssetId, u64)>, CoinsQueryError> {
let coins = random_improve(
db,
&db.view(),
&SpendQuery::new(owner, &query_per_asset, None, base_asset_id)?,
);

Expand Down Expand Up @@ -682,7 +686,7 @@ mod tests {
Some(excluded_ids),
base_asset_id,
)?;
let coins = random_improve(&db.service_database(), &spend_query);
let coins = random_improve(&db.service_database().view(), &spend_query);

// Transform result for convenience
coins.map(|coins| {
Expand Down Expand Up @@ -840,7 +844,7 @@ mod tests {
}

let coins = random_improve(
&db.service_database(),
&db.service_database().view(),
&SpendQuery::new(
owner,
&[AssetSpendTarget {
Expand Down Expand Up @@ -930,7 +934,8 @@ mod tests {
}

fn service_database(&self) -> ServiceDatabase {
Box::new(self.database.clone())
let database = self.database.clone();
ServiceDatabase::new(database.clone(), database)
}
}

Expand Down Expand Up @@ -980,18 +985,22 @@ mod tests {

pub fn owned_coins(&self, owner: &Address) -> Vec<Coin> {
use crate::query::CoinQueryData;
let db = self.service_database();
db.owned_coins_ids(owner, None, IterDirection::Forward)
.map(|res| res.map(|id| db.coin(id).unwrap()))
let query = self.service_database();
let query = query.view();
query
.owned_coins_ids(owner, None, IterDirection::Forward)
.map(|res| res.map(|id| query.coin(id).unwrap()))
.try_collect()
.unwrap()
}

pub fn owned_messages(&self, owner: &Address) -> Vec<Message> {
use crate::query::MessageQueryData;
let db = self.service_database();
db.owned_message_ids(owner, None, IterDirection::Forward)
.map(|res| res.map(|id| db.message(&id).unwrap()))
let query = self.service_database();
let query = query.view();
query
.owned_message_ids(owner, None, IterDirection::Forward)
.map(|res| res.map(|id| query.message(&id).unwrap()))
.try_collect()
.unwrap()
}
Expand Down
37 changes: 11 additions & 26 deletions crates/fuel-core/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ mod tests {
Coins,
ContractsRawCode,
Messages,
Receipts,
},
StorageAsMut,
};
Expand Down Expand Up @@ -662,23 +661,18 @@ mod tests {
coinbase_recipient: config_coinbase,
..Default::default()
};
let mut producer = create_executor(Default::default(), config);
let producer = create_executor(Default::default(), config);

let mut block = Block::default();
*block.transactions_mut() = vec![script.clone().into()];

assert!(producer
let ExecutionResult { tx_status, .. } = producer
.execute_and_commit(
ExecutionBlock::Production(block.into()),
Default::default()
Default::default(),
)
.is_ok());
let receipts = producer
.database
.storage::<Receipts>()
.get(&script.id(&producer.config.consensus_parameters.chain_id))
.unwrap()
.unwrap();
.expect("Should execute the block");
let receipts = &tx_status[0].receipts;

if let Some(Receipt::Return { val, .. }) = receipts.first() {
*val == 1
Expand Down Expand Up @@ -2756,20 +2750,16 @@ mod tests {
},
);

executor
let ExecutionResult { tx_status, .. } = executor
.execute_and_commit(
ExecutionBlock::Production(block),
ExecutionOptions {
utxo_validation: true,
},
)
.unwrap();
.expect("Should execute the block");

let receipts = database
.storage::<Receipts>()
.get(&tx.id(&ChainId::default()))
.unwrap()
.unwrap();
let receipts = &tx_status[0].receipts;
assert_eq!(block_height as u64, receipts[0].val().unwrap());
}

Expand Down Expand Up @@ -2835,21 +2825,16 @@ mod tests {
},
);

executor
let ExecutionResult { tx_status, .. } = executor
.execute_and_commit(
ExecutionBlock::Production(block),
ExecutionOptions {
utxo_validation: true,
},
)
.unwrap();

let receipts = database
.storage::<Receipts>()
.get(&tx.id(&ChainId::default()))
.unwrap()
.unwrap();
.expect("Should execute the block");

let receipts = &tx_status[0].receipts;
assert_eq!(time.0, receipts[0].val().unwrap());
}
}
5 changes: 4 additions & 1 deletion crates/fuel-core/src/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ use fuel_core_types::{
};
use std::net::SocketAddr;

pub mod api_service;
pub mod database;
pub(crate) mod metrics_extension;
pub mod ports;
pub mod service;
pub(crate) mod view_extension;
pub mod worker_service;

#[derive(Clone, Debug)]
pub struct Config {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use crate::{
fuel_core_graphql_api::ports::{
BlockProducerPort,
ConsensusModulePort,
DatabasePort,
P2pPort,
TxPoolPort,
},
graphql_api::{
fuel_core_graphql_api::{
database::{
OffChainView,
OnChainView,
},
metrics_extension::MetricsExtension,
ports::{
BlockProducerPort,
ConsensusModulePort,
P2pPort,
TxPoolPort,
},
view_extension::ViewExtension,
Config,
},
schema::{
Expand Down Expand Up @@ -55,6 +59,7 @@ use fuel_core_services::{
RunnableTask,
StateWatcher,
};
use fuel_core_storage::transactional::AtomicView;
use futures::Stream;
use serde_json::json;
use std::{
Expand All @@ -75,7 +80,7 @@ use tower_http::{

pub type Service = fuel_core_services::ServiceRunner<GraphqlService>;

pub type Database = Box<dyn DatabasePort>;
pub use super::database::ReadDatabase;

pub type BlockProducer = Box<dyn BlockProducerPort>;
// In the future GraphQL should not be aware of `TxPool`. It should
Expand Down Expand Up @@ -160,28 +165,35 @@ impl RunnableTask for Task {

// Need a seperate Data Object for each Query endpoint, cannot be avoided
#[allow(clippy::too_many_arguments)]
pub fn new_service(
pub fn new_service<OnChain, OffChain>(
config: Config,
schema: CoreSchemaBuilder,
database: Database,
on_database: OnChain,
off_database: OffChain,
txpool: TxPool,
producer: BlockProducer,
consensus_module: ConsensusModule,
p2p_service: P2pService,
log_threshold_ms: Duration,
request_timeout: Duration,
) -> anyhow::Result<Service> {
) -> anyhow::Result<Service>
where
OnChain: AtomicView<OnChainView> + 'static,
OffChain: AtomicView<OffChainView> + 'static,
{
let network_addr = config.addr;
let combined_read_database = ReadDatabase::new(on_database, off_database);

let schema = schema
.data(config)
.data(database)
.data(combined_read_database)
.data(txpool)
.data(producer)
.data(consensus_module)
.data(p2p_service)
.extension(async_graphql::extensions::Tracing)
.extension(MetricsExtension::new(log_threshold_ms))
.extension(ViewExtension::new())
.finish();

let router = Router::new()
Expand Down