Skip to content

Commit

Permalink
ledger-tool: Get shreds from BigTable blocks (#1638)
Browse files Browse the repository at this point in the history
There is often a desire to examine/replay/etc older blocks. If the
blocks are recent enough, they can be pulled from an actively running
node. Otherwise, the blocks must be pulled down from warehouse node
archives. These archives are uploaded on a per-epoch basis so they are
quite large, and can take multiple hours to download and decompress.

With the addition of Entry data to BigTable, blocks can be recreated
from BigTable data. Namely, we can recreate the Entries with proper PoH
and transaction data. We can then shred them such that they are the
same format as blocks that are produced from the cluster.

This change introduces a new command that will read BigTable data and
insert shreds into a local Blockstore. The new command is:
  $ agave-ledger-tool bigtable shreds ...

Several important notes about the change:
- Shred for some slot S will not be signed by the actual leader for
  slot S. Instead, shreds will be signed with a "dummy" keypair. The
  shred signatures does not affect the ability to replay the block.
- Entry PoH data does not go back to genesis in BigTable. This data
  could be extracted and uploaded from the existing rocksdb archives;
  however, that work is not planned as far as I know. --allow-mock-poh
  can be passed to generate filler PoH data. Blocks created with this
  flag are replayable by passing --skip-poh-verify to ledger-tool.
- A snapshot will be unpacked to determine items such as the shred
  version, tick hash rate and ticks per slot. This snapshot must be in
  the same epoch as the requested slots
  • Loading branch information
steviez committed Jun 20, 2024
1 parent 8747c3f commit 3ee204b
Showing 1 changed file with 296 additions and 3 deletions.
299 changes: 296 additions & 3 deletions ledger-tool/src/bigtable.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
//! The `bigtable` subcommand
use {
crate::{
args::{load_genesis_arg, snapshot_args},
ledger_path::canonicalize_ledger_path,
load_and_process_ledger_or_exit, open_genesis_config_by,
output::{
encode_confirmed_block, CliBlockWithEntries, CliEntries,
EncodedConfirmedBlockWithEntries,
},
parse_process_options, LoadAndProcessLedgerOutput,
},
clap::{
value_t, value_t_or_exit, values_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand,
},
crossbeam_channel::unbounded,
futures::stream::FuturesUnordered,
log::{debug, error, info},
log::{debug, error, info, warn},
serde_json::json,
solana_clap_utils::{
input_parsers::pubkey_of,
Expand All @@ -22,11 +25,17 @@ use {
display::println_transaction, CliBlock, CliTransaction, CliTransactionConfirmation,
OutputFormat,
},
solana_entry::entry::{create_ticks, Entry},
solana_ledger::{
bigtable_upload::ConfirmedBlockUploadConfig, blockstore::Blockstore,
bigtable_upload::ConfirmedBlockUploadConfig,
blockstore::Blockstore,
blockstore_options::AccessType,
shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
},
solana_sdk::{
clock::Slot, hash::Hash, pubkey::Pubkey, shred_version::compute_shred_version,
signature::Signature, signer::keypair::keypair_from_seed,
},
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature},
solana_storage_bigtable::CredentialType,
solana_transaction_status::{ConfirmedBlock, UiTransactionEncoding, VersionedConfirmedBlock},
std::{
Expand Down Expand Up @@ -164,6 +173,170 @@ async fn entries(
Ok(())
}

struct ShredConfig {
shred_version: u16,
num_hashes_per_tick: u64,
num_ticks_per_slot: u64,
allow_mock_poh: bool,
}

async fn shreds(
blockstore: Arc<Blockstore>,
starting_slot: Slot,
ending_slot: Slot,
shred_config: ShredConfig,
config: solana_storage_bigtable::LedgerStorageConfig,
) -> Result<(), Box<dyn std::error::Error>> {
let bigtable = solana_storage_bigtable::LedgerStorage::new_with_config(config)
.await
.map_err(|err| format!("Failed to connect to storage: {err:?}"))?;

// Make the range inclusive of both starting and ending slot
let limit = ending_slot.saturating_sub(starting_slot).saturating_add(1) as usize;
let mut slots = bigtable.get_confirmed_blocks(starting_slot, limit).await?;
slots.retain(|&slot| slot <= ending_slot);

// Create a "dummy" keypair to sign the shreds that will later be created.
//
// The validator shred ingestion path sigverifies shreds from the network
// using the known leader for any given slot. It is unlikely that a user of
// this tool will have access to these leader identity keypairs. However,
// shred sigverify occurs prior to Blockstore::insert_shreds(). Thus, the
// shreds being signed with the "dummy" keyapir can still be inserted and
// later read/replayed/etc
let keypair = keypair_from_seed(&[0; 64])?;
let ShredConfig {
shred_version,
num_hashes_per_tick,
num_ticks_per_slot,
allow_mock_poh,
} = shred_config;

for slot in slots.iter() {
let block = bigtable.get_confirmed_block(*slot).await?;
let entry_summaries = match bigtable.get_entries(*slot).await {
Ok(summaries) => Some(summaries),
Err(err) => {
let err_msg = format!("Failed to get PoH entries for {slot}: {err}");

if allow_mock_poh {
warn!("{err_msg}. Will create mock PoH entries instead.");
} else {
return Err(format!(
"{err_msg}. Try passing --allow-mock-poh to allow \
creation of shreds with mocked PoH entries"
))?;
}
None
}
};

let entries = match entry_summaries {
Some(entry_summaries) => entry_summaries
.enumerate()
.map(|(i, entry_summary)| {
let num_hashes = entry_summary.num_hashes;
let hash = entry_summary.hash;
let starting_transaction_index = entry_summary.starting_transaction_index;
let num_transactions = entry_summary.num_transactions as usize;

let Some(transactions) = block.transactions.get(
starting_transaction_index..starting_transaction_index + num_transactions,
) else {
let num_block_transactions = block.transactions.len();
return Err(format!(
"Entry summary {i} for slot {slot} with starting_transaction_index \
{starting_transaction_index} and num_transactions {num_transactions} \
is in conflict with the block, which has {num_block_transactions} \
transactions"
));
};
let transactions = transactions
.iter()
.map(|tx_with_meta| tx_with_meta.get_transaction())
.collect();

Ok(Entry {
num_hashes,
hash,
transactions,
})
})
.collect::<Result<Vec<Entry>, std::string::String>>()?,
None => {
let num_total_ticks = ((slot - block.parent_slot) * num_ticks_per_slot) as usize;
let num_total_entries = num_total_ticks + block.transactions.len();
let mut entries = Vec::with_capacity(num_total_entries);

// Create virtual tick entries for any skipped slots
//
// These ticks are necessary so that the tick height is
// advanced to the proper value when this block is processed.
//
// Additionally, a blockhash will still be inserted into the
// recent blockhashes sysvar for skipped slots. So, these
// virtual ticks will have the proper PoH
let num_skipped_slots = slot - block.parent_slot - 1;
if num_skipped_slots > 0 {
let num_virtual_ticks = num_skipped_slots * num_ticks_per_slot;
let parent_blockhash = Hash::from_str(&block.previous_blockhash)?;
let virtual_ticks_entries =
create_ticks(num_virtual_ticks, num_hashes_per_tick, parent_blockhash);
entries.extend(virtual_ticks_entries.into_iter());
}

// Create transaction entries
//
// Keep it simple and just do one transaction per Entry
let transaction_entries = block.transactions.iter().map(|tx_with_meta| Entry {
num_hashes: 0,
hash: Hash::default(),
transactions: vec![tx_with_meta.get_transaction()],
});
entries.extend(transaction_entries.into_iter());

// Create the tick entries for this slot
//
// We do not know the intermediate hashes, so just use default
// hash for all ticks. The exception is the final tick; the
// final tick determines the blockhash so set it the known
// blockhash from the bigtable block
let blockhash = Hash::from_str(&block.blockhash)?;
let tick_entries = (0..num_ticks_per_slot).map(|idx| {
let hash = if idx == num_ticks_per_slot - 1 {
blockhash
} else {
Hash::default()
};
Entry {
num_hashes: 0,
hash,
transactions: vec![],
}
});
entries.extend(tick_entries.into_iter());

entries
}
};

let shredder = Shredder::new(*slot, block.parent_slot, 0, shred_version)?;
let (data_shreds, _coding_shreds) = shredder.entries_to_shreds(
&keypair,
&entries,
true, // last_in_slot
None, // chained_merkle_root
0, // next_shred_index
0, // next_code_index
false, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
blockstore.insert_shreds(data_shreds, None, false)?;
}
Ok(())
}

async fn blocks(
starting_slot: Slot,
limit: usize,
Expand Down Expand Up @@ -848,6 +1021,45 @@ impl BigTableSubCommand for App<'_, '_> {
.required(true),
),
)
.subcommand(
SubCommand::with_name("shreds")
.about(
"Get confirmed blocks from BigTable, reassemble the transactions \
and entries, shred the block and then insert the shredded blocks into \
the local Blockstore",
)
.arg(load_genesis_arg())
.args(&snapshot_args())
.arg(
Arg::with_name("starting_slot")
.long("starting-slot")
.validator(is_slot)
.value_name("SLOT")
.takes_value(true)
.required(true)
.help("Start shred creation at this slot (inclusive)"),
)
.arg(
Arg::with_name("ending_slot")
.long("ending-slot")
.validator(is_slot)
.value_name("SLOT")
.takes_value(true)
.required(true)
.help("Stop shred creation at this slot (inclusive)"),
)
.arg(
Arg::with_name("allow_mock_poh")
.long("allow-mock-poh")
.takes_value(false)
.help(
"For slots where PoH entries are unavailable, allow the \
generation of mock PoH entries. The mock PoH entries enable \
the shredded block(s) to be replayable if PoH verification is \
disabled.",
),
),
)
.subcommand(
SubCommand::with_name("confirm")
.about("Confirm transaction by signature")
Expand Down Expand Up @@ -1142,6 +1354,87 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
};
runtime.block_on(entries(slot, output_format, config))
}
("shreds", Some(arg_matches)) => {
let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
let ending_slot = value_t_or_exit!(arg_matches, "ending_slot", Slot);
if starting_slot > ending_slot {
eprintln!(
"The specified --starting-slot {starting_slot} must be less than or equal to \
the specified --ending-slot {ending_slot}."
);
exit(1);
}
let allow_mock_poh = arg_matches.is_present("allow_mock_poh");

let ledger_path = canonicalize_ledger_path(ledger_path);
let process_options = parse_process_options(&ledger_path, arg_matches);
let genesis_config = open_genesis_config_by(&ledger_path, arg_matches);
let blockstore = Arc::new(crate::open_blockstore(
&ledger_path,
arg_matches,
AccessType::Primary,
));
let LoadAndProcessLedgerOutput { bank_forks, .. } = load_and_process_ledger_or_exit(
arg_matches,
&genesis_config,
blockstore.clone(),
process_options,
None,
);

let bank = bank_forks.read().unwrap().working_bank();
// If mock PoH is allowed, ensure that the requested slots are in
// the same epoch as the working bank. This will ensure the values
// extracted from the Bank are accurate for the slot range
if allow_mock_poh {
let working_bank_epoch = bank.epoch();
let epoch_schedule = bank.epoch_schedule();
let starting_epoch = epoch_schedule.get_epoch(starting_slot);
let ending_epoch = epoch_schedule.get_epoch(ending_slot);
if starting_epoch != ending_epoch {
eprintln!(
"The specified --starting-slot and --ending-slot must be in the\
same epoch. --starting-slot {starting_slot} is in epoch {starting_epoch},\
but --ending-slot {ending_slot} is in epoch {ending_epoch}."
);
exit(1);
}
if starting_epoch != working_bank_epoch {
eprintln!(
"The range of slots between --starting-slot and --ending-slot are in a \
different epoch than the working bank. The specified range is in epoch \
{starting_epoch}, but the working bank is in {working_bank_epoch}."
);
exit(1);
}
}

let shred_version =
compute_shred_version(&genesis_config.hash(), Some(&bank.hard_forks()));
let num_hashes_per_tick = bank.hashes_per_tick().unwrap_or(0);
let num_ticks_per_slot = bank.ticks_per_slot();
let shred_config = ShredConfig {
shred_version,
num_hashes_per_tick,
num_ticks_per_slot,
allow_mock_poh,
};

let config = solana_storage_bigtable::LedgerStorageConfig {
read_only: true,
instance_name,
app_profile_id,
..solana_storage_bigtable::LedgerStorageConfig::default()
};

runtime.block_on(shreds(
blockstore,
starting_slot,
ending_slot,
shred_config,
config,
))
}
("blocks", Some(arg_matches)) => {
let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
let limit = value_t_or_exit!(arg_matches, "limit", usize);
Expand Down

0 comments on commit 3ee204b

Please sign in to comment.