The Header MMR (One MMR To Rule Them All) (mimblewimble#1716)
* header MMR in use within txhashset itself
works with fast sync
not yet in place for initial header sync

* add the (currently unused) sync_head mmr

* use sync MMR during fast sync
rebuild header MMR after we validate full txhashset after download

* support missing header MMR (rebuild as necessary) for legacy nodes

* rename to HashOnly

* cleanup backend.append()

* simplify vec_backend to match simpler append api
antiochp committed Oct 15, 2018
1 parent 5afca16 commit 05e6c15
Showing 19 changed files with 936 additions and 195 deletions.
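
The commit message centers on a "hash only" header MMR: full headers stay in the db, the MMR itself stores nothing but hashes, and the backend append API reduces to pushing a batch of hashes. A minimal sketch of that shape, using stand-in names rather than grin's actual Backend trait:

type Hash = [u8; 32];

/// Stand-in for a hash-only MMR backend: no data file, only hashes.
trait HashOnlyBackend {
    /// Append the hashes produced by inserting one new leaf
    /// (the leaf hash plus any new parent hashes).
    fn append(&mut self, hashes: Vec<Hash>) -> Result<(), String>;
    /// Read the hash at a given MMR position.
    fn get_hash(&self, pos: u64) -> Option<Hash>;
    /// Truncate the backend to a previous MMR size.
    fn rewind(&mut self, size: u64);
}

/// Minimal in-memory backend matching the simplified append API.
struct VecBackend {
    hashes: Vec<Hash>,
}

impl HashOnlyBackend for VecBackend {
    fn append(&mut self, hashes: Vec<Hash>) -> Result<(), String> {
        self.hashes.extend(hashes);
        Ok(())
    }
    fn get_hash(&self, pos: u64) -> Option<Hash> {
        self.hashes.get(pos as usize).copied()
    }
    fn rewind(&mut self, size: u64) {
        self.hashes.truncate(size as usize);
    }
}

Because the hashes are derivable from headers in the db at any time, the header and sync MMRs can always be rebuilt from scratch, which is what the rebuild paths in the diff below rely on.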
6 changes: 3 additions & 3 deletions api/src/types.rs
@@ -97,9 +97,9 @@ impl TxHashSet {
pub fn from_head(head: Arc<chain::Chain>) -> TxHashSet {
let roots = head.get_txhashset_roots();
TxHashSet {
output_root_hash: roots.0.to_hex(),
range_proof_root_hash: roots.1.to_hex(),
kernel_root_hash: roots.2.to_hex(),
output_root_hash: roots.output_root.to_hex(),
range_proof_root_hash: roots.rproof_root.to_hex(),
kernel_root_hash: roots.kernel_root.to_hex(),
}
}
}
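
The change above replaces positional tuple access (roots.0, roots.1, roots.2) with named fields on TxHashSetRoots, the type now imported in chain/src/chain.rs below. The struct definition is not part of this view; judging from the three fields used here, it presumably looks something like this (sketch, field set assumed):

// `Hash` stands in for grin's core hash type.
type Hash = [u8; 32];

pub struct TxHashSetRoots {
    pub output_root: Hash,
    pub rproof_root: Hash,
    pub kernel_root: Hash,
}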
73 changes: 62 additions & 11 deletions chain/src/chain.rs
@@ -35,7 +35,7 @@ use grin_store::Error::NotFoundErr;
use pipe;
use store;
use txhashset;
use types::{ChainAdapter, NoStatus, Options, Tip, TxHashsetWriteStatus};
use types::{ChainAdapter, NoStatus, Options, Tip, TxHashSetRoots, TxHashsetWriteStatus};
use util::secp::pedersen::{Commitment, RangeProof};
use util::LOGGER;

@@ -153,6 +153,7 @@ pub struct Chain {
// POW verification function
pow_verifier: fn(&BlockHeader, u8) -> Result<(), pow::Error>,
archive_mode: bool,
genesis: BlockHeader,
}

unsafe impl Sync for Chain {}
@@ -178,7 +179,7 @@ impl Chain {
// open the txhashset, creating a new one if necessary
let mut txhashset = txhashset::TxHashSet::open(db_root.clone(), store.clone(), None)?;

setup_head(genesis, store.clone(), &mut txhashset)?;
setup_head(genesis.clone(), store.clone(), &mut txhashset)?;

let head = store.head()?;
debug!(
@@ -199,6 +200,7 @@
verifier_cache,
block_hashes_cache: Arc::new(RwLock::new(LruCache::new(HASHES_CACHE_SIZE))),
archive_mode,
genesis: genesis.header.clone(),
})
}

@@ -492,11 +494,15 @@ impl Chain {
Ok((extension.roots(), extension.sizes()))
})?;

// Carefully destructure these correctly...
// TODO - Maybe sizes should be a struct to add some type safety here...
let (_, output_mmr_size, _, kernel_mmr_size) = sizes;

b.header.output_root = roots.output_root;
b.header.range_proof_root = roots.rproof_root;
b.header.kernel_root = roots.kernel_root;
b.header.output_mmr_size = sizes.0;
b.header.kernel_mmr_size = sizes.2;
b.header.output_mmr_size = output_mmr_size;
b.header.kernel_mmr_size = kernel_mmr_size;
Ok(())
}
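
Picking up the TODO above: the tuple apparently orders the four sizes as (header, output, rangeproof, kernel). A hypothetical struct giving the type safety the comment asks for (names assumed, not part of this commit):

/// Hypothetical replacement for the (u64, u64, u64, u64) sizes tuple.
pub struct TxHashSetSizes {
    pub header_mmr_size: u64,
    pub output_mmr_size: u64,
    pub rproof_mmr_size: u64,
    pub kernel_mmr_size: u64,
}

// The destructuring above would then become self-documenting:
// b.header.output_mmr_size = sizes.output_mmr_size;
// b.header.kernel_mmr_size = sizes.kernel_mmr_size;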

@@ -524,7 +530,7 @@ }
}

/// Returns current txhashset roots
pub fn get_txhashset_roots(&self) -> (Hash, Hash, Hash) {
pub fn get_txhashset_roots(&self) -> TxHashSetRoots {
let mut txhashset = self.txhashset.write().unwrap();
txhashset.roots()
}
@@ -592,6 +598,40 @@ impl Chain {
Ok(())
}

/// Rebuild the sync MMR based on current header_head.
/// We rebuild the sync MMR when first entering sync mode to ensure we
/// have an MMR we can safely rewind based on the headers received from a peer.
/// TODO - think about how to optimize this.
pub fn rebuild_sync_mmr(&self, head: &Tip) -> Result<(), Error> {
let mut txhashset = self.txhashset.write().unwrap();
let mut batch = self.store.batch()?;
txhashset::sync_extending(&mut txhashset, &mut batch, |extension| {
extension.rebuild(head, &self.genesis)?;
Ok(())
})?;
batch.commit()?;
Ok(())
}

/// Rebuild the header MMR based on current header_head.
/// We rebuild the header MMR after receiving a txhashset from a peer.
/// The txhashset contains output, rangeproof and kernel MMRs but we construct
/// the header MMR locally based on headers from our db.
/// TODO - think about how to optimize this.
fn rebuild_header_mmr(
&self,
head: &Tip,
txhashset: &mut txhashset::TxHashSet,
) -> Result<(), Error> {
let mut batch = self.store.batch()?;
txhashset::header_extending(txhashset, &mut batch, |extension| {
extension.rebuild(head, &self.genesis)?;
Ok(())
})?;
batch.commit()?;
Ok(())
}
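
extension.rebuild(head, genesis), called by both functions above, is not expanded in this view. Conceptually it must reconstruct the MMR purely from headers already in the db: walk back from head to genesis collecting headers, then apply them in forward order. A self-contained sketch of that walk, with closures standing in for the db and the MMR extension (illustrative, not the committed implementation):

type Hash = [u8; 32];

struct BlockHeader {
    hash: Hash,
    previous: Hash,
}

/// Rebuild an empty (or rewound) header MMR from headers stored in the db.
fn rebuild(
    head_hash: Hash,
    genesis: &BlockHeader,
    lookup: impl Fn(&Hash) -> Option<BlockHeader>,
    mut apply_header: impl FnMut(&BlockHeader),
) -> Result<(), String> {
    // Walk back from the head to (but not including) genesis.
    let mut headers = Vec::new();
    let mut current = head_hash;
    while current != genesis.hash {
        let header = lookup(&current)
            .ok_or_else(|| "header missing from db".to_string())?;
        current = header.previous;
        headers.push(header);
    }
    // Apply genesis first, then the remaining headers by increasing height.
    apply_header(genesis);
    for header in headers.iter().rev() {
        apply_header(header);
    }
    Ok(())
}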

/// Writes a reading view on a txhashset state that's been provided to us.
/// If we're willing to accept that new state, the data stream will be
/// read as a zip file, unzipped and the resulting state files should be
@@ -619,6 +659,10 @@ impl Chain {
let mut txhashset =
txhashset::TxHashSet::open(self.db_root.clone(), self.store.clone(), Some(&header))?;

// The txhashset.zip contains the output, rangeproof and kernel MMRs.
// We must rebuild the header MMR ourselves based on the headers in our db.
self.rebuild_header_mmr(&Tip::from_block(&header), &mut txhashset)?;

// Validate the full kernel history (kernel MMR root for every block header).
self.validate_kernel_history(&header, &txhashset)?;
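
validate_kernel_history is likewise not expanded here. Per the comment, it checks the kernel MMR root against every historical block header, which is possible because kernels are never pruned, so rewinding the kernel MMR by size yields every historical root. A sketch under assumed helpers (root_at_size stands in for rewinding the kernel MMR to a header's recorded kernel_mmr_size):

type Hash = [u8; 32];

struct BlockHeader {
    height: u64,
    kernel_root: Hash,
    kernel_mmr_size: u64,
}

/// Verify each historical header's kernel root against the kernel MMR.
fn validate_kernel_history(
    headers: &[BlockHeader],            // genesis..=head, ascending
    root_at_size: impl Fn(u64) -> Hash, // kernel MMR root at a given size
) -> Result<(), String> {
    for header in headers {
        if root_at_size(header.kernel_mmr_size) != header.kernel_root {
            return Err(format!(
                "kernel root mismatch at height {}",
                header.height
            ));
        }
    }
    Ok(())
}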

@@ -974,6 +1018,15 @@ fn setup_head(
// to match the provided block header.
let header = batch.get_block_header(&head.last_block_h)?;

// If we have no header MMR then rebuild as necessary.
// Supports old nodes with no header MMR.
txhashset::header_extending(txhashset, &mut batch, |extension| {
if extension.size() == 0 {
extension.rebuild(&head, &genesis.header)?;
}
Ok(())
})?;

let res = txhashset::extending(txhashset, &mut batch, |extension| {
extension.rewind(&header)?;
extension.validate_roots()?;
@@ -1033,17 +1086,15 @@ fn setup_head(
batch.save_head(&tip)?;
batch.setup_height(&genesis.header, &tip)?;

// Apply the genesis block to our empty MMRs.
txhashset::extending(txhashset, &mut batch, |extension| {
extension.apply_block(&genesis)?;

// Save the block_sums to the db for use later.
extension
.batch
.save_block_sums(&genesis.hash(), &BlockSums::default())?;

Ok(())
})?;

// Save the block_sums to the db for use later.
batch.save_block_sums(&genesis.hash(), &BlockSums::default())?;

info!(LOGGER, "chain: init: saved genesis: {:?}", genesis.hash());
}
Err(e) => return Err(ErrorKind::StoreErr(e, "chain init load head".to_owned()))?,
48 changes: 28 additions & 20 deletions chain/src/pipe.rs
@@ -89,10 +89,19 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, E

// Check if this block is already known due to it being in the current set of orphan blocks.
check_known_orphans(&b.header, ctx)?;

// Check we have *this* block in the store.
// Stop if we have processed this block previously (it is in the store).
// This is more expensive than the earlier check_known() as we hit the store.
check_known_store(&b.header, ctx)?;
}

// Header specific processing.
handle_block_header(&b.header, ctx)?;
{
validate_header(&b.header, ctx)?;
add_block_header(&b.header, ctx)?;
update_header_head(&b.header, ctx)?;
}

// Check if we are processing the "next" block relative to the current chain head.
let head = ctx.batch.head()?;
@@ -102,11 +111,6 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, E
// * special case where this is the first fast sync full block
// Either way we can proceed (and we know the block is new and unprocessed).
} else {
// Check we have *this* block in the store.
// Stop if we have processed this block previously (it is in the store).
// This is more expensive than the earlier check_known() as we hit the store.
check_known_store(&b.header, ctx)?;

// At this point it looks like this is a new block that we have not yet processed.
// Check we have the *previous* block in the store.
// If we do not then treat this block as an orphan.
@@ -126,7 +130,7 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, E
if is_next_block(&b.header, &head) {
// No need to rewind if we are processing the next block.
} else {
// Rewind the re-apply blocks on the forked chain to
// Rewind and re-apply blocks on the forked chain to
// put the txhashset in the correct forked state
// (immediately prior to this new block).
rewind_and_apply_fork(b, extension)?;
@@ -172,12 +176,8 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, E
// Add the newly accepted block and header to our index.
add_block(b, ctx)?;

// Update the chain head (and header_head) if total work is increased.
let res = {
let _ = update_header_head(&b.header, ctx)?;
let res = update_head(b, ctx)?;
res
};
// Update the chain head if total work is increased.
let res = update_head(b, ctx)?;
Ok(res)
}

Expand Down Expand Up @@ -207,8 +207,22 @@ pub fn sync_block_headers(

if !all_known {
for header in headers {
handle_block_header(header, ctx)?;
validate_header(header, ctx)?;
add_block_header(header, ctx)?;
}

let first_header = headers.first().unwrap();
let prev_header = ctx.batch.get_block_header(&first_header.previous)?;
txhashset::sync_extending(&mut ctx.txhashset, &mut ctx.batch, |extension| {
// Optimize this if "next" header
extension.rewind(&prev_header)?;

for header in headers {
extension.apply_header(header)?;
}

Ok(())
})?;
}

// Update header_head (if most work) and sync_head (regardless) in all cases,
@@ -229,12 +243,6 @@
}
}

fn handle_block_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> {
validate_header(header, ctx)?;
add_block_header(header, ctx)?;
Ok(())
}

/// Process block header as part of "header first" block propagation.
/// We validate the header but we do not store it or update header head based
/// on this. We will update these once we get the block back after requesting