Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop the ServiceRunner if it is not used #990

Merged
merged 7 commits into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions crates/fuel-core/src/graphql_api/ports.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::state::IterDirection;
use anyhow::Result;
use async_trait::async_trait;
use fuel_core_services::stream::BoxStream;
use fuel_core_storage::{
iter::BoxedIter,
tables::{
Expand All @@ -19,6 +19,7 @@ use fuel_core_storage::{
Result as StorageResult,
StorageInspect,
};
use fuel_core_txpool::service::TxUpdate;
use fuel_core_types::{
blockchain::primitives::{
BlockHeight,
Expand Down Expand Up @@ -145,14 +146,15 @@ pub trait DatabaseChain {

fn base_chain_height(&self) -> StorageResult<DaBlockHeight>;
}

pub trait TxPoolPort: Send + Sync {
fn find_one(&self, id: TxId) -> Option<TxInfo>;

fn insert(&self, txs: Vec<Arc<Transaction>>) -> Vec<anyhow::Result<InsertionResult>>;

fn tx_update_subscribe(
&self,
) -> fuel_core_services::stream::BoxStream<
Result<fuel_core_txpool::service::TxUpdate, BroadcastStreamRecvError>,
>;
) -> BoxStream<Result<TxUpdate, BroadcastStreamRecvError>>;
}

#[async_trait]
Expand All @@ -162,7 +164,7 @@ pub trait DryRunExecution {
transaction: Transaction,
height: Option<BlockHeight>,
utxo_validation: Option<bool>,
) -> Result<Vec<Receipt>>;
) -> anyhow::Result<Vec<Receipt>>;
}

pub trait BlockProducerPort: Send + Sync + DryRunExecution {}
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/service/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub mod txpool;

#[derive(Clone)]
pub struct PoAAdapter {
shared_state: fuel_core_poa::service::SharedState,
shared_state: Option<fuel_core_poa::service::SharedState>,
}

#[derive(Clone)]
Expand Down
9 changes: 7 additions & 2 deletions crates/fuel-core/src/service/adapters/consensus_module/poa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
TxPoolAdapter,
},
};
use anyhow::anyhow;
use fuel_core_poa::{
ports::{
BlockImporter,
Expand Down Expand Up @@ -37,7 +38,7 @@ use fuel_core_types::{
};

impl PoAAdapter {
pub fn new(shared_state: SharedState) -> Self {
pub fn new(shared_state: Option<SharedState>) -> Self {
Self { shared_state }
}
}
Expand All @@ -48,7 +49,11 @@ impl ConsensusModulePort for PoAAdapter {
&self,
block_times: Vec<Option<Tai64>>,
) -> anyhow::Result<()> {
self.shared_state.manually_produce_block(block_times).await
self.shared_state
.as_ref()
.ok_or(anyhow!("The block production is disabled"))?
.manually_produce_block(block_times)
.await
}
}

Expand Down
27 changes: 14 additions & 13 deletions crates/fuel-core/src/service/adapters/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ use crate::{
DryRunExecution,
TxPoolPort,
},
service::sub_services::TxPoolService,
service::adapters::TxPoolAdapter,
state::IterDirection,
};
use anyhow::Result;
use async_trait::async_trait;
use fuel_core_services::stream::BoxStream;
use fuel_core_storage::{
iter::{
BoxedIter,
Expand All @@ -29,9 +29,12 @@ use fuel_core_storage::{
Error as StorageError,
Result as StorageResult,
};
use fuel_core_txpool::types::{
ContractId,
TxId,
use fuel_core_txpool::{
service::TxUpdate,
types::{
ContractId,
TxId,
},
};
use fuel_core_types::{
blockchain::primitives::{
Expand Down Expand Up @@ -198,21 +201,19 @@ impl DatabaseChain for Database {

impl DatabasePort for Database {}

impl TxPoolPort for TxPoolService {
impl TxPoolPort for TxPoolAdapter {
fn insert(&self, txs: Vec<Arc<Transaction>>) -> Vec<anyhow::Result<InsertionResult>> {
self.shared.insert(txs)
self.service.insert(txs)
}

fn tx_update_subscribe(
&self,
) -> fuel_core_services::stream::BoxStream<
Result<fuel_core_txpool::service::TxUpdate, BroadcastStreamRecvError>,
> {
Box::pin(BroadcastStream::new(self.shared.tx_update_subscribe()))
) -> BoxStream<Result<TxUpdate, BroadcastStreamRecvError>> {
Box::pin(BroadcastStream::new(self.service.tx_update_subscribe()))
}

fn find_one(&self, id: TxId) -> Option<fuel_core_types::services::txpool::TxInfo> {
self.shared.find_one(id)
self.service.find_one(id)
}
}

Expand All @@ -223,7 +224,7 @@ impl DryRunExecution for BlockProducerAdapter {
transaction: Transaction,
height: Option<BlockHeight>,
utxo_validation: Option<bool>,
) -> Result<Vec<TxReceipt>> {
) -> anyhow::Result<Vec<TxReceipt>> {
self.block_producer
.dry_run(transaction, height, utxo_validation)
.await
Expand Down
22 changes: 12 additions & 10 deletions crates/fuel-core/src/service/sub_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,16 @@ pub fn init_sub_services(
let poa_config: fuel_core_poa::Config = config.try_into()?;
let production_enabled =
!matches!(poa_config.trigger, Trigger::Never) || config.manual_blocks_enabled;
let poa = fuel_core_poa::new_service(
last_height,
poa_config,
tx_pool_adapter,
producer_adapter.clone(),
importer_adapter.clone(),
);
let poa_adapter = PoAAdapter::new(poa.shared.clone());
let poa = (production_enabled).then(|| {
fuel_core_poa::new_service(
last_height,
poa_config,
tx_pool_adapter.clone(),
producer_adapter.clone(),
importer_adapter.clone(),
)
});
let poa_adapter = PoAAdapter::new(poa.as_ref().map(|service| service.shared.clone()));

#[cfg(feature = "p2p")]
let sync = (!production_enabled)
Expand Down Expand Up @@ -164,7 +166,7 @@ pub fn init_sub_services(
gql_database,
schema,
Box::new(producer_adapter),
Box::new(txpool.clone()),
Box::new(tx_pool_adapter),
Box::new(poa_adapter),
)?;

Expand All @@ -188,7 +190,7 @@ pub fn init_sub_services(
Box::new(txpool),
];

if production_enabled {
if let Some(poa) = poa {
services.push(Box::new(poa));
}

Expand Down
28 changes: 20 additions & 8 deletions crates/services/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,22 +101,19 @@ pub trait RunnableTask: Send {
#[derive(Debug)]
pub struct ServiceRunner<S>
where
S: RunnableService,
S: RunnableService + 'static,
{
/// The shared state of the service
pub shared: S::SharedData,
state: Shared<watch::Sender<State>>,
}

impl<S> Clone for ServiceRunner<S>
impl<S> Drop for ServiceRunner<S>
where
S: RunnableService,
S: RunnableService + 'static,
{
fn clone(&self) -> Self {
Self {
shared: self.shared.clone(),
state: self.state.clone(),
}
fn drop(&mut self) {
self.stop();
Voxelot marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down Expand Up @@ -454,4 +451,19 @@ mod tests {
let state = service.stop_and_await().await.unwrap();
assert!(state.stopped());
}

#[tokio::test]
async fn stop_unused_service() {
let mut receiver;
{
let service = ServiceRunner::new(MockService::new_empty());
service.start().unwrap();
receiver = service.state.subscribe();
}

receiver.changed().await.unwrap();
assert!(matches!(receiver.borrow().clone(), State::Stopping));
receiver.changed().await.unwrap();
assert!(receiver.borrow().stopped());
}
}
18 changes: 15 additions & 3 deletions tests/tests/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ async fn test_contract_salt() {
let (_, contract_id) = test_builder.setup_contract(vec![], None);

// spin up node
let TestContext { client, .. } = test_builder.finalize().await;
let TestContext {
client,
srv: _dont_drop,
..
} = test_builder.finalize().await;

let contract = client
.contract(format!("{contract_id:#x}").as_str())
Expand All @@ -52,7 +56,11 @@ async fn test_contract_balance(
test_builder.setup_contract(vec![], Some(vec![(asset, test_balance)]));

// spin up node
let TestContext { client, .. } = test_builder.finalize().await;
let TestContext {
client,
srv: _dont_drop,
..
} = test_builder.finalize().await;

let balance = client
.contract_balance(
Expand Down Expand Up @@ -85,7 +93,11 @@ async fn test_5_contract_balances(
]),
);

let TestContext { client, .. } = test_builder.finalize().await;
let TestContext {
client,
srv: _dont_drop,
..
} = test_builder.finalize().await;

let contract_balances = client
.contract_balances(
Expand Down
4 changes: 3 additions & 1 deletion tests/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::collections::HashMap;

/// Helper for wrapping a currently running node environment
pub struct TestContext {
pub srv: FuelService,
pub rng: StdRng,
pub client: FuelClient,
}
Expand All @@ -34,7 +35,7 @@ impl TestContext {
let rng = StdRng::seed_from_u64(seed);
let srv = FuelService::new_node(Config::local_node()).await.unwrap();
let client = FuelClient::from(srv.bound_address);
Self { rng, client }
Self { srv, rng, client }
}
}

Expand Down Expand Up @@ -149,6 +150,7 @@ impl TestSetupBuilder {
let client = FuelClient::from(srv.bound_address);

TestContext {
srv,
rng: self.rng.clone(),
client,
}
Expand Down
Loading