Skip to content

Commit

Permalink
Merge branch 'main' into sled-trees
Browse files Browse the repository at this point in the history
  • Loading branch information
yaahc committed Oct 26, 2020
2 parents 8217e15 + 971765a commit 7e4a132
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 38 deletions.
47 changes: 25 additions & 22 deletions book/src/dev/rfcs/0005-state-updates.md
Expand Up @@ -526,32 +526,35 @@ The state service uses the following entry points:

New `non-finalized` blocks are commited as follows:

#### `pub(super) fn queue_and_commit_non_finalized_blocks(&mut self, new: Arc<Block>) -> tokio::sync::broadcast::Receiver<block::Hash>`
### `pub(super) fn queue_and_commit_non_finalized_blocks(&mut self, new: Arc<Block>) -> tokio::sync::oneshot::Receiver<block::Hash>`

1. If a duplicate block exists in the queue:
- Find the `QueuedBlock` for that existing duplicate block
- Create an extra receiver for the existing block, using `block.rsp_tx.subscribe`,
- Drop the newly received duplicate block
- Return the extra receiver, so it can be used in the response future for the duplicate block request

2. Create a `QueuedBlock` for `block`:
- Create a `tokio::sync::broadcast` channel
- Use that channel to create a `QueuedBlock` for `block`.

3. If a duplicate block exists in a non-finalized chain, or the finalized chain,
1. If a duplicate block hash exists in a non-finalized chain, or the finalized chain,
it has already been successfully verified:
- Broadcast `Ok(block.hash())` via `block.rsp_tx`, and return the receiver for the block's channel
- create a new oneshot channel
- immediately send `Err(DuplicateBlockHash)` drop the sender
- return the reciever

4. Add `block` to `self.queued_blocks`

5. If `block.header.previous_block_hash` is not present in the finalized or
2. If a duplicate block hash exists in the queue:
- Find the `QueuedBlock` for that existing duplicate block
- create a new channel for the new request
- replace the old sender in `queued_block` with the new sender
- send `Err(DuplicateBlockHash)` through the old sender channel
- continue to use the new receiver

3. Else create a `QueuedBlock` for `block`:
- Create a `tokio::sync::oneshot` channel
- Use that channel to create a `QueuedBlock` for `block`
- Add `block` to `self.queued_blocks`
- continue to use the new receiver

4. If `block.header.previous_block_hash` is not present in the finalized or
non-finalized state:
- Return the receiver for the block's channel

6. Else iteratively attempt to process queued blocks by their parent hash
5. Else iteratively attempt to process queued blocks by their parent hash
starting with `block.header.previous_block_hash`

7. While there are recently commited parent hashes to process
6. While there are recently commited parent hashes to process
- Dequeue all blocks waiting on `parent` with `let queued_children =
self.queued_blocks.dequeue_children(parent);`
- for each queued `block`
Expand All @@ -569,17 +572,17 @@ New `non-finalized` blocks are commited as follows:
- Add `block.hash` to the set of recently commited parent hashes to
process

8. While the length of the non-finalized portion of the best chain is greater
7. While the length of the non-finalized portion of the best chain is greater
than the reorg limit
- Remove the lowest height block from the non-finalized state with
`self.mem.finalize();`
- Commit that block to the finalized state with
`self.sled.commit_finalized_direct(finalized);`

9. Prune orphaned blocks from `self.queued_blocks` with
8. Prune orphaned blocks from `self.queued_blocks` with
`self.queued_blocks.prune_by_height(finalized_height);`
10. Return the receiver for the block's channel

9. Return the receiver for the block's channel

