Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/docs/users/reference/cli.sh
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,11 @@ generate_markdown_section "forest-dev" "state compute"
generate_markdown_section "forest-dev" "state replay-compute"
generate_markdown_section "forest-dev" "state validate"
generate_markdown_section "forest-dev" "state replay-validate"

generate_markdown_section "forest-dev" "update-checkpoints"

generate_markdown_section "forest-dev" "archive-missing"

generate_markdown_section "forest-dev" "export-tipset-lookup"

generate_markdown_section "forest-dev" "export-state-tree"
124 changes: 124 additions & 0 deletions src/dev/subcommands/export_state_tree_cmd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::{
chain::{ChainStore, index::ResolveNullTipset},
cli_shared::{chain_path, read_config},
daemon::db_util::load_all_forest_cars,
db::{
CAR_DB_DIR_NAME,
car::ManyCar,
car::forest::FOREST_CAR_FILE_EXTENSION,
db_engine::{db_root, open_db},
},
genesis::read_genesis_header,
ipld::IpldStream,
networks::{ChainConfig, NetworkChain},
shim::{clock::ChainEpoch, executor::Receipt},
};
use anyhow::Context as _;
use clap::Args;
use itertools::Itertools;
use std::{
path::{Path, PathBuf},
sync::Arc,
};
use tokio::io::AsyncWriteExt as _;

/// Exports N consecutive parent state trees(together with messages, message receipts and events) of the tipset at the given epoch
#[derive(Debug, Args)]
pub struct ExportStateTreeCommand {
/// Filecoin network chain (e.g., calibnet, mainnet)
#[arg(long, required = true)]
chain: NetworkChain,
/// Optional path to the database folder
#[arg(long)]
db: Option<PathBuf>,
/// The maximum tipset epoch to export state tree from (Exclusive)
#[arg(long)]
from: ChainEpoch,
/// The minimum tipset epoch to export state tree from (Inclusive)
#[arg(long)]
to: ChainEpoch,
/// The path to the output `ForestCAR` file
#[arg(short, long)]
output: Option<PathBuf>,
Comment thread
hanabi1224 marked this conversation as resolved.
}

impl ExportStateTreeCommand {
pub async fn run(self) -> anyhow::Result<()> {
let Self {
chain,
db,
from,
to,
output,
} = self;
let output = output.unwrap_or_else(|| {
Path::new(&format!(
"statetree_{chain}_{to}_{from}{FOREST_CAR_FILE_EXTENSION}"
))
.to_owned()
});
let db_root_path = if let Some(db) = db {
db
} else {
let (_, config) = read_config(None, Some(chain.clone()))?;
db_root(&chain_path(&config))?
};
let forest_car_db_dir = db_root_path.join(CAR_DB_DIR_NAME);
let db: Arc<ManyCar<crate::db::parity_db::ParityDb>> =
Arc::new(ManyCar::new(open_db(db_root_path, &Default::default())?));
load_all_forest_cars(&db, &forest_car_db_dir)?;

let chain_config = Arc::new(ChainConfig::from_chain(&chain));
let genesis_header =
read_genesis_header(None, chain_config.genesis_bytes(&db).await?.as_deref(), &db)
.await?;
let chain_store = Arc::new(ChainStore::new(
db.clone(),
db.clone(),
db.clone(),
chain_config,
genesis_header,
)?);

let start_ts = chain_store.chain_index().tipset_by_height(
from,
chain_store.heaviest_tipset(),
ResolveNullTipset::TakeNewer,
)?;

let mut ipld_roots = vec![];
for (child, ts) in start_ts
.chain(&db)
.tuple_windows()
.take_while(|(_, parent)| parent.epoch() >= to)
{
ipld_roots.extend([*child.parent_state(), *child.parent_message_receipts()]);
ipld_roots.extend(ts.block_headers().iter().map(|h| h.messages));
let receipts = Receipt::get_receipts(&db, *child.parent_message_receipts())
.with_context(|| {
format!(
"failed to get receipts, root: {}, epoch: {}, tipset key: {}",
*child.parent_message_receipts(),
ts.epoch(),
ts.key(),
)
})?;
ipld_roots.extend(receipts.into_iter().filter_map(|r| r.events_root()));
}
let roots = nunny::vec![ipld_roots.first().cloned().context("no ipld roots found")?];
let stream = IpldStream::new(db, ipld_roots.clone());
let frames = crate::db::car::forest::Encoder::compress_stream_default(stream);
let tmp =
tempfile::NamedTempFile::new_in(output.parent().unwrap_or_else(|| Path::new(".")))?
.into_temp_path();
let mut writer = tokio::io::BufWriter::new(tokio::fs::File::create(&tmp).await?);
crate::db::car::forest::Encoder::write(&mut writer, roots, frames).await?;
writer.flush().await?;
tmp.persist(output)?;

Ok(())
}
}
3 changes: 3 additions & 0 deletions src/dev/subcommands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0, MIT

mod archive_missing_cmd;
mod export_state_tree_cmd;
mod export_tipset_lookup_cmd;
mod state_cmd;
mod update_checkpoints_cmd;
Expand Down Expand Up @@ -51,6 +52,7 @@ pub enum Subcommand {
/// Find missing archival snapshots on the Forest Archive for a given epoch range
ArchiveMissing(archive_missing_cmd::ArchiveMissingCommand),
ExportTipsetLookup(export_tipset_lookup_cmd::ExportTipsetLookupCommand),
ExportStateTree(export_state_tree_cmd::ExportStateTreeCommand),
}

impl Subcommand {
Expand All @@ -61,6 +63,7 @@ impl Subcommand {
Self::UpdateCheckpoints(cmd) => cmd.run().await,
Self::ArchiveMissing(cmd) => cmd.run().await,
Self::ExportTipsetLookup(cmd) => cmd.run().await,
Self::ExportStateTree(cmd) => cmd.run().await,
}
}
}
Expand Down
41 changes: 41 additions & 0 deletions src/ipld/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,47 @@ impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin> Stream
}
}

pin_project! {
pub struct IpldStream<DB> {
db: DB,
cid_vec: Vec<Cid>,
seen: CidHashSet,
}
}

impl<DB> IpldStream<DB> {
pub fn new(db: DB, roots: Vec<Cid>) -> Self {
Self {
db,
cid_vec: roots,
seen: CidHashSet::default(),
}
}
}

impl<DB: Blockstore> Stream for IpldStream<DB> {
type Item = anyhow::Result<CarBlock>;

fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
while let Some(cid) = this.cid_vec.pop() {
if should_save_block_to_snapshot(cid) && this.seen.insert(cid) {
if let Some(data) = this.db.get(&cid)? {
if cid.codec() == fvm_ipld_encoding::DAG_CBOR {
let new_cids = extract_cids(&data)?;
this.cid_vec.extend(new_cids);
}
return Poll::Ready(Some(Ok(CarBlock { cid, data })));
} else {
return Poll::Ready(Some(Err(anyhow::anyhow!("missing key: {cid}"))));
}
}
}
// That's it, nothing else to do. End of stream.
Poll::Ready(None)
}
}

fn ipld_to_cid(ipld: Ipld) -> Option<Cid> {
if let Ipld::Link(cid) = ipld {
Some(cid)
Expand Down
Loading