Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
ce79f6c
fix: no reboot after GC
hanabi1224 Apr 1, 2026
f1a24de
Merge branch 'main' into hm/no-reboot-after-gc
hanabi1224 Apr 2, 2026
ee8f85b
max retries
hanabi1224 Apr 2, 2026
011f36a
Merge branch 'hm/no-reboot-after-gc' of github.com:ChainSafe/forest i…
hanabi1224 Apr 2, 2026
9b83119
resolve AI comments
hanabi1224 Apr 2, 2026
de2dfca
simplify ZstdFrameCache usage
hanabi1224 Apr 2, 2026
e694674
do proper cleanup
hanabi1224 Apr 2, 2026
568838e
refine the logic in gc_once
hanabi1224 Apr 2, 2026
374173f
fix
hanabi1224 Apr 2, 2026
974f40e
fix
hanabi1224 Apr 2, 2026
0f33f69
mitigate test timeout issue
hanabi1224 Apr 2, 2026
ae513c5
Merge remote-tracking branch 'origin/main' into hm/no-reboot-after-gc
hanabi1224 Apr 3, 2026
b3d5366
fix
hanabi1224 Apr 3, 2026
d72546c
Merge branch 'main' into hm/no-reboot-after-gc
hanabi1224 Apr 6, 2026
906356d
resolve comments
hanabi1224 Apr 6, 2026
ac7ef39
Merge remote-tracking branch 'origin/main' into hm/no-reboot-after-gc
hanabi1224 Apr 6, 2026
b0d5e06
fix
hanabi1224 Apr 6, 2026
e8a6070
tokio spawn blocking for reset_gc_columns
hanabi1224 Apr 7, 2026
ff816df
Merge remote-tracking branch 'origin/main' into hm/no-reboot-after-gc
hanabi1224 Apr 7, 2026
f6d8586
used cached state compute result
hanabi1224 Apr 8, 2026
a43093b
Merge remote-tracking branch 'origin/main' into hm/no-reboot-after-gc
hanabi1224 Apr 8, 2026
0ed0cc9
verify result state tree is loadable in ForestStateCompute
hanabi1224 Apr 8, 2026
188aa9d
Merge remote-tracking branch 'origin/main' into hm/no-reboot-after-gc
hanabi1224 Apr 8, 2026
5f9cff2
fix log
hanabi1224 Apr 9, 2026
f2faaff
Merge remote-tracking branch 'origin/main' into hm/no-reboot-after-gc
hanabi1224 Apr 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions docs/docs/users/guides/gc.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ GC can be triggered manually with `forest-cli chain prune snap`, regardless whethe
Garbage Collection (GC) runs on a regular schedule and follows these steps:

- Export an effective standard lite snapshot in `.forest.car.zst` format.
- Stop the node.
- Purge parity-db columns that serve as non-persistent blockstore.
- Purge old CAR database files.
- Restart the node.

This process keeps the system clean by regularly removing old, unused data.

Expand Down Expand Up @@ -67,7 +65,6 @@ While GC runs in the background, it can cause some delays or pauses, particularl

- **Syncing Pauses**: There may be brief interruptions in syncing as resources are allocated for the GC process.
- **Performance Overhead**: While relatively efficient, the chain traversal algorithm could slow down operations slightly.
- **Reboot pauses**: The GC stops the node before cleaning up parity-db and CAR snapshots and then restarts the node, which could take `~10s-~30s` on mainnet

## Disk Usage

