Skip to content

Commit

Permalink
feat: snapshot generation graceful shutdown (#1832)
Browse files Browse the repository at this point in the history
Snapshot exporting can now be cancelled. Progress updates are provided as well.
  • Loading branch information
segfault-magnet committed Apr 24, 2024
1 parent f7bd84e commit abb407d
Show file tree
Hide file tree
Showing 14 changed files with 412 additions and 317 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Changed

- [#1832](https://github.com/FuelLabs/fuel-core/pull/1832): Snapshot generation can be cancelled. Progress is also reported.
- [#1837](https://github.com/FuelLabs/fuel-core/pull/1837): Refactor the executor and separate validation from the other use cases

## [Version 0.25.2]
Expand Down
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions bin/fuel-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ path = "src/main.rs"

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
clap = { workspace = true, features = ["derive", "env", "string"] }
const_format = { version = "0.2", optional = true }
dirs = "4.0"
Expand All @@ -31,6 +32,7 @@ pyroscope_pprofrs = "0.2"
serde_json = { workspace = true }
tikv-jemallocator = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tokio-util = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = [
"ansi",
Expand Down
54 changes: 54 additions & 0 deletions bin/fuel-core/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use clap::Parser;
use fuel_core::service::genesis::NotifyCancel;
use fuel_core_chain_config::{
ChainConfig,
SnapshotReader,
Expand All @@ -9,6 +10,7 @@ use std::{
path::PathBuf,
str::FromStr,
};
use tokio_util::sync::CancellationToken;
use tracing_subscriber::{
filter::EnvFilter,
layer::SubscriberExt,
Expand Down Expand Up @@ -159,6 +161,58 @@ pub fn local_testnet_reader() -> SnapshotReader {
SnapshotReader::new_in_memory(local_testnet_chain_config(), state_config)
}

/// Handle used to observe a process shutdown request.
///
/// Created with `ShutdownListener::spawn`, which starts a background task that
/// cancels the wrapped token once a termination signal is received. Clones share
/// the same underlying token.
#[derive(Clone)]
pub struct ShutdownListener {
/// Token cancelled by the background task when a shutdown signal arrives.
token: CancellationToken,
}

impl ShutdownListener {
pub fn spawn() -> Self {
let token = CancellationToken::new();
{
let token = token.clone();
tokio::spawn(async move {
let mut sigterm = tokio::signal::unix::signal(
tokio::signal::unix::SignalKind::terminate(),
)?;

let mut sigint = tokio::signal::unix::signal(
tokio::signal::unix::SignalKind::interrupt(),
)?;
#[cfg(unix)]
tokio::select! {
_ = sigterm.recv() => {
tracing::info!("Received SIGTERM");
}
_ = sigint.recv() => {
tracing::info!("Received SIGINT");
}
}
#[cfg(not(unix))]
{
tokio::signal::ctrl_c().await?;
tracing::info!("Received ctrl_c");
}
token.cancel();
tokio::io::Result::Ok(())
});
}
Self { token }
}
}

#[async_trait::async_trait]
impl NotifyCancel for ShutdownListener {
    /// Suspends until the listener's token has been cancelled, then returns `Ok(())`.
    async fn wait_until_cancelled(&self) -> anyhow::Result<()> {
        let cancelled = self.token.cancelled();
        cancelled.await;
        Ok(())
    }

    /// Reports, without waiting, whether the listener's token has been cancelled.
    fn is_cancelled(&self) -> bool {
        let Self { token } = self;
        token.is_cancelled()
    }
}

#[cfg(any(feature = "rocksdb", feature = "rocksdb-production"))]
#[cfg(test)]
mod tests {
Expand Down
32 changes: 5 additions & 27 deletions bin/fuel-core/src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
consensus::PoATriggerArgs,
tx_pool::TxPoolArgs,
},
ShutdownListener,
},
FuelService,
};
Expand All @@ -21,6 +22,7 @@ use fuel_core::{
producer::Config as ProducerConfig,
service::{
config::Trigger,
genesis::NotifyCancel,
Config,
DbType,
RelayerConsensusConfig,
Expand Down Expand Up @@ -395,13 +397,14 @@ pub fn get_service(command: Command) -> anyhow::Result<FuelService> {
pub async fn exec(command: Command) -> anyhow::Result<()> {
let service = get_service(command)?;

let shutdown_listener = ShutdownListener::spawn();
// Genesis could take a long time depending on the snapshot size. Start needs to be
// interruptible by the shutdown listener.
tokio::select! {
result = service.start_and_await() => {
result?;
}
_ = shutdown_signal() => {
_ = shutdown_listener.wait_until_cancelled() => {
service.stop();
}
}
Expand All @@ -411,7 +414,7 @@ pub async fn exec(command: Command) -> anyhow::Result<()> {
result = service.await_stop() => {
result?;
}
_ = shutdown_signal() => {}
_ = shutdown_listener.wait_until_cancelled() => {}
}

service.stop_and_await().await?;
Expand Down Expand Up @@ -463,28 +466,3 @@ fn start_pyroscope_agent(
})
.transpose()
}

/// Resolves once the process has been asked to shut down.
///
/// On unix targets this waits for either `SIGTERM` or `SIGINT`; on all other
/// targets it waits for `tokio::signal::ctrl_c`.
///
/// # Errors
///
/// Returns an error if a unix signal handler cannot be registered or if waiting
/// for ctrl_c fails.
async fn shutdown_signal() -> anyhow::Result<()> {
#[cfg(unix)]
{
let mut sigterm =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;

let mut sigint =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())?;
// Whichever of the two signals is delivered first ends the wait.
tokio::select! {
_ = sigterm.recv() => {
tracing::info!("sigterm received");
}
_ = sigint.recv() => {
tracing::info!("sigint received");
}
}
}
#[cfg(not(unix))]
{
tokio::signal::ctrl_c().await?;
tracing::info!("CTRL+C received");
}
Ok(())
}
16 changes: 13 additions & 3 deletions bin/fuel-core/src/cli/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,14 @@ pub async fn exec(command: Command) -> anyhow::Result<()> {
MAX_GROUP_SIZE,
};

use crate::cli::ShutdownListener;

let db = open_db(
&command.database_path,
Some(command.max_database_cache_size),
)?;
let output_dir = command.output_dir;
let shutdown_listener = ShutdownListener::spawn();

match command.subcommand {
SubCommands::Everything {
Expand All @@ -149,15 +152,22 @@ pub async fn exec(command: Command) -> anyhow::Result<()> {
load_chain_config_or_use_testnet(chain_config.as_deref())?,
writer,
group_size,
shutdown_listener,
)
.write_full_snapshot()
.await
}
SubCommands::Contract { contract_id } => {
let writer = move || Ok(SnapshotWriter::json(output_dir.clone()));
Exporter::new(db, local_testnet_chain_config(), writer, MAX_GROUP_SIZE)
.write_contract_snapshot(contract_id)
.await
Exporter::new(
db,
local_testnet_chain_config(),
writer,
MAX_GROUP_SIZE,
shutdown_listener,
)
.write_contract_snapshot(contract_id)
.await
}
}
}
Expand Down
1 change: 0 additions & 1 deletion crates/fuel-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ strum_macros = { workspace = true }
tempfile = { workspace = true, optional = true }
thiserror = "1.0"
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tokio-rayon = { workspace = true }
tokio-stream = { workspace = true, features = ["sync"] }
tokio-util = { workspace = true }
tower-http = { version = "0.3", features = ["set-header", "trace", "timeout"] }
Expand Down
2 changes: 2 additions & 0 deletions crates/fuel-core/src/service/genesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ use itertools::Itertools;

mod exporter;
mod importer;
mod progress;
mod task_manager;

pub use exporter::Exporter;
pub use task_manager::NotifyCancel;

use self::importer::SnapshotImporter;

Expand Down
45 changes: 29 additions & 16 deletions crates/fuel-core/src/service/genesis/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ use fuel_core_chain_config::{
StateConfigBuilder,
TableEntry,
};
use fuel_core_services::State;
use fuel_core_storage::{
blueprint::BlueprintInspect,
iter::IterDirection,
kv_store::StorageColumn,
structured_storage::TableWithBlueprint,
tables::{
Coins,
Expand All @@ -46,16 +46,20 @@ use fuel_core_storage::{
};
use fuel_core_types::fuel_types::ContractId;
use itertools::Itertools;
use tokio::sync::watch;

use super::task_manager::TaskManager;
use super::{
progress::MultipleProgressReporter,
task_manager::TaskManager,
NotifyCancel,
};

/// Exports database contents as a snapshot.
///
/// Table exports are run as tasks on the `task_manager`, which hands each task a
/// cancellation handle, and progress is reported through `multi_progress`.
pub struct Exporter<Fun> {
/// Database the exported entries are read from.
db: CombinedDatabase,
/// Chain config supplied by the caller.
/// NOTE(review): presumably carried over into the written snapshot; its use is
/// not visible in this section, so confirm.
prev_chain_config: ChainConfig,
/// Caller-supplied value of the generic `Fun` type; the call sites pass a closure
/// that builds a `SnapshotWriter`.
writer: Fun,
/// Number of entries per chunk handed to the writer.
group_size: usize,
/// Runs the export tasks and exposes cancellation to them.
task_manager: TaskManager<SnapshotFragment>,
/// Source of the per-table progress reporters.
multi_progress: MultipleProgressReporter,
}

impl<Fun> Exporter<Fun>
Expand All @@ -67,16 +71,17 @@ where
prev_chain_config: ChainConfig,
writer: Fun,
group_size: usize,
cancel_token: impl NotifyCancel + Send + Sync + 'static,
) -> Self {
// TODO: Support graceful shutdown during the exporting of the snapshot.
// https://github.com/FuelLabs/fuel-core/issues/1828
let (_, receiver) = watch::channel(State::Started);
Self {
db,
prev_chain_config,
writer,
group_size,
task_manager: TaskManager::new(receiver.into()),
task_manager: TaskManager::new(cancel_token),
multi_progress: MultipleProgressReporter::new(tracing::info_span!(
"snapshot_exporter"
)),
}
}

Expand Down Expand Up @@ -174,15 +179,23 @@ where

let db = db_picker(self).clone();
let prefix = prefix.map(|p| p.to_vec());
self.task_manager.spawn(move |cancel| {
tokio_rayon::spawn(move || {
db.entries::<T>(prefix, IterDirection::Forward)
.chunks(group_size)
.into_iter()
.take_while(|_| !cancel.is_cancelled())
.try_for_each(|chunk| writer.write(chunk.try_collect()?))?;
writer.partial_close()
})
// TODO:
// [1857](https://github.com/FuelLabs/fuel-core/issues/1857)
// RocksDb can provide an estimate for the number of items.
let progress_tracker =
self.multi_progress.table_reporter(None, T::column().name());
self.task_manager.spawn_blocking(move |cancel| {
db.entries::<T>(prefix, IterDirection::Forward)
.chunks(group_size)
.into_iter()
.take_while(|_| !cancel.is_cancelled())
.enumerate()
.try_for_each(|(index, chunk)| {
progress_tracker.set_index(index);

writer.write(chunk.try_collect()?)
})?;
writer.partial_close()
});

Ok(())
Expand Down
Loading

0 comments on commit abb407d

Please sign in to comment.