fix: skip db consolidation if no new dataset was downloaded (#513)
### Description

The database consolidation loop doesn't need to happen if there is no
new data.
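
In other words, consolidation becomes conditional on the download actually fetching something new. A minimal, self-contained sketch of that guard, using stand-in functions rather than the real Chainhook signatures (the actual change is in the diff below):

```rust
// Hypothetical stand-in: returns true only when a fresh archive was fetched.
fn download_dataset_if_required() -> bool {
    false // e.g. the cached archive is already up to date
}

// Hypothetical stand-in for the expensive rocksdb import loop.
fn consolidate_chainstate() {
    println!("importing Stacks blocks into rocksdb...");
}

fn main() {
    if download_dataset_if_required() {
        consolidate_chainstate();
    } else {
        // Skipping the loop here is what speeds up repeated startups.
        println!("Skipping database consolidation - no new archive found.");
    }
}
```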

@lgalabru I'm interested in your thoughts on this. Potentially, someone could break their stacks.rocksdb, and Chainhook wouldn't automatically fix it unless they deleted their archive to force a redownload or a new archive became available.
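
If someone does end up in that state, the manual recovery is the archive deletion mentioned above. A rough sketch of that step; the archive file name and cache path here are assumptions for illustration, not the actual Chainhook layout:

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Force the next run to re-download the archive and rebuild rocksdb.
/// File names are assumed; check the configured cache path for the
/// real layout.
fn force_reconsolidation(cache_dir: &Path) -> io::Result<()> {
    let archive = cache_dir.join("stacks-blockchain-api.tsv"); // assumed name
    if archive.exists() {
        fs::remove_file(&archive)?; // no local archive => forced redownload
    }
    let rocksdb = cache_dir.join("stacks.rocksdb"); // named in the description
    if rocksdb.exists() {
        fs::remove_dir_all(&rocksdb)?; // drop the broken db so it's rebuilt
    }
    Ok(())
}

fn main() -> io::Result<()> {
    force_reconsolidation(Path::new("/path/to/chainhook/cache")) // assumed path
}
```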

I think the tradeoff is worth it because of the improvement to startup time when users repeatedly run `chainhook predicates scan`, or when the service goes down and needs to restart.

---------

Co-authored-by: Chris Guimarães <cguimaraes.br@gmail.com>
MicaiahReid and csgui committed Mar 19, 2024
1 parent 32cdfee commit b1469a6
Showing 1 changed file with 54 additions and 47 deletions.
components/chainhook-cli/src/scan/stacks.rs (+54, -47)
@@ -516,59 +516,66 @@ pub async fn consolidate_local_stacks_chainstate_using_csv(
         "Building local chainstate from Stacks archive file"
     );
 
-    let _ = download_stacks_dataset_if_required(config, ctx).await;
+    let downloaded_new_dataset = download_stacks_dataset_if_required(config, ctx).await;
 
-    let stacks_db = open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, ctx)?;
-    let confirmed_tip = get_last_block_height_inserted(&stacks_db, &ctx);
-    let mut canonical_fork = get_canonical_fork_from_tsv(config, confirmed_tip, ctx).await?;
+    if downloaded_new_dataset {
+        let stacks_db =
+            open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, ctx)?;
+        let confirmed_tip = get_last_block_height_inserted(&stacks_db, &ctx);
+        let mut canonical_fork = get_canonical_fork_from_tsv(config, confirmed_tip, ctx).await?;
 
-    let mut indexer = Indexer::new(config.network.clone());
-    let mut blocks_inserted = 0;
-    let mut blocks_read = 0;
-    let blocks_to_insert = canonical_fork.len();
-    let stacks_db_rw = open_readwrite_stacks_db_conn(&config.expected_cache_path(), ctx)?;
-    info!(
-        ctx.expect_logger(),
-        "Begining import of {} Stacks blocks into rocks db", blocks_to_insert
-    );
-    for (block_identifier, _parent_block_identifier, blob) in canonical_fork.drain(..) {
-        blocks_read += 1;
+        let mut indexer = Indexer::new(config.network.clone());
+        let mut blocks_inserted = 0;
+        let mut blocks_read = 0;
+        let blocks_to_insert = canonical_fork.len();
+        let stacks_db_rw = open_readwrite_stacks_db_conn(&config.expected_cache_path(), ctx)?;
+        info!(
+            ctx.expect_logger(),
+            "Beginning import of {} Stacks blocks into rocks db", blocks_to_insert
+        );
+        for (block_identifier, _parent_block_identifier, blob) in canonical_fork.drain(..) {
+            blocks_read += 1;
 
-        // If blocks already stored, move on
-        if is_stacks_block_present(&block_identifier, 3, &stacks_db_rw) {
-            continue;
-        }
-        blocks_inserted += 1;
+            // If blocks already stored, move on
+            if is_stacks_block_present(&block_identifier, 3, &stacks_db_rw) {
+                continue;
+            }
+            blocks_inserted += 1;
 
-        let block_data = match indexer::stacks::standardize_stacks_serialized_block(
-            &indexer.config,
-            &blob,
-            &mut indexer.stacks_context,
-            ctx,
-        ) {
-            Ok(block) => block,
-            Err(e) => {
-                error!(&ctx.expect_logger(), "{e}");
-                continue;
-            }
-        };
+            let block_data = match indexer::stacks::standardize_stacks_serialized_block(
+                &indexer.config,
+                &blob,
+                &mut indexer.stacks_context,
+                ctx,
+            ) {
+                Ok(block) => block,
+                Err(e) => {
+                    error!(&ctx.expect_logger(), "{e}");
+                    continue;
+                }
+            };
 
-        // TODO: return a result
-        insert_entry_in_stacks_blocks(&block_data, &stacks_db_rw, ctx);
+            // TODO: return a result
+            insert_entry_in_stacks_blocks(&block_data, &stacks_db_rw, ctx);
 
-        if blocks_inserted % 2500 == 0 {
-            info!(
-                ctx.expect_logger(),
-                "Importing Stacks blocks into rocks db: {}/{}", blocks_read, blocks_to_insert
-            );
-            let _ = stacks_db_rw.flush();
-        }
-    }
-    let _ = stacks_db_rw.flush();
-    info!(
-        ctx.expect_logger(),
-        "{blocks_read} Stacks blocks read, {blocks_inserted} inserted"
-    );
+            if blocks_inserted % 2500 == 0 {
+                info!(
+                    ctx.expect_logger(),
+                    "Importing Stacks blocks into rocks db: {}/{}", blocks_read, blocks_to_insert
+                );
+                let _ = stacks_db_rw.flush();
+            }
+        }
+        let _ = stacks_db_rw.flush();
+        info!(
+            ctx.expect_logger(),
+            "{blocks_read} Stacks blocks read, {blocks_inserted} inserted"
+        );
+    } else {
+        info!(
+            ctx.expect_logger(),
+            "Skipping database consolidation - no new archive found since last consolidation."
+        );
+    }
 
     Ok(())
 }
