Skip to content

Commit

Permalink
Stop the ServiceRunner if it is not used (#990)
Browse files Browse the repository at this point in the history
  • Loading branch information
xgreenx committed Feb 14, 2023
1 parent cd95c96 commit 756e0d4
Show file tree
Hide file tree
Showing 12 changed files with 210 additions and 101 deletions.
12 changes: 7 additions & 5 deletions crates/fuel-core/src/graphql_api/ports.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::state::IterDirection;
use anyhow::Result;
use async_trait::async_trait;
use fuel_core_services::stream::BoxStream;
use fuel_core_storage::{
iter::BoxedIter,
tables::{
Expand All @@ -19,6 +19,7 @@ use fuel_core_storage::{
Result as StorageResult,
StorageInspect,
};
use fuel_core_txpool::service::TxUpdate;
use fuel_core_types::{
blockchain::primitives::{
BlockHeight,
Expand Down Expand Up @@ -145,14 +146,15 @@ pub trait DatabaseChain {

fn base_chain_height(&self) -> StorageResult<DaBlockHeight>;
}

pub trait TxPoolPort: Send + Sync {
fn find_one(&self, id: TxId) -> Option<TxInfo>;

fn insert(&self, txs: Vec<Arc<Transaction>>) -> Vec<anyhow::Result<InsertionResult>>;

fn tx_update_subscribe(
&self,
) -> fuel_core_services::stream::BoxStream<
Result<fuel_core_txpool::service::TxUpdate, BroadcastStreamRecvError>,
>;
) -> BoxStream<Result<TxUpdate, BroadcastStreamRecvError>>;
}

#[async_trait]
Expand All @@ -162,7 +164,7 @@ pub trait DryRunExecution {
transaction: Transaction,
height: Option<BlockHeight>,
utxo_validation: Option<bool>,
) -> Result<Vec<Receipt>>;
) -> anyhow::Result<Vec<Receipt>>;
}

pub trait BlockProducerPort: Send + Sync + DryRunExecution {}
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/service/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub mod txpool;

#[derive(Clone)]
pub struct PoAAdapter {
shared_state: fuel_core_poa::service::SharedState,
shared_state: Option<fuel_core_poa::service::SharedState>,
}

#[derive(Clone)]
Expand Down
9 changes: 7 additions & 2 deletions crates/fuel-core/src/service/adapters/consensus_module/poa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
TxPoolAdapter,
},
};
use anyhow::anyhow;
use fuel_core_poa::{
ports::{
BlockImporter,
Expand Down Expand Up @@ -37,7 +38,7 @@ use fuel_core_types::{
};

impl PoAAdapter {
pub fn new(shared_state: SharedState) -> Self {
pub fn new(shared_state: Option<SharedState>) -> Self {
Self { shared_state }
}
}
Expand All @@ -48,7 +49,11 @@ impl ConsensusModulePort for PoAAdapter {
&self,
block_times: Vec<Option<Tai64>>,
) -> anyhow::Result<()> {
self.shared_state.manually_produce_block(block_times).await
self.shared_state
.as_ref()
.ok_or(anyhow!("The block production is disabled"))?
.manually_produce_block(block_times)
.await
}
}

Expand Down
27 changes: 14 additions & 13 deletions crates/fuel-core/src/service/adapters/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ use crate::{
DryRunExecution,
TxPoolPort,
},
service::sub_services::TxPoolService,
service::adapters::TxPoolAdapter,
state::IterDirection,
};
use anyhow::Result;
use async_trait::async_trait;
use fuel_core_services::stream::BoxStream;
use fuel_core_storage::{
iter::{
BoxedIter,
Expand All @@ -29,9 +29,12 @@ use fuel_core_storage::{
Error as StorageError,
Result as StorageResult,
};
use fuel_core_txpool::types::{
ContractId,
TxId,
use fuel_core_txpool::{
service::TxUpdate,
types::{
ContractId,
TxId,
},
};
use fuel_core_types::{
blockchain::primitives::{
Expand Down Expand Up @@ -198,21 +201,19 @@ impl DatabaseChain for Database {

impl DatabasePort for Database {}

impl TxPoolPort for TxPoolService {
impl TxPoolPort for TxPoolAdapter {
fn insert(&self, txs: Vec<Arc<Transaction>>) -> Vec<anyhow::Result<InsertionResult>> {
self.shared.insert(txs)
self.service.insert(txs)
}

fn tx_update_subscribe(
&self,
) -> fuel_core_services::stream::BoxStream<
Result<fuel_core_txpool::service::TxUpdate, BroadcastStreamRecvError>,
> {
Box::pin(BroadcastStream::new(self.shared.tx_update_subscribe()))
) -> BoxStream<Result<TxUpdate, BroadcastStreamRecvError>> {
Box::pin(BroadcastStream::new(self.service.tx_update_subscribe()))
}

fn find_one(&self, id: TxId) -> Option<fuel_core_types::services::txpool::TxInfo> {
self.shared.find_one(id)
self.service.find_one(id)
}
}

Expand All @@ -223,7 +224,7 @@ impl DryRunExecution for BlockProducerAdapter {
transaction: Transaction,
height: Option<BlockHeight>,
utxo_validation: Option<bool>,
) -> Result<Vec<TxReceipt>> {
) -> anyhow::Result<Vec<TxReceipt>> {
self.block_producer
.dry_run(transaction, height, utxo_validation)
.await
Expand Down
22 changes: 12 additions & 10 deletions crates/fuel-core/src/service/sub_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,16 @@ pub fn init_sub_services(
let poa_config: fuel_core_poa::Config = config.try_into()?;
let production_enabled =
!matches!(poa_config.trigger, Trigger::Never) || config.manual_blocks_enabled;
let poa = fuel_core_poa::new_service(
last_height,
poa_config,
tx_pool_adapter,
producer_adapter.clone(),
importer_adapter.clone(),
);
let poa_adapter = PoAAdapter::new(poa.shared.clone());
let poa = (production_enabled).then(|| {
fuel_core_poa::new_service(
last_height,
poa_config,
tx_pool_adapter.clone(),
producer_adapter.clone(),
importer_adapter.clone(),
)
});
let poa_adapter = PoAAdapter::new(poa.as_ref().map(|service| service.shared.clone()));

#[cfg(feature = "p2p")]
let sync = (!production_enabled)
Expand Down Expand Up @@ -164,7 +166,7 @@ pub fn init_sub_services(
gql_database,
schema,
Box::new(producer_adapter),
Box::new(txpool.clone()),
Box::new(tx_pool_adapter),
Box::new(poa_adapter),
)?;

Expand All @@ -188,7 +190,7 @@ pub fn init_sub_services(
Box::new(txpool),
];

if production_enabled {
if let Some(poa) = poa {
services.push(Box::new(poa));
}

Expand Down
28 changes: 20 additions & 8 deletions crates/services/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,22 +101,19 @@ pub trait RunnableTask: Send {
#[derive(Debug)]
pub struct ServiceRunner<S>
where
S: RunnableService,
S: RunnableService + 'static,
{
/// The shared state of the service
pub shared: S::SharedData,
state: Shared<watch::Sender<State>>,
}

impl<S> Clone for ServiceRunner<S>
impl<S> Drop for ServiceRunner<S>
where
S: RunnableService,
S: RunnableService + 'static,
{
fn clone(&self) -> Self {
Self {
shared: self.shared.clone(),
state: self.state.clone(),
}
fn drop(&mut self) {
self.stop();
}
}

Expand Down Expand Up @@ -454,4 +451,19 @@ mod tests {
let state = service.stop_and_await().await.unwrap();
assert!(state.stopped());
}

#[tokio::test]
async fn stop_unused_service() {
let mut receiver;
{
let service = ServiceRunner::new(MockService::new_empty());
service.start().unwrap();
receiver = service.state.subscribe();
}

receiver.changed().await.unwrap();
assert!(matches!(receiver.borrow().clone(), State::Stopping));
receiver.changed().await.unwrap();
assert!(receiver.borrow().stopped());
}
}
18 changes: 15 additions & 3 deletions tests/tests/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ async fn test_contract_salt() {
let (_, contract_id) = test_builder.setup_contract(vec![], None);

// spin up node
let TestContext { client, .. } = test_builder.finalize().await;
let TestContext {
client,
srv: _dont_drop,
..
} = test_builder.finalize().await;

let contract = client
.contract(format!("{contract_id:#x}").as_str())
Expand All @@ -52,7 +56,11 @@ async fn test_contract_balance(
test_builder.setup_contract(vec![], Some(vec![(asset, test_balance)]));

// spin up node
let TestContext { client, .. } = test_builder.finalize().await;
let TestContext {
client,
srv: _dont_drop,
..
} = test_builder.finalize().await;

let balance = client
.contract_balance(
Expand Down Expand Up @@ -85,7 +93,11 @@ async fn test_5_contract_balances(
]),
);

let TestContext { client, .. } = test_builder.finalize().await;
let TestContext {
client,
srv: _dont_drop,
..
} = test_builder.finalize().await;

let contract_balances = client
.contract_balances(
Expand Down
4 changes: 3 additions & 1 deletion tests/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::collections::HashMap;

/// Helper for wrapping a currently running node environment
pub struct TestContext {
pub srv: FuelService,
pub rng: StdRng,
pub client: FuelClient,
}
Expand All @@ -34,7 +35,7 @@ impl TestContext {
let rng = StdRng::seed_from_u64(seed);
let srv = FuelService::new_node(Config::local_node()).await.unwrap();
let client = FuelClient::from(srv.bound_address);
Self { rng, client }
Self { srv, rng, client }
}
}

Expand Down Expand Up @@ -149,6 +150,7 @@ impl TestSetupBuilder {
let client = FuelClient::from(srv.bound_address);

TestContext {
srv,
rng: self.rng.clone(),
client,
}
Expand Down
Loading

0 comments on commit 756e0d4

Please sign in to comment.