Expand Down
2 changes: 1 addition & 1 deletion src/chain_sync/chain_follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ impl<DB: Blockstore> SyncStateMachine<DB> {

fn mark_validated_tipset(&mut self, tipset: FullTipset, is_proposed_head: bool) {
if !self.is_parent_validated(&tipset) {
tracing::error!(epoch = %tipset.epoch(), tsk = %tipset.key(), "Tipset must be validated");
tracing::error!(epoch = %tipset.epoch(), tsk = %tipset.key(), parent_state = %tipset.parent_state(), "Parent tipset must be validated");
return;
}

Expand Down
11 changes: 7 additions & 4 deletions src/daemon/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::daemon::asyncify;
use crate::daemon::bundle::load_actor_bundles;
use crate::daemon::db_util::load_all_forest_cars_with_cleanup;
use crate::db::car::ManyCar;
use crate::db::db_engine::{db_root, open_db};
use crate::db::parity_db::ParityDb;
use crate::db::db_engine::db_root;
use crate::db::parity_db::{GarbageCollectableParityDb, ParityDb};
use crate::db::{CAR_DB_DIR_NAME, DummyStore, EthMappingsStore};
use crate::genesis::read_genesis_header;
use crate::libp2p::{Keypair, PeerId};
Expand Down Expand Up @@ -178,7 +178,7 @@ fn maybe_migrate_db(config: &Config) {
}
}

pub type DbType = ManyCar<Arc<ParityDb>>;
pub type DbType = ManyCar<Arc<GarbageCollectableParityDb>>;

pub(crate) struct DbMetadata {
db_root_dir: PathBuf,
Expand All @@ -204,7 +204,10 @@ async fn setup_db(opts: &CliOpts, config: &Config) -> anyhow::Result<(Arc<DbType
maybe_migrate_db(config);
let chain_data_path = chain_path(config);
let db_root_dir = db_root(&chain_data_path)?;
let db_writer = Arc::new(open_db(db_root_dir.clone(), config.db_config())?);
let db_writer = Arc::new(GarbageCollectableParityDb::new(ParityDb::to_options(
db_root_dir.clone(),
config.db_config(),
))?);
let db = Arc::new(ManyCar::new(db_writer.clone()));
let forest_car_db_dir = db_root_dir.join(CAR_DB_DIR_NAME);
load_all_forest_cars_with_cleanup(&db, &forest_car_db_dir)?;
Expand Down
107 changes: 63 additions & 44 deletions src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ pub mod main;
use crate::blocks::Tipset;
use crate::chain::ChainStore;
use crate::chain::index::ResolveNullTipset;
use crate::chain_sync::ChainFollower;
use crate::chain_sync::network_context::SyncNetworkContext;
use crate::chain_sync::{ChainFollower, SyncStatus};
use crate::cli_shared::snapshot;
use crate::cli_shared::{
chain_path,
Expand Down Expand Up @@ -81,8 +81,9 @@ pub async fn start_interruptable(opts: CliOpts, config: Config) -> anyhow::Resul
let start_time = chrono::Utc::now();
let mut terminate = signal(SignalKind::terminate())?;
let (shutdown_send, mut shutdown_recv) = mpsc::channel(1);
let (rpc_stop_handle, rpc_server_handle) = jsonrpsee::server::stop_channel();
let result = tokio::select! {
ret = start(start_time, opts, config, shutdown_send) => ret,
ret = start(start_time, opts, config, shutdown_send, rpc_stop_handle) => ret,
_ = ctrl_c() => {
info!("Keyboard interrupt.");
Ok(())
Expand All @@ -96,6 +97,7 @@ pub async fn start_interruptable(opts: CliOpts, config: Config) -> anyhow::Resul
Ok(())
},
};
_ = rpc_server_handle.stop();
crate::utils::io::terminal_cleanup();
result
}
Expand Down Expand Up @@ -365,6 +367,47 @@ async fn maybe_start_health_check_service(
Ok(())
}

/// Spawns the snapshot garbage-collection background services.
///
/// On stateless nodes this is a no-op: GC must not run at all, not even on
/// demand. Otherwise the on-demand GC event loop is always started, and the
/// periodic scheduler loop is started only when the user has not disabled GC.
///
/// # Errors
/// Fails if the global `GLOBAL_SNAPSHOT_GC` handle has already been set, or
/// if constructing the [`SnapshotGarbageCollector`] fails.
fn maybe_start_gc_service(
    services: &mut JoinSet<anyhow::Result<()>>,
    opts: &CliOpts,
    config: &Config,
    cs: Arc<ChainStore<DbType>>,
    sync_status: crate::chain_sync::SyncStatus,
) -> anyhow::Result<()> {
    // If the node is stateless, GC shouldn't get triggered even on demand.
    if opts.stateless {
        return Ok(());
    }

    let snap_gc = Arc::new(SnapshotGarbageCollector::new(cs, sync_status, config)?);

    // Expose the collector globally so on-demand GC (e.g. triggered via
    // `forest-cli chain prune snap`) can reach it.
    GLOBAL_SNAPSHOT_GC
        .set(snap_gc.clone())
        .ok()
        .context("failed to set GLOBAL_SNAPSHOT_GC")?;

    // The event loop serves on-demand GC requests; it runs even when the
    // periodic scheduler is disabled.
    services.spawn({
        let snap_gc = snap_gc.clone();
        async move {
            snap_gc.event_loop().await;
            Ok(())
        }
    });

    // Periodic GC runs only when the user has not disabled it. (The stateless
    // case already returned early above, so it needs no re-check here.)
    if !opts.no_gc {
        // Last use of `snap_gc`: move it into the task instead of cloning.
        services.spawn(async move {
            snap_gc.scheduler_loop().await;
            Ok(())
        });
    }

    Ok(())
}

#[allow(clippy::too_many_arguments)]
fn maybe_start_rpc_service(
services: &mut JoinSet<anyhow::Result<()>>,
Expand Down Expand Up @@ -572,48 +615,17 @@ pub(super) async fn start(
opts: CliOpts,
config: Config,
shutdown_send: mpsc::Sender<()>,
rpc_stop_handle: jsonrpsee::server::StopHandle,
) -> anyhow::Result<()> {
startup_init(&config)?;
let (snap_gc, snap_gc_reboot_rx) = SnapshotGarbageCollector::new(&config)?;
let snap_gc = Arc::new(snap_gc);
GLOBAL_SNAPSHOT_GC
.set(snap_gc.clone())
.ok()
.context("failed to set GLOBAL_SNAPSHOT_GC")?;

// If the node is stateless, GC shouldn't get triggered even on demand.
if !opts.stateless {
tokio::task::spawn({
let snap_gc = snap_gc.clone();
async move { snap_gc.event_loop().await }
});
}
// GC shouldn't run periodically if the node is stateless or if the user has disabled it.
if !opts.no_gc && !opts.stateless {
tokio::task::spawn({
let snap_gc = snap_gc.clone();
async move { snap_gc.scheduler_loop().await }
});
}
loop {
let (rpc_stop_handle, rpc_server_handle) = jsonrpsee::server::stop_channel();
tokio::select! {
_ = snap_gc_reboot_rx.recv_async() => {
// gracefully shutdown RPC server
if let Err(e) = rpc_server_handle.stop() {
tracing::warn!("failed to stop RPC server: {e}");
}
snap_gc.cleanup_before_reboot().await;
}
result = start_services(start_time, &opts, config.clone(), shutdown_send.clone(), rpc_stop_handle, |ctx, sync_status| {
snap_gc.set_db(ctx.db.clone());
snap_gc.set_sync_status(sync_status);
snap_gc.set_car_db_head_epoch(ctx.db.heaviest_tipset().map(|ts|ts.epoch()).unwrap_or_default());
}) => {
break result
}
}
}
start_services(
start_time,
&opts,
config.clone(),
shutdown_send.clone(),
rpc_stop_handle,
)
.await
}

pub(super) async fn start_services(
Expand All @@ -622,7 +634,6 @@ pub(super) async fn start_services(
mut config: Config,
shutdown_send: mpsc::Sender<()>,
rpc_stop_handle: jsonrpsee::server::StopHandle,
on_app_context_and_db_initialized: impl FnOnce(&AppContext, SyncStatus),
) -> anyhow::Result<()> {
// Cleanup the collector prometheus metrics registry on start
crate::metrics::reset_collector_registry();
Expand All @@ -640,6 +651,7 @@ pub(super) async fn start_services(
{
tracing::warn!("error in maybe_rewind_heaviest_tipset: {e:#}");
}

let p2p_service = create_p2p_service(&mut services, &mut config, &ctx).await?;
let mpool = create_mpool(&mut services, &p2p_service, &ctx)?;
let chain_follower = create_chain_follower(opts, &p2p_service, mpool.clone(), &ctx)?;
Expand All @@ -661,8 +673,15 @@ pub(super) async fn start_services(
services.shutdown().await;
return Ok(());
}
on_app_context_and_db_initialized(&ctx, chain_follower.sync_status.clone());

warmup_in_background(&ctx);
maybe_start_gc_service(
&mut services,
opts,
&config,
ctx.chain_store().clone(),
chain_follower.sync_status.clone(),
)?;
maybe_start_metrics_service(&mut services, &config, &ctx).await?;
maybe_start_f3_service(opts, &config, &ctx)?;
maybe_start_health_check_service(&mut services, &config, &p2p_service, &chain_follower, &ctx)
Expand Down
11 changes: 5 additions & 6 deletions src/db/car/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use positioned_io::ReadAt;
use std::borrow::Cow;
use std::io::{Error, ErrorKind, Read, Result};
use std::path::{Path, PathBuf};
use std::sync::Arc;

#[derive(derive_more::From)]
pub enum AnyCar<ReaderT> {
Expand Down Expand Up @@ -93,16 +92,16 @@ impl<ReaderT: RandomAccessFileReader> AnyCar<ReaderT> {
}

/// Discard reader type and replace with dynamic trait object.
pub fn into_dyn(self) -> Result<AnyCar<Box<dyn super::RandomAccessFileReader>>> {
Ok(match self {
AnyCar::Forest(f) => AnyCar::Forest(f.into_dyn()?),
pub fn into_dyn(self) -> AnyCar<Box<dyn super::RandomAccessFileReader>> {
match self {
AnyCar::Forest(f) => AnyCar::Forest(f.into_dyn()),
AnyCar::Plain(p) => AnyCar::Plain(p.into_dyn()),
AnyCar::Memory(m) => AnyCar::Memory(m),
})
}
}

/// Set the z-frame cache of the inner CAR reader.
pub fn with_cache(self, cache: Arc<ZstdFrameCache>, key: CacheKey) -> Self {
pub fn with_cache(self, cache: ZstdFrameCache, key: CacheKey) -> Self {
match self {
AnyCar::Forest(f) => AnyCar::Forest(f.with_cache(cache, key)),
AnyCar::Plain(p) => AnyCar::Plain(p),
Expand Down
26 changes: 11 additions & 15 deletions src/db/car/forest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,10 @@ use fvm_ipld_encoding::CborStore as _;
use integer_encoding::VarIntReader;
use nunny::Vec as NonEmpty;
use positioned_io::{Cursor, ReadAt, Size as _, SizeCursor};
use std::io::{Seek, SeekFrom};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::{Arc, OnceLock};
use std::sync::OnceLock;
use std::task::Poll;
use std::{
io,
io::{Read, Write},
};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio_util::codec::{Decoder, Encoder as _};

Expand All @@ -97,7 +93,7 @@ pub struct ForestCar<ReaderT> {
cache_key: CacheKey,
indexed: index::Reader<index::ZstdSkipFramesEncodedDataReader<positioned_io::Slice<ReaderT>>>,
index_size_bytes: u64,
frame_cache: Arc<ZstdFrameCache>,
frame_cache: ZstdFrameCache,
header: CarV1Header,
metadata: OnceLock<Option<FilecoinSnapshotMetadata>>,
}
Expand All @@ -114,12 +110,12 @@ impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
) -> io::Result<ForestCar<ReaderT>> {
let indexed = index::Reader::new(index::ZstdSkipFramesEncodedDataReader::new(
positioned_io::Slice::new(reader, index_start_pos, Some(index_size_bytes)),
)?)?;
))?;
Ok(ForestCar {
cache_key: 0,
indexed,
index_size_bytes,
frame_cache: Arc::new(ZstdFrameCache::default()),
frame_cache: ZstdFrameCache::default(),
header,
metadata: OnceLock::new(),
})
Expand Down Expand Up @@ -201,26 +197,26 @@ impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
Tipset::load_required(self, &self.heaviest_tipset_key())
}

pub fn into_dyn(self) -> io::Result<ForestCar<Box<dyn super::RandomAccessFileReader>>> {
Ok(ForestCar {
pub fn into_dyn(self) -> ForestCar<Box<dyn super::RandomAccessFileReader>> {
ForestCar {
cache_key: self.cache_key,
indexed: self.indexed.map(|slice| {
let offset = slice.inner().offset();
let size = slice.inner().size()?;
let size = slice.inner().size().ok().flatten();
ZstdSkipFramesEncodedDataReader::new(positioned_io::Slice::new(
Box::new(slice.into_inner().into_inner()) as Box<dyn RandomAccessFileReader>,
offset,
size,
))
})?,
}),
index_size_bytes: self.index_size_bytes,
frame_cache: self.frame_cache,
header: self.header,
metadata: self.metadata,
})
}
}

pub fn with_cache(self, cache: Arc<ZstdFrameCache>, key: CacheKey) -> Self {
pub fn with_cache(self, cache: ZstdFrameCache, key: CacheKey) -> Self {
Self {
cache_key: key,
frame_cache: cache,
Expand Down
20 changes: 9 additions & 11 deletions src/db/car/forest/index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,12 @@ where
/// Replace the inner reader.
/// It MUST point to the same underlying IO, else future calls to `get`
/// will be incorrect.
pub fn map<T>(self, f: impl FnOnce(R) -> io::Result<T>) -> io::Result<Reader<T>> {
Ok(Reader {
inner: f(self.inner)?,
pub fn map<T>(self, f: impl FnOnce(R) -> T) -> Reader<T> {
Reader {
inner: f(self.inner),
table_offset: self.table_offset,
header: self.header,
})
}
}
}

Expand All @@ -201,7 +201,7 @@ pub struct ZstdSkipFramesEncodedDataReader<R> {
}

impl<R: ReadAt> ZstdSkipFramesEncodedDataReader<R> {
pub fn new(reader: R) -> io::Result<Self> {
pub fn new(reader: R) -> Self {
let mut offset = 0;
let mut skip_frame_header_offsets = vec![];
while let Ok(data_len) = reader
Expand All @@ -210,10 +210,10 @@ impl<R: ReadAt> ZstdSkipFramesEncodedDataReader<R> {
skip_frame_header_offsets.push(offset);
offset += ZSTD_SKIP_FRAME_LEN + u64::from(data_len);
}
Ok(Self {
Self {
reader,
skip_frame_header_offsets,
})
}
}

pub fn inner(&self) -> &R {
Expand Down Expand Up @@ -818,8 +818,7 @@ mod tests {
}
})?;
Ok(())
}))
.unwrap();
}));
if multi_index_frame {
assert!(!r.skip_frame_header_offsets.is_empty());
} else {
Expand Down Expand Up @@ -858,8 +857,7 @@ mod tests {
}
})?;
Ok(())
}))
.unwrap();
}));
if multi_index_frame {
assert!(!r.skip_frame_header_offsets.is_empty());
} else {
Expand Down
Loading
Loading