Import shard trie chunks in sync extension
remagpie committed Dec 19, 2019
1 parent f628350 commit 364fa18
Showing 1 changed file with 100 additions and 22 deletions.
sync/src/block/extension.rs — 122 changes: 100 additions & 22 deletions
@@ -24,17 +24,17 @@ use std::time::Duration;
use ccore::encoded::Header as EncodedHeader;
use ccore::{
Block, BlockChainClient, BlockChainTrait, BlockId, BlockImportError, BlockStatus, ChainNotify, Client, ImportBlock,
ImportError, UnverifiedTransaction,
ImportError, StateInfo, UnverifiedTransaction,
};
use cmerkle::snapshot::ChunkDecompressor;
use cmerkle::snapshot::Restore as SnapshotRestore;
use cmerkle::{skewed_merkle_root, TrieFactory};
use cnetwork::{Api, EventSender, NetworkExtension, NodeId};
use cstate::FindActionHandler;
use cstate::{FindActionHandler, TopLevelState, TopStateView};
use ctimer::TimerToken;
use ctypes::header::{Header, Seal};
use ctypes::transaction::Action;
use ctypes::{BlockHash, BlockNumber};
use ctypes::{BlockHash, BlockNumber, ShardId};
use hashdb::AsHashDB;
use kvdb::DBTransaction;
use primitives::{H256, U256};
@@ -68,10 +68,15 @@ enum State {
header: EncodedHeader,
prev_root: H256,
},
SnapshotChunk {
SnapshotTopChunk {
block: BlockHash,
restore: SnapshotRestore,
},
SnapshotShardChunk {
block: BlockHash,
shard_id: ShardId,
restore: SnapshotRestore,
},
Full,
}

@@ -82,7 +87,7 @@ impl State {
None => return State::Full,
};
let header = match client.block_header(&num.into()) {
Some(ref h) if h.hash() == hash => h.clone(),
Some(h) if h.hash() == hash => h,
_ => return State::SnapshotHeader(hash, num),
};
if client.block_body(&hash.into()).is_none() {
@@ -97,13 +102,35 @@

let state_db = client.state_db().read();
let state_root = header.state_root();
match TrieFactory::readonly(state_db.as_hashdb(), &state_root) {
Ok(ref trie) if trie.is_complete() => State::Full,
_ => State::SnapshotChunk {
let top_trie = TrieFactory::readonly(state_db.as_hashdb(), &state_root);
if !top_trie.map(|t| t.is_complete()).unwrap_or(false) {
return State::SnapshotTopChunk {
block: hash,
restore: SnapshotRestore::new(state_root),
},
}
}

let top_state = client.state_at(hash.into()).expect("Top level state at the snapshot header exists");
let metadata = top_state.metadata().unwrap().expect("Metadata must exist for the snapshot block");
let shard_num = *metadata.number_of_shards();
let empty_shard = (0..shard_num).find_map(|n| {
let shard_root = top_state.shard_root(n).unwrap().expect("Shard root must exist");
let trie = TrieFactory::readonly(state_db.as_hashdb(), &shard_root);
if !trie.map(|t| t.is_complete()).unwrap_or(false) {
Some((n, shard_root))
} else {
None
}
});
if let Some((shard_id, shard_root)) = empty_shard {
return State::SnapshotShardChunk {
block: hash,
shard_id,
restore: SnapshotRestore::new(shard_root),
}
}

State::Full
}

fn next(&self, client: &Client) -> Self {
@@ -121,13 +148,48 @@
State::SnapshotBody {
header,
..
} => State::SnapshotChunk {
} => State::SnapshotTopChunk {
block: header.hash(),
restore: SnapshotRestore::new(header.state_root()),
},
State::SnapshotChunk {
State::SnapshotTopChunk {
block,
..
} => State::Full,
} => {
let header = client.block_header(&(*block).into()).expect("Snapshot header must exist");
let state_root = header.state_root();
let state_db = client.state_db().read();
let top_state = TopLevelState::from_existing(state_db.clone(&state_root), state_root).unwrap();
let shard_root = top_state.shard_root(0).unwrap().expect("Shard 0 always exists");
State::SnapshotShardChunk {
block: *block,
shard_id: 0,
restore: SnapshotRestore::new(shard_root),
}
}
State::SnapshotShardChunk {
block,
shard_id,
..
} => {
let top_state = client.state_at((*block).into()).expect("State at the snapshot header must exist");
let metadata = top_state.metadata().unwrap().expect("Metadata must exist for snapshot block");
let shard_num = *metadata.number_of_shards();
if shard_id + 1 == shard_num {
State::Full
} else {
let next_shard = shard_id + 1;
let shard_root = top_state
.shard_root(next_shard)
.expect("Top level state must be valid")
.expect("Shard root must exist");
State::SnapshotShardChunk {
block: *block,
shard_id: next_shard,
restore: SnapshotRestore::new(shard_root),
}
}
}
State::Full => State::Full,
}
}
@@ -206,11 +268,15 @@ impl Extension {
header,
..
} => header.hash(),
State::SnapshotChunk {
State::SnapshotTopChunk {
block,
..
} => *block,
State::SnapshotShardChunk {
block,
..
} => *block,
State::Full => unreachable!("Trying to transition state from State::Full"),
State::Full => panic!("Trying to transit the state from State::Full"),
};
self.client.force_update_best_block(&best_hash);
for downloader in self.header_downloaders.values_mut() {
@@ -522,9 +588,20 @@ impl NetworkExtension<Event> for Extension {
}
}
}
State::SnapshotChunk {
State::SnapshotTopChunk {
block,
ref mut restore,
} => {
if let Some(root) = restore.next_to_feed() {
self.send_chunk_request(&block, &root);
} else {
self.move_state();
}
}
State::SnapshotShardChunk {
block,
ref mut restore,
..
} => {
if let Some(root) = restore.next_to_feed() {
self.send_chunk_request(&block, &root);
@@ -925,12 +1002,6 @@ impl Extension {
headers.len()
),
},
State::SnapshotBody {
..
} => {}
State::SnapshotChunk {
..
} => {}
State::Full => {
let (mut completed, peer_is_caught_up) = if let Some(peer) = self.header_downloaders.get_mut(from) {
let encoded: Vec<_> = headers.iter().map(|h| EncodedHeader::new(h.rlp_bytes().to_vec())).collect();
@@ -969,6 +1040,7 @@
}
}
}
_ => {}
}
}

@@ -1044,12 +1116,18 @@

fn on_chunk_response(&mut self, from: &NodeId, roots: &[H256], chunks: &[Vec<u8>]) {
let (block, restore) = match self.state {
State::SnapshotChunk {
State::SnapshotTopChunk {
block,
ref mut restore,
} => (block, restore),
State::SnapshotShardChunk {
block,
ref mut restore,
..
} => (block, restore),
_ => return,
};
assert_eq!(roots.len(), chunks.len());
for (r, c) in roots.iter().zip(chunks) {
if c.is_empty() {
cdebug!(SYNC, "Peer {} sent empty response for chunk request {}", from, r);
