Skip to content

Commit

Permalink
Refactor on_chunk_response
Browse files Browse the repository at this point in the history
  • Loading branch information
remagpie committed Dec 17, 2019
1 parent f192340 commit 8a428cb
Showing 1 changed file with 46 additions and 45 deletions.
91 changes: 46 additions & 45 deletions sync/src/block/extension.rs
Expand Up @@ -999,59 +999,60 @@ impl Extension {
}

fn on_chunk_response(&mut self, from: &NodeId, roots: &[H256], chunks: &[Vec<u8>]) {
if let State::SnapshotChunk {
block,
ref mut restore,
} = self.state
{
for (r, c) in roots.iter().zip(chunks) {
if c.is_empty() {
cdebug!(SYNC, "Peer {} sent empty response for chunk request {}", from, r);
let (block, restore) = match self.state {
State::SnapshotChunk {
block,
ref mut restore,
} => (block, restore),
_ => return,
};
for (r, c) in roots.iter().zip(chunks) {
if c.is_empty() {
cdebug!(SYNC, "Peer {} sent empty response for chunk request {}", from, r);
continue
}
let decompressor = ChunkDecompressor::from_slice(c);
let raw_chunk = match decompressor.decompress() {
Ok(chunk) => chunk,
Err(e) => {
cwarn!(SYNC, "Decode failed for chunk response from peer {}: {}", from, e);
continue
}
let decompressor = ChunkDecompressor::from_slice(c);
let raw_chunk = match decompressor.decompress() {
Ok(chunk) => chunk,
Err(e) => {
cwarn!(SYNC, "Decode failed for chunk response from peer {}: {}", from, e);
continue
}
};
let recovered = match raw_chunk.recover(*r) {
Ok(chunk) => chunk,
};
let recovered = match raw_chunk.recover(*r) {
Ok(chunk) => chunk,
Err(e) => {
cwarn!(SYNC, "Invalid chunk response from peer {}: {}", from, e);
continue
}
};

let batch = {
let mut state_db = self.client.state_db().write();
let hash_db = state_db.as_hashdb_mut();
restore.feed(hash_db, recovered);

let mut batch = DBTransaction::new();
match state_db.journal_under(&mut batch, 0, H256::zero()) {
Ok(_) => batch,
Err(e) => {
cwarn!(SYNC, "Invalid chunk response from peer {}: {}", from, e);
cwarn!(SYNC, "Failed to write state chunk to database: {}", e);
continue
}
};

let batch = {
let mut state_db = self.client.state_db().write();
let hash_db = state_db.as_hashdb_mut();
restore.feed(hash_db, recovered);

let mut batch = DBTransaction::new();
match state_db.journal_under(&mut batch, 0, H256::zero()) {
Ok(_) => batch,
Err(e) => {
cwarn!(SYNC, "Failed to write state chunk to database: {}", e);
continue
}
}
};
self.client.db().write_buffered(batch);
match self.client.db().flush() {
Ok(_) => cdebug!(SYNC, "Wrote state chunk to database: {}", r),
Err(e) => cwarn!(SYNC, "Failed to flush database: {}", e),
}
};
self.client.db().write_buffered(batch);
match self.client.db().flush() {
Ok(_) => cdebug!(SYNC, "Wrote state chunk to database: {}", r),
Err(e) => cwarn!(SYNC, "Failed to flush database: {}", e),
}
}

if let Some(root) = restore.next_to_feed() {
self.send_chunk_request(&block, &root);
} else {
self.client.force_update_best_block(&block);
self.transition_to_full();
}
if let Some(root) = restore.next_to_feed() {
self.send_chunk_request(&block, &root);
} else {
self.client.force_update_best_block(&block);
self.transition_to_full();
}
}

Expand Down

0 comments on commit 8a428cb

Please sign in to comment.