Skip to content

Commit

Permalink
Implement HonestRevealModule; Update Modules To Return Errors When Ad…
Browse files Browse the repository at this point in the history
…ding A Torrent That Was Already Added Or Removing One That Was Never Added
  • Loading branch information
GGist committed Nov 17, 2017
1 parent ce819f6 commit 7a3a3c3
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 29 deletions.
3 changes: 2 additions & 1 deletion bip_select/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ license = "MIT/Apache-2.0"
bip_handshake = "0.7"
bip_peer = "0.5"
bip_metainfo = "0.12"
bip_utracker = "0.3"
bip_utracker = "0.4"
bit-set = "0.4"
bytes = "0.4"
error-chain = "0.11"
futures = "0.1"
Expand Down
15 changes: 15 additions & 0 deletions bip_select/src/discovery/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//! Module for discovery error types.

use bip_handshake::InfoHash;
use bip_peer::PeerInfo;

error_chain! {
Expand All @@ -13,5 +16,17 @@ error_chain! {
description("Peer Sent An Invalid Message")
display("Peer {:?} Sent An Invalid Message: {:?}", info, message)
}
InvalidMetainfoExists {
hash: InfoHash
} {
description("Metainfo Has Already Been Added")
display("Metainfo With Hash {:?} Has Already Been Added", hash)
}
InvalidMetainfoNotExists {
hash: InfoHash
} {
description("Metainfo Was Not Already Added")
display("Metainfo With Hash {:?} Was Not Already Added", hash)
}
}
}
5 changes: 4 additions & 1 deletion bip_select/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use bip_utracker::announce::ClientState;
use std::net::SocketAddr;

pub mod error;
pub mod ut_metadata;

mod ut_metadata;

pub use self::ut_metadata::UtMetadataModule;

/// Enumeration of discovery messages that can be sent to a discovery module.
#[derive(Clone, Debug, PartialEq, Eq)]
Expand Down
26 changes: 18 additions & 8 deletions bip_select/src/discovery/ut_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use bip_peer::messages::builders::ExtendedMessageBuilder;
use bytes::BytesMut;
use discovery::IDiscoveryMessage;
use discovery::ODiscoveryMessage;
use discovery::error::DiscoveryError;
use discovery::error::{DiscoveryError, DiscoveryErrorKind};
use extended::ExtendedListener;
use extended::ExtendedPeerInfo;
use futures::Async;
Expand All @@ -29,6 +29,7 @@ use std::collections::HashSet;
use std::collections::VecDeque;
use std::io::Write;
use std::time::Duration;
use std::collections::hash_map::Entry;

const REQUEST_TIMEOUT_MILLIS: u64 = 2000;
const MAX_REQUEST_SIZE: usize = 16 * 1024;
Expand Down Expand Up @@ -92,18 +93,27 @@ impl UtMetadataModule {
}

fn add_torrent(&mut self, metainfo: Metainfo) -> StartSend<IDiscoveryMessage, DiscoveryError> {
let info_bytes = metainfo.info().to_bytes();
let info_hash = metainfo.info().info_hash();

self.completed_map
.insert(metainfo.info().info_hash(), info_bytes);
match self.completed_map.entry(info_hash) {
Entry::Occupied(_) => {
Err(DiscoveryError::from_kind(DiscoveryErrorKind::InvalidMetainfoExists{ hash: info_hash }))
},
Entry::Vacant(vac) => {
let info_bytes = metainfo.info().to_bytes();
vac.insert(info_bytes);

Ok(AsyncSink::Ready)
Ok(AsyncSink::Ready)
}
}
}

fn remove_torrent(&mut self, metainfo: Metainfo) -> StartSend<IDiscoveryMessage, DiscoveryError> {
self.completed_map.remove(&metainfo.info().info_hash());

Ok(AsyncSink::Ready)
if self.completed_map.remove(&metainfo.info().info_hash()).is_none() {
Err(DiscoveryError::from_kind(DiscoveryErrorKind::InvalidMetainfoNotExists{ hash: metainfo.info().info_hash() }))
} else {
Ok(AsyncSink::Ready)
}
}

