diff --git a/docs/docs/users/reference/cli.sh b/docs/docs/users/reference/cli.sh
index e791c35b804d..1ed416bc6a76 100755
--- a/docs/docs/users/reference/cli.sh
+++ b/docs/docs/users/reference/cli.sh
@@ -164,3 +164,11 @@ generate_markdown_section "forest-dev" "state compute"
 generate_markdown_section "forest-dev" "state replay-compute"
 generate_markdown_section "forest-dev" "state validate"
 generate_markdown_section "forest-dev" "state replay-validate"
+
+generate_markdown_section "forest-dev" "update-checkpoints"
+
+generate_markdown_section "forest-dev" "archive-missing"
+
+generate_markdown_section "forest-dev" "export-tipset-lookup"
+
+generate_markdown_section "forest-dev" "export-state-tree"
diff --git a/src/dev/subcommands/export_state_tree_cmd.rs b/src/dev/subcommands/export_state_tree_cmd.rs
new file mode 100644
index 000000000000..7e8803781bb3
--- /dev/null
+++ b/src/dev/subcommands/export_state_tree_cmd.rs
@@ -0,0 +1,134 @@
+// Copyright 2019-2026 ChainSafe Systems
+// SPDX-License-Identifier: Apache-2.0, MIT
+
+use crate::{
+    chain::{ChainStore, index::ResolveNullTipset},
+    cli_shared::{chain_path, read_config},
+    daemon::db_util::load_all_forest_cars,
+    db::{
+        CAR_DB_DIR_NAME,
+        car::ManyCar,
+        car::forest::FOREST_CAR_FILE_EXTENSION,
+        db_engine::{db_root, open_db},
+    },
+    genesis::read_genesis_header,
+    ipld::IpldStream,
+    networks::{ChainConfig, NetworkChain},
+    shim::{clock::ChainEpoch, executor::Receipt},
+};
+use anyhow::Context as _;
+use clap::Args;
+use itertools::Itertools;
+use std::{
+    path::{Path, PathBuf},
+    sync::Arc,
+};
+use tokio::io::AsyncWriteExt as _;
+
+/// Exports N consecutive parent state trees (together with messages, message receipts and events) of the tipset at the given epoch
+#[derive(Debug, Args)]
+pub struct ExportStateTreeCommand {
+    /// Filecoin network chain (e.g., calibnet, mainnet)
+    #[arg(long, required = true)]
+    chain: NetworkChain,
+    /// Optional path to the database folder
+    #[arg(long)]
+    db: Option<PathBuf>,
+    /// The maximum tipset epoch to export state tree from (Exclusive)
+    #[arg(long)]
+    from: ChainEpoch,
+    /// The minimum tipset epoch to export state tree from (Inclusive)
+    #[arg(long)]
+    to: ChainEpoch,
+    /// The path to the output `ForestCAR` file
+    #[arg(short, long)]
+    output: Option<PathBuf>,
+}
+
+impl ExportStateTreeCommand {
+    /// Runs the export.
+    ///
+    /// Opens the database (from `--db`, or the path derived from the configuration for
+    /// `chain`), collects the state, receipt, message and event roots for the requested
+    /// epoch range, and writes every block reachable from them into a `ForestCAR` file.
+    pub async fn run(self) -> anyhow::Result<()> {
+        let Self {
+            chain,
+            db,
+            from,
+            to,
+            output,
+        } = self;
+        // No `--output` given: fall back to a file name built from the chain and epoch range.
+        let output = output.unwrap_or_else(|| {
+            Path::new(&format!(
+                "statetree_{chain}_{to}_{from}{FOREST_CAR_FILE_EXTENSION}"
+            ))
+            .to_owned()
+        });
+        let db_root_path = if let Some(db) = db {
+            db
+        } else {
+            let (_, config) = read_config(None, Some(chain.clone()))?;
+            db_root(&chain_path(&config))?
+        };
+        let forest_car_db_dir = db_root_path.join(CAR_DB_DIR_NAME);
+        let db: Arc<ManyCar<_>> =
+            Arc::new(ManyCar::new(open_db(db_root_path, &Default::default())?));
+        load_all_forest_cars(&db, &forest_car_db_dir)?;
+
+        let chain_config = Arc::new(ChainConfig::from_chain(&chain));
+        let genesis_header =
+            read_genesis_header(None, chain_config.genesis_bytes(&db).await?.as_deref(), &db)
+                .await?;
+        let chain_store = Arc::new(ChainStore::new(
+            db.clone(),
+            db.clone(),
+            db.clone(),
+            chain_config,
+            genesis_header,
+        )?);
+
+        let start_ts = chain_store.chain_index().tipset_by_height(
+            from,
+            chain_store.heaviest_tipset(),
+            ResolveNullTipset::TakeNewer,
+        )?;
+
+        let mut ipld_roots = vec![];
+        // Walk consecutive (child, parent) tipset pairs starting at `start_ts` for as long
+        // as the parent's epoch is at least `to`, collecting the roots to export.
+        for (child, ts) in start_ts
+            .chain(&db)
+            .tuple_windows()
+            .take_while(|(_, parent)| parent.epoch() >= to)
+        {
+            ipld_roots.extend([*child.parent_state(), *child.parent_message_receipts()]);
+            ipld_roots.extend(ts.block_headers().iter().map(|h| h.messages));
+            let receipts = Receipt::get_receipts(&db, *child.parent_message_receipts())
+                .with_context(|| {
+                    format!(
+                        "failed to get receipts, root: {}, epoch: {}, tipset key: {}",
+                        *child.parent_message_receipts(),
+                        ts.epoch(),
+                        ts.key(),
+                    )
+                })?;
+            ipld_roots.extend(receipts.into_iter().filter_map(|r| r.events_root()));
+        }
+        // Only the first collected root is passed to the encoder as `roots`; the stream is
+        let roots = nunny::vec![ipld_roots.first().cloned().context("no ipld roots found")?];
+        let stream = IpldStream::new(db, ipld_roots);
+        let frames = crate::db::car::forest::Encoder::compress_stream_default(stream);
+        // Stage the archive in a temp file next to `output`, then persist it under that name.
+        let tmp =
+            tempfile::NamedTempFile::new_in(output.parent().unwrap_or_else(|| Path::new(".")))?
+                .into_temp_path();
+        let mut writer = tokio::io::BufWriter::new(tokio::fs::File::create(&tmp).await?);
+        crate::db::car::forest::Encoder::write(&mut writer, roots, frames).await?;
+        writer.flush().await?;
+        tmp.persist(output)?;
+
+        Ok(())
+    }
+}
diff --git a/src/dev/subcommands/mod.rs b/src/dev/subcommands/mod.rs
index c880f1c4445c..caf49d19477b 100644
--- a/src/dev/subcommands/mod.rs
+++ b/src/dev/subcommands/mod.rs
@@ -2,6 +2,7 @@
 // SPDX-License-Identifier: Apache-2.0, MIT
 
 mod archive_missing_cmd;
