Skip to content

Commit

Permalink
Implement full piece validation semantics
Browse files Browse the repository at this point in the history
Synapse now hashes pieces as it downloads them. The piece field of
the Torrent struct now reflects validated pieces rather than "downloaded"
ones.
  • Loading branch information
Luminarys committed Mar 12, 2018
1 parent 87b7d47 commit f92d9f3
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 78 deletions.
30 changes: 16 additions & 14 deletions build.rs
@@ -1,28 +1,30 @@
extern crate cc;

use std::env;

fn main() {
if cfg!(target_os = "linux") {
cc::Build::new()
.file("native/fallocate_linux.c")
.opt_level(3)
.compile("fallocate");
let debug = env::var("DEBUG").unwrap() != "false";

let fallocate_path = if cfg!(target_os = "linux") {
"native/fallocate_linux.c"
} else if cfg!(target_os = "macos") {
cc::Build::new()
.file("native/fallocate_darwin.c")
.opt_level(3)
.compile("fallocate");
"native/fallocate_darwin.c"
} else if cfg!(target_family = "unix") {
cc::Build::new()
.file("native/fallocate_posix.c")
.opt_level(3)
.compile("fallocate");
"native/fallocate_posix.c"
} else {
panic!("synapse can only be compiled on a POSIX platform!");
}
};

cc::Build::new()
.file(fallocate_path)
.opt_level(3)
.debug(debug)
.compile("fallocate");

cc::Build::new()
.file("native/mmap.c")
.opt_level(3)
.debug(debug)
.warnings(false)
.compile("mmap");
}
1 change: 0 additions & 1 deletion src/disk/cache.rs
Expand Up @@ -164,7 +164,6 @@ impl FileCache {
{
let stat = file.metadata()?;
let sparse = stat.blocks() * stat.blksize() < stat.size();

let mmap = unsafe { MmapMut::map_mut(&file)? };
self.files.insert(
path.to_path_buf(),
Expand Down
90 changes: 39 additions & 51 deletions src/torrent/mod.rs
Expand Up @@ -45,6 +45,7 @@ pub enum TrackerStatus {
pub struct Torrent<T: cio::CIO> {
id: usize,
pieces: Bitfield,
validating: FHashSet<u32>,
info: Arc<Info>,
cio: T,
uploaded: u64,
Expand Down Expand Up @@ -253,6 +254,7 @@ impl<T: cio::CIO> Torrent<T> {
path,
peers,
pieces,
validating: FHashSet::default(),
picker,
priority: 3,
priorities,
Expand Down Expand Up @@ -365,6 +367,7 @@ impl<T: cio::CIO> Torrent<T> {
info,
peers,
pieces: d.pieces,
validating: FHashSet::default(),
picker,
uploaded: d.uploaded,
downloaded: d.downloaded,
Expand Down Expand Up @@ -683,23 +686,27 @@ impl<T: cio::CIO> Torrent<T> {
]));
}
disk::Response::PieceValidated { piece, valid, .. } => {
// We use a transient, on the fly validation approach for simplicity.
self.validating.remove(&piece);
if valid {
self.pieces.set_bit(piece as u64);
// Tell all relevant peers we got the piece
let m = Message::Have(piece);
for pid in &self.leechers {
if let Some(peer) = self.peers.get_mut(pid) {
if !peer.pieces().has_bit(u64::from(piece)) {
peer.send_message(m.clone());
}
} else {
// This situation can occur when a torrent itself is a leecher
// and the piece download causes a "self notification", while it
// has been removed. Ignore for now.
}
}
self.files.update(&self.info, piece);
self.check_complete();
} else {
info!("Invalid piece downloaded!");
// TODO: trace down the bad peer and block it
debug!("Invalid piece downloaded!");
self.picker.invalidate_piece(piece);
if !self.stat.active() {
self.request_all();
}
}
}
disk::Response::ValidationUpdate { percent, .. } => {
Expand All @@ -713,14 +720,14 @@ impl<T: cio::CIO> Torrent<T> {
// part of an invalid file(none of the disk locations
// refer to files which aren't being downloaded(pri. 1)
invalid.retain(|i| {
Info::piece_disk_locs(&self.info, *i).all(|loc| self.priorities[loc.file] != 0)
Info::piece_disk_locs(&self.info, *i).any(|loc| self.priorities[loc.file] != 0)
});
if invalid.is_empty() {
info!("Torrent succesfully downloaded!");
debug!("Torrent succesfully validated!");
if !self.complete() {
for i in 0..self.pieces.len() {
let complete = Info::piece_disk_locs(&self.info, i as u32)
.all(|loc| self.priorities[loc.file] != 0);
.any(|loc| self.priorities[loc.file] != 0);
if complete {
self.pieces.set_bit(i);
}
Expand All @@ -730,15 +737,15 @@ impl<T: cio::CIO> Torrent<T> {
} else {
// If this is an initialization hash, start the torrent
// immediatly.
if !self.complete() {
debug!("initial validation complete, starting torrent");
if self.pieces().iter().count() == 0 {
debug!("validation complete, starting torrent");
// If there was some partial completion,
// set the pieces appropriately, then reset the
// picker to use the new bitfield
if invalid.len() != self.pieces.len() as usize {
for i in 0..self.pieces.len() {
let complete = Info::piece_disk_locs(&self.info, i as u32)
.all(|loc| self.priorities[loc.file] != 0);
.any(|loc| self.priorities[loc.file] != 0);
if complete {
self.pieces.set_bit(i);
}
Expand All @@ -747,18 +754,16 @@ impl<T: cio::CIO> Torrent<T> {
for piece in invalid {
self.pieces.unset_bit(u64::from(piece));
}
let mut rpc_updates = vec![];
self.cio.msg_rpc(rpc::CtlMessage::Update(rpc_updates));
let seq = self.picker.is_sequential();
self.change_picker(seq);
}
self.announce_start();
} else {
let mut rpc_updates = vec![];
for piece in invalid {
self.picker.invalidate_piece(piece);
self.pieces.unset_bit(u64::from(piece));
}
self.files.rebuild(&self.info, &self.pieces);
self.cio.msg_rpc(rpc::CtlMessage::Update(rpc_updates));
self.request_all();
}
self.status.state = StatusState::Incomplete;
Expand All @@ -772,6 +777,10 @@ impl<T: cio::CIO> Torrent<T> {
error!("Disk error: {:?}", err);
self.status.error = Some(format!("{}", err));
self.announce_status();
for piece in self.validating.drain() {
self.picker.invalidate_piece(piece);
self.pieces.unset_bit(piece as u64);
}
}
disk::Response::FreeSpace(_) => unreachable!(),
}
Expand All @@ -795,14 +804,8 @@ impl<T: cio::CIO> Torrent<T> {
self.status.state = StatusState::Complete;
let seq = self.picker.is_sequential();
self.change_picker(seq);
self.set_finished();
self.serialize();
if CONFIG.disk.validate {
debug!("Beginning validation");
self.validate();
} else {
debug!("Torrent complete");
self.set_finished();
}
}
} else if self.status.state == StatusState::Complete {
self.status.state = StatusState::Incomplete;
Expand All @@ -813,8 +816,7 @@ impl<T: cio::CIO> Torrent<T> {
}
/// Signal that we've downloaded and verified the torrent
fn set_finished(&mut self) {
// It's ok to say we've completed even if we haven't downloaded everything since
// the `left` field should indicate how much there still is to download.
info!("Torrent {} completed!", self.rpc_id());
if let Some(req) = tracker::Request::completed(self) {
self.cio.msg_trk(req);
}
Expand Down Expand Up @@ -943,7 +945,7 @@ impl<T: cio::CIO> Torrent<T> {
length,
} => {
// Ignore a piece we already have, this could happen from endgame
if self.pieces.has_bit(u64::from(index)) {
if self.pieces.has_bit(u64::from(index)) || self.validating.contains(&index) {
return Ok(());
}

Expand Down Expand Up @@ -978,28 +980,13 @@ impl<T: cio::CIO> Torrent<T> {
self.stat.add_dl(u64::from(length));

if piece_done {
self.pieces.set_bit(u64::from(index));
// Begin validation, and save state if the torrent is done
self.check_complete();
// Do on the fly validation of the piece
if !self.complete() {
if !self.leechers.is_empty() {
self.cio.msg_disk(disk::Request::validate_piece(
self.id,
self.info.clone(),
self.path.clone(),
index,
));
}
}

// Mark uninteresting peers
for peer in self.peers.values_mut() {
if !self.pieces.usable(peer.pieces()) {
peer.uninterested();
}
}
self.files.update(&self.info, index);
self.cio.msg_disk(disk::Request::validate_piece(
self.id,
self.info.clone(),
self.path.clone(),
index,
));
self.validating.insert(index);
}

// If there are any peers we've asked duplicate pieces for,
Expand All @@ -1025,8 +1012,10 @@ impl<T: cio::CIO> Torrent<T> {
begin,
length,
} => {
if !self.status.stopped() && !self.status.leeching() {
// TODO get this from some sort of allocator.
if !self.pieces.has_bit(index as u64) {
return Err(());
}
if !self.status.stopped() {
if length != self.info.block_len(index, begin) {
return Err(());
} else {
Expand Down Expand Up @@ -1378,7 +1367,6 @@ impl<T: cio::CIO> Torrent<T> {
}

fn set_priority(&mut self, priority: u8) {
// TODO: Implement priority somewhere(throttle or ctrl)
self.priority = priority;
let id = self.rpc_id();
self.cio.msg_rpc(rpc::CtlMessage::Update(vec![
Expand Down
13 changes: 1 addition & 12 deletions src/torrent/peer/mod.rs
Expand Up @@ -235,11 +235,7 @@ impl<T: cio::CIO> Peer<T> {
};
p.send_message(Message::handshake(&t.info));
if t.info.complete() {
if t.complete() {
p.send_message(Message::Bitfield(t.pieces.clone()));
} else {
p.send_message(Message::Bitfield(Bitfield::new(u64::from(t.info.pieces()))));
}
p.send_message(Message::Bitfield(t.pieces.clone()));
}
p.send_rpc_info();
Ok(p)
Expand Down Expand Up @@ -447,13 +443,6 @@ impl<T: cio::CIO> Peer<T> {
}
}

pub fn uninterested(&mut self) {
if self.local_status.interested {
self.local_status.interested = false;
self.send_message(Message::Uninterested);
}
}

pub fn send_message(&mut self, msg: Message) {
match msg {
Message::SharedPiece { length, .. } | Message::Piece { length, .. } => {
Expand Down

0 comments on commit f92d9f3

Please sign in to comment.