fn add_peer(&mut self, info: PeerInfo, ext_info: &ExtendedPeerInfo) -> StartSend<IDiscoveryMessage, DiscoveryError> {
Expand Down
2 changes: 2 additions & 0 deletions bip_select/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! Module for uber error types.

use discovery::error::{DiscoveryError, DiscoveryErrorKind};

error_chain! {
Expand Down
15 changes: 5 additions & 10 deletions bip_select/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ extern crate bip_handshake;
extern crate bip_metainfo;
extern crate bip_peer;
extern crate bip_utracker;
extern crate bit_set;
extern crate bytes;
#[macro_use]
extern crate error_chain;
Expand All @@ -14,19 +15,13 @@ use bip_metainfo::Metainfo;
use bip_peer::PeerInfo;
use std::time::Duration;

mod discovery;
pub mod discovery;
pub mod error;
pub mod revelation;

mod extended;
mod error;
mod uber;

/// Error types for all modules.
pub mod errors {
pub use discovery::error::{DiscoveryError, DiscoveryErrorKind, DiscoveryResultExt};
pub use error::{UberError, UberErrorKind, UberResultExt};
}

pub use discovery::{IDiscoveryMessage, ODiscoveryMessage};
pub use discovery::ut_metadata::UtMetadataModule;
pub use extended::{ExtendedListener, ExtendedPeerInfo, IExtendedMessage, OExtendedMessage};
pub use uber::{IUberMessage, OUberMessage, UberModule, UberModuleBuilder};

Expand Down
33 changes: 33 additions & 0 deletions bip_select/src/revelation/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//! Module for revelation error types.

use bip_handshake::InfoHash;
use bip_peer::PeerInfo;

error_chain! {
types {
RevealError, RevealErrorKind, RevealResultExt;
}

errors {
InvalidMessage {
info: PeerInfo,
message: String
} {
description("Peer Sent An Invalid Message")
display("Peer {:?} Sent An Invalid Message: {:?}", info, message)
}
InvalidMetainfoExists {
hash: InfoHash
} {
description("Metainfo Has Already Been Added")
display("Metainfo With Hash {:?} Has Already Been Added", hash)
}
InvalidMetainfoNotExists {
hash: InfoHash
} {
description("Metainfo Was Not Already Added")
display("Metainfo With Hash {:?} Was Not Already Added", hash)
}
}
}

173 changes: 173 additions & 0 deletions bip_select/src/revelation/honest.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@


use bytes::BytesMut;
use bip_metainfo::Metainfo;
use futures::task::Task;
use bip_handshake::InfoHash;
use std::collections::HashMap;
use futures::{Sink, AsyncSink, Async};
use futures::Stream;
use futures::Poll;
use futures::StartSend;
use revelation::error::{RevealError, RevealErrorKind};
use revelation::IRevealMessage;
use bip_peer::PeerInfo;
use std::collections::HashSet;
use bit_set::BitSet;
use revelation::ORevealMessage;
use std::collections::VecDeque;
use ControlMessage;
use std::collections::hash_map::Entry;
use bip_peer::messages::{BitFieldMessage, HaveMessage};
use futures::task;

/// Revelation module that will honestly report any pieces we have to peers.
pub struct HonestRevealModule {
torrents: HashMap<InfoHash, PeersInfo>,
out_queue: VecDeque<ORevealMessage>,
// Shared bytes container to write bitfield messages to
out_bytes: BytesMut,
opt_stream: Option<Task>
}

struct PeersInfo {
status: BitSet<u8>,
peers: HashSet<PeerInfo>
}

impl HonestRevealModule {
/// Create a new `HonestRevelationModule`.
pub fn new() -> HonestRevealModule {
HonestRevealModule{ torrents: HashMap::new(), out_queue: VecDeque::new(),
out_bytes: BytesMut::new(), opt_stream: None }
}

fn add_torrent(&mut self, metainfo: &Metainfo) -> StartSend<IRevealMessage, RevealError> {
let info_hash = metainfo.info().info_hash();

match self.torrents.entry(info_hash) {
Entry::Occupied(_) => {
Err(RevealError::from_kind(RevealErrorKind::InvalidMetainfoExists{ hash: info_hash }))
},
Entry::Vacant(vac) => {
let mut piece_set = BitSet::default();
piece_set.reserve_len_exact(metainfo.info().pieces().count());

let peers_info = PeersInfo{ status: piece_set, peers: HashSet::new() };
vac.insert(peers_info);

Ok(AsyncSink::Ready)
}
}
}

fn remove_torrent(&mut self, metainfo: &Metainfo) -> StartSend<IRevealMessage, RevealError> {
let info_hash = metainfo.info().info_hash();

if self.torrents.remove(&info_hash).is_none() {
Err(RevealError::from_kind(RevealErrorKind::InvalidMetainfoNotExists{ hash: info_hash }))
} else {
Ok(AsyncSink::Ready)
}
}

fn add_peer(&mut self, peer: PeerInfo) -> StartSend<IRevealMessage, RevealError> {
let info_hash = *peer.hash();

let out_bytes = &mut self.out_bytes;
let out_queue = &mut self.out_queue;
self.torrents.get_mut(&info_hash)
.map(|peers_info| {
// Add the peer to our list, so we send have messages to them
peers_info.peers.insert(peer);

// Get our current bitfield, write it to our shared bytes
let bitfield_slice = peers_info.status.get_ref().storage();
out_bytes.extend_from_slice(bitfield_slice);
// Split off what we wrote, send this in the message, will be re-used on drop
let bitfield_bytes = out_bytes.split_off(0).freeze();
let bitfield = BitFieldMessage::new(bitfield_bytes);

// Enqueue the bitfield message so that we send it to the peer
out_queue.push_back(ORevealMessage::SendBitField(peer, bitfield));

Ok(AsyncSink::Ready)
}).unwrap_or_else(|| Err(RevealError::from_kind(RevealErrorKind::InvalidMetainfoNotExists{ hash: info_hash })))
}

fn remove_peer(&mut self, peer: PeerInfo) -> StartSend<IRevealMessage, RevealError> {
let info_hash = *peer.hash();

self.torrents.get_mut(&info_hash)
.map(|peers_info| {
peers_info.peers.remove(&peer);

Ok(AsyncSink::Ready)
}).unwrap_or_else(|| Err(RevealError::from_kind(RevealErrorKind::InvalidMetainfoNotExists{ hash: info_hash })))
}

fn insert_piece(&mut self, hash: InfoHash, index: u64) -> StartSend<IRevealMessage, RevealError> {
let out_queue = &mut self.out_queue;
self.torrents.get_mut(&hash)
.map(|peers_info| {
// Queue up all have messages
for peer in peers_info.peers.iter() {
out_queue.push_back(ORevealMessage::SendHave(*peer, HaveMessage::new(index as u32)));
}

// Insert into bitfield
peers_info.status.insert(index as usize);

Ok(AsyncSink::Ready)
}).unwrap_or_else(|| Err(RevealError::from_kind(RevealErrorKind::InvalidMetainfoNotExists{ hash: hash })))
}

//------------------------------------------------------//

fn check_stream_unblock(&mut self) {
if !self.out_queue.is_empty() {
self.opt_stream.take().as_ref().map(Task::notify);
}
}
}

impl Sink for HonestRevealModule {
type SinkItem = IRevealMessage;
type SinkError = RevealError;

fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
let result = match item {
IRevealMessage::Control(ControlMessage::AddTorrent(metainfo)) => self.add_torrent(&metainfo),
IRevealMessage::Control(ControlMessage::RemoveTorrent(metainfo)) => self.remove_torrent(&metainfo),
IRevealMessage::Control(ControlMessage::PeerConnected(info)) => self.add_peer(info),
IRevealMessage::Control(ControlMessage::PeerDisconnected(info)) => self.remove_peer(info),
IRevealMessage::FoundGoodPiece(hash, index) => self.insert_piece(hash, index),
IRevealMessage::Control(ControlMessage::Tick(_)) |
IRevealMessage::ReceivedBitField(_, _) |
IRevealMessage::ReceivedHave(_, _) => Ok(AsyncSink::Ready)
};

self.check_stream_unblock();

result
}

fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
Ok(Async::Ready(()))
}
}