## Sled data structures
[sled]: #sled
Expand Down
64 changes: 48 additions & 16 deletions zebra-state/src/service.rs
Expand Up @@ -9,7 +9,7 @@ use std::{

use futures::future::FutureExt;
use memory_state::{NonFinalizedState, QueuedBlocks};
use tokio::sync::broadcast;
use tokio::sync::oneshot;
use tower::{util::BoxService, Service};
use tracing::instrument;
use zebra_chain::{
Expand All @@ -18,8 +18,7 @@ use zebra_chain::{
};

use crate::{
BoxError, CloneError, CommitBlockError, Config, FinalizedState, Request, Response,
ValidateContextError,
BoxError, CommitBlockError, Config, FinalizedState, Request, Response, ValidateContextError,
};

mod memory_state;
Expand All @@ -32,7 +31,7 @@ pub struct QueuedBlock {
// TODO: add these parameters when we can compute anchors.
// sprout_anchor: sprout::tree::Root,
// sapling_anchor: sapling::tree::Root,
pub rsp_tx: broadcast::Sender<Result<block::Hash, CloneError>>,
pub rsp_tx: oneshot::Sender<Result<block::Hash, BoxError>>,
}

struct StateService {
Expand Down Expand Up @@ -73,14 +72,38 @@ impl StateService {
/// in RFC0005.
///
/// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks
#[instrument(skip(self, new))]
fn queue_and_commit_non_finalized_blocks(&mut self, new: QueuedBlock) {
let parent_hash = new.block.header.previous_block_hash;
#[instrument(skip(self, block))]
fn queue_and_commit_non_finalized_blocks(
&mut self,
block: Arc<Block>,
) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
let hash = block.hash();
let parent_hash = block.header.previous_block_hash;

self.queued_blocks.queue(new);
if self.contains_committed_block(&block) {
let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Err("duplicate block".into()));
return rsp_rx;
}

// The queue of blocks maintained by this service acts as a pipeline for
// blocks waiting for contextual verification. We lazily flush the
// pipeline here by handling duplicate requests to verify an existing
// queued block. We handle those duplicate requests by replacing the old
// channel with the new one and sending an error over the old channel.
let rsp_rx = if let Some(queued_block) = self.queued_blocks.get_mut(&hash) {
let (mut rsp_tx, rsp_rx) = oneshot::channel();
std::mem::swap(&mut queued_block.rsp_tx, &mut rsp_tx);
let _ = rsp_tx.send(Err("duplicate block".into()));
rsp_rx
} else {
let (rsp_tx, rsp_rx) = oneshot::channel();
self.queued_blocks.queue(QueuedBlock { block, rsp_tx });
rsp_rx
};

if !self.can_fork_chain_at(&parent_hash) {
return;
return rsp_rx;
}

self.process_queued(parent_hash);
Expand All @@ -96,6 +119,8 @@ impl StateService {
.prune_by_height(self.sled.finalized_tip_height().expect(
"Finalized state must have at least one block before committing non-finalized state",
));

rsp_rx
}

/// Run contextual validation on `block` and add it to the non-finalized
Expand All @@ -118,6 +143,17 @@ impl StateService {
self.mem.any_chain_contains(hash) || &self.sled.finalized_tip_hash() == hash
}

/// Returns true if the given hash has been committed to either the finalized
/// or non-finalized state.
fn contains_committed_block(&self, block: &Block) -> bool {
let hash = block.hash();
let height = block
.coinbase_height()
.expect("coinbase heights should be valid");

self.mem.any_chain_contains(&hash) || self.sled.get_hash(height) == Some(hash)
}

/// Attempt to validate and commit all queued blocks whose parents have
/// recently arrived starting from `new_parent`, in breadth-first ordering.
#[instrument(skip(self))]
Expand All @@ -132,7 +168,7 @@ impl StateService {
let result = self
.validate_and_commit(block)
.map(|()| hash)
.map_err(CloneError::from);
.map_err(BoxError::from);
let _ = rsp_tx.send(result);
new_parents.push(hash);
}
Expand Down Expand Up @@ -179,14 +215,11 @@ impl Service<Request> for StateService {
fn call(&mut self, req: Request) -> Self::Future {
match req {
Request::CommitBlock { block } => {
let (rsp_tx, mut rsp_rx) = broadcast::channel(1);

self.pending_utxos.check_block(&block);
self.queue_and_commit_non_finalized_blocks(QueuedBlock { block, rsp_tx });
let rsp_rx = self.queue_and_commit_non_finalized_blocks(block);

async move {
rsp_rx
.recv()
.await
.expect("sender is not dropped")
.map(Response::Committed)
Expand All @@ -195,15 +228,14 @@ impl Service<Request> for StateService {
.boxed()
}
Request::CommitFinalizedBlock { block } => {
let (rsp_tx, mut rsp_rx) = broadcast::channel(1);
let (rsp_tx, rsp_rx) = oneshot::channel();

self.pending_utxos.check_block(&block);
self.sled
.queue_and_commit_finalized_blocks(QueuedBlock { block, rsp_tx });

async move {
rsp_rx
.recv()
.await
.expect("sender is not dropped")
.map(Response::Committed)
Expand Down
5 changes: 5 additions & 0 deletions zebra-state/src/service/memory_state.rs
Expand Up @@ -641,6 +641,11 @@ impl QueuedBlocks {
.remove(&hash);
}
}

/// Return the queued block if it has already been registered
pub fn get_mut(&mut self, hash: &block::Hash) -> Option<&mut QueuedBlock> {
self.blocks.get_mut(&hash)
}
}

#[cfg(test)]
Expand Down
8 changes: 8 additions & 0 deletions zebra-state/src/sled_state.rs
Expand Up @@ -217,4 +217,12 @@ impl FinalizedState {
) -> Result<Option<transparent::Output>, BoxError> {
self.utxo_by_outpoint.zs_get(outpoint)
}

/// Returns the finalized hash for a given `block::Height` if it is present.
pub fn get_hash(&self, height: block::Height) -> Option<block::Hash> {
self.hash_by_height
.get(&height.0.to_be_bytes())
.expect("expected that sled errors would not occur")
.map(|bytes| block::Hash(bytes.as_ref().try_into().unwrap()))
}
}

0 comments on commit 7e4a132

Please sign in to comment.