+mod export_state_tree_cmd;
 mod export_tipset_lookup_cmd;
 mod state_cmd;
 mod update_checkpoints_cmd;
@@ -51,6 +52,7 @@ pub enum Subcommand {
     /// Find missing archival snapshots on the Forest Archive for a given epoch range
     ArchiveMissing(archive_missing_cmd::ArchiveMissingCommand),
     ExportTipsetLookup(export_tipset_lookup_cmd::ExportTipsetLookupCommand),
+    ExportStateTree(export_state_tree_cmd::ExportStateTreeCommand),
 }
 
 impl Subcommand {
@@ -61,6 +63,7 @@ impl Subcommand {
             Self::UpdateCheckpoints(cmd) => cmd.run().await,
             Self::ArchiveMissing(cmd) => cmd.run().await,
             Self::ExportTipsetLookup(cmd) => cmd.run().await,
+            Self::ExportStateTree(cmd) => cmd.run().await,
         }
     }
 }
diff --git a/src/ipld/util.rs b/src/ipld/util.rs
index f0b89e2f93b8..d77bcd2aea6f 100644
--- a/src/ipld/util.rs
+++ b/src/ipld/util.rs
@@ -422,6 +422,55 @@ impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin> Stream
     }
 }
 
+pin_project! {
+    /// A [`Stream`] of [`CarBlock`]s that walks the IPLD graph reachable from a set of
+    /// root CIDs, yielding the block of each visited CID at most once.
+    pub struct IpldStream<DB> {
+        db: DB,
+        // Stack of CIDs still to visit.
+        cid_vec: Vec<Cid>,
+        // CIDs already visited, so shared sub-graphs are emitted only once.
+        seen: CidHashSet,
+    }
+}
+
+impl<DB> IpldStream<DB> {
+    /// Creates a stream that will traverse everything reachable from `roots` in `db`.
+    pub fn new(db: DB, roots: Vec<Cid>) -> Self {
+        Self {
+            db,
+            cid_vec: roots,
+            seen: CidHashSet::default(),
+        }
+    }
+}
+
+impl<DB: Blockstore> Stream for IpldStream<DB> {
+    type Item = anyhow::Result<CarBlock>;
+
+    /// Pops the next pending CID and yields its block, queueing the links of `DAG_CBOR`
+    /// blocks for later visits. A CID missing from `db` yields an error item.
+    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let this = self.project();
+        while let Some(cid) = this.cid_vec.pop() {
+            // Skip CIDs that are filtered out of snapshots or were already visited.
+            if should_save_block_to_snapshot(cid) && this.seen.insert(cid) {
+                if let Some(data) = this.db.get(&cid)? {
+                    if cid.codec() == fvm_ipld_encoding::DAG_CBOR {
+                        let new_cids = extract_cids(&data)?;
+                        this.cid_vec.extend(new_cids);
+                    }
+                    return Poll::Ready(Some(Ok(CarBlock { cid, data })));
+                } else {
+                    return Poll::Ready(Some(Err(anyhow::anyhow!("missing key: {cid}"))));
+                }
+            }
+        }
+        // That's it, nothing else to do. End of stream.
+        Poll::Ready(None)
+    }
+}
+
 fn ipld_to_cid(ipld: Ipld) -> Option<Cid> {
     if let Ipld::Link(cid) = ipld {
         Some(cid)