impl Stream for HonestRevealModule {
type Item = ORevealMessage;
type Error = RevealError;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let next_item = self.out_queue.pop_front().map(|item| Ok(Async::Ready(Some(item))));

next_item.unwrap_or_else(|| {
self.opt_stream = Some(task::current());

Ok(Async::NotReady)
})
}
}
36 changes: 27 additions & 9 deletions bip_select/src/revelation/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,33 @@
enum IRevealMessage {
PeerControl(PeerControlMessage),
//! Module for piece revelation.

use bip_peer::messages::HaveMessage;
use bip_peer::messages::BitFieldMessage;
use bip_peer::PeerInfo;
use bip_handshake::InfoHash;
use ControlMessage;

pub mod error;

mod honest;

pub use self::honest::HonestRevealModule;

/// Enumeration of revelation messages that can be sent to a revelation module.
pub enum IRevealMessage {
/// Control message.
Control(ControlMessage),
/// Good piece for the given `InfoHash` was found.
FoundGoodPiece(InfoHash, u64),
/// Received a `BitFieldMessage`.
ReceivedBitField(PeerInfo, BitFieldMessage),
ReceivedHave(PeerInfo, HaveMessage),
UpdateHave(InfoHash, HaveMessage)
/// Received a `HaveMessage`.
ReceivedHave(PeerInfo, HaveMessage)
}

enum ORevealMessage {
/// Enumeration of revelation messages that can be received from a revelation module.
pub enum ORevealMessage {
/// Send a `BitFieldMessage`.
SendBitField(PeerInfo, BitFieldMessage),
/// Send a `HaveMessage`.
SendHave(PeerInfo, HaveMessage)
}

trait PieceRevelation {

}

0 comments on commit 7a3a3c3

Please sign in to comment.