Skip to content

Commit

Permalink
interested algorithm, getting bitfield from info instead of another peer
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrieldemian committed Aug 22, 2023
1 parent 0f13b03 commit 8311e59
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 138 deletions.
6 changes: 5 additions & 1 deletion src/frontend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ pub struct TorrentInfo {
pub struct Frontend<'a> {
pub style: AppStyle,
pub ctx: Arc<FrontendCtx>,
pub torrent_list: TorrentList<'a>,
torrent_txs: HashMap<[u8; 20], mpsc::Sender<TorrentMsg>>,
disk_tx: mpsc::Sender<DiskMsg>,
terminal: Terminal<CrosstermBackend<Stdout>>,
torrent_list: TorrentList<'a>,
config: Config,
}

Expand Down Expand Up @@ -204,6 +204,10 @@ impl<'a> Frontend<'a> {
.torrent_infos
.insert(info_hash, torrent_info_l);

if self.torrent_list.state.selected() == None {

Check failure on line 207 in src/frontend/mod.rs

View workflow job for this annotation

GitHub Actions / Lints

binary comparison to literal `Option::None`
self.torrent_list.state.select(Some(0));
}

let args = Args::parse();
let mut listen = self.config.listen;

Expand Down
168 changes: 98 additions & 70 deletions src/peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ pub enum PeerMsg {
/// Tell this peer that we are not interested,
/// update the local state and send a message to the peer
NotInterested,
Cancel(BlockInfo),
CancelBlock(BlockInfo),
CancelMetadata(u32),
/// Sometimes a peer either takes too long to answer,
/// or simply does not answer at all. In both cases
/// we need to request the block again.
Expand Down Expand Up @@ -233,11 +234,7 @@ impl Peer {
socket.send(Message::Bitfield(bitfield.clone())).await?;
}

// todo: implement choke & interested algorithms
// send Interested
self.session.state.am_interested = true;
socket.send(Message::Interested).await?;

// todo: implement choke algorithm
// send Unchoke
self.session.state.am_choking = false;
socket.send(Message::Unchoke).await?;
Expand All @@ -259,8 +256,6 @@ impl Peer {
);
let (mut sink, mut stream) = socket.split();

let torrent_tx = self.torrent_ctx.tx.clone();

self.session.state.connection = ConnectionState::Connected;

loop {
Expand Down Expand Up @@ -290,11 +285,21 @@ impl Peer {
*b = bitfield.clone();
drop(b);

// update the bitfield of the `Torrent`
// will create a new, empty bitfield, with
// the same len
let _ = torrent_tx.send(TorrentMsg::UpdateBitfield(bitfield.len_bytes()))
.await;
for x in bitfield.into_iter() {
if x.bit == 0 {
info!("{:?} we are interested due to Bitfield", self.addr);

self.session.state.am_interested = true;
sink.send(Message::Interested).await?;

if self.can_request() {
self.request_block_infos(&mut sink).await?;
}

break;
}
}

info!("------------------------------\n");
}
Message::Unchoke => {
Expand Down Expand Up @@ -342,28 +347,30 @@ impl Peer {
pieces.set(piece);
drop(pieces);

// maybe become interested in peer
let torrent_ctx = self.torrent_ctx.clone();
let torrent_p = torrent_ctx.pieces.read().await;
let has_piece = torrent_p.get(piece);

match has_piece {
Some(_) => {
info!("Already have this piece, not sending interested");
}
None => {
info!("We do not have this piece, sending interested");
// maybe request the incoming piece
if self.can_request() {
let bit_item = torrent_p.get(piece);

if let Some(a) = bit_item {
if a.bit == 0 {
info!("requesting piece {piece}");
self.request_block_infos(&mut sink).await?;
}
let bit_item = torrent_p.get(piece);

// maybe become interested in peer and request blocks
if !self.session.state.am_interested {
match bit_item {
Some(a) => {
if a.bit == 1 {
info!("already have this piece, ignoring");
} else {
info!("We do not have this piece, sending interested");
info!("{:?} we are interested due to Have", self.addr);

self.session.state.am_interested = true;
sink.send(Message::Interested).await?;
}
}
None => {
info!("We do not have `info` downloaded yet, sending interested");
info!("{:?} we are interested due to Have", self.addr);
self.session.state.am_interested = true;
sink.send(Message::Interested).await?;
}
}
}

Expand All @@ -378,7 +385,6 @@ impl Peer {
info!("--");

self.handle_piece_msg(block).await?;

if self.can_request() {
self.request_block_infos(&mut sink).await?;
}
Expand Down Expand Up @@ -514,6 +520,11 @@ impl Peer {
info!("{metadata:?}");

self.torrent_ctx.tx.send(TorrentMsg::DownloadedInfoPiece(t, metadata.piece, info)).await?;
self.torrent_ctx.tx.send(TorrentMsg::SendCancelMetadata{
from: self.ctx.id.read().await.unwrap(),
index: metadata.piece
})
.await?;
}
_ => {}
}
Expand All @@ -527,19 +538,18 @@ impl Peer {
match msg {
PeerMsg::ReRequest(block_info) => {
if self.outgoing_requests.get(&block_info).is_some() && self.can_request() {
if let Some(max) = self.session.target_request_queue_len {
if self.outgoing_requests.len() + 1 < max as usize {
info!("rerequesting block_info due to timeout {block_info:?}");
sink.send(Message::Request(block_info.clone())).await?;

let timeout = self.session.request_timeout();
let tx = self.ctx.tx.clone();
spawn(async move {
tokio::time::sleep(timeout).await;
let _ = tx.send(PeerMsg::ReRequest(block_info)).await;
});
}
}
info!("{} rerequesting block_info due to timeout {block_info:?}", self.addr);

sink.send(Message::Request(block_info.clone())).await?;

let timeout = self.session.request_timeout();
let tx = self.ctx.tx.clone();

spawn(async move {
info!("rerequest timeout is {timeout:?}");
tokio::time::sleep(timeout).await;
let _ = tx.send(PeerMsg::ReRequest(block_info)).await;
});
}
}
PeerMsg::HavePiece(piece) => {
Expand All @@ -561,15 +571,14 @@ impl Peer {
}
PeerMsg::RequestBlockInfo(block_info) => {
info!("{:?} RequestBlockInfo {block_info:#?}", self.addr);
if let Some(max) = self.session.target_request_queue_len {
if self.outgoing_requests.len() + 1 < max as usize && self.can_request() {
sink.send(Message::Request(block_info)).await?;
}
if self.outgoing_requests.len() + 1 <= self.session.target_request_queue_len as usize && self.can_request() {

Check failure on line 574 in src/peer/mod.rs

View workflow job for this annotation

GitHub Actions / Lints

unnecessary `>= y + 1` or `x - 1 >=`
self.outgoing_requests.insert(block_info);
self.request_block_infos(&mut sink).await?;
}
}
PeerMsg::RequestBlockInfos(block_infos) => {
info!("{:?} RequestBlockInfos len {}", self.addr, block_infos.len());
let max = self.session.target_request_queue_len.unwrap_or(0) as usize - self.outgoing_requests.len();
let max = self.session.target_request_queue_len as usize - self.outgoing_requests.len();
for info in block_infos.into_iter().take(max) {
sink.send(Message::Request(info)).await?;
}
Expand All @@ -578,10 +587,17 @@ impl Peer {
self.session.state.am_interested = false;
sink.send(Message::NotInterested).await?;
}
PeerMsg::Cancel(block_info) => {
PeerMsg::CancelBlock(block_info) => {
info!("{:?} sending Cancel", self.addr);
sink.send(Message::Cancel(block_info)).await?;
}
PeerMsg::CancelMetadata(index) => {
info!("{:?} sending CancelMetadata", self.addr);
let metadata_reject = Metadata::reject(index);
let metadata_reject = metadata_reject.to_bencode().unwrap();

sink.send(Message::Extended((3, metadata_reject))).await?;
}
PeerMsg::Quit => {
info!("{:?} quitting", self.addr);
self.session.state.connection = ConnectionState::Quitting;
Expand Down Expand Up @@ -621,7 +637,7 @@ impl Peer {
let _ = self
.torrent_ctx
.tx
.send(TorrentMsg::SendCancel {
.send(TorrentMsg::SendCancelBlock {
from,
block_info: block_info.clone(),
})
Expand Down Expand Up @@ -649,6 +665,10 @@ impl Peer {
where
T: SinkExt<Message> + Sized + std::marker::Unpin,
{
// request blocks if we can
if self.can_request() {
self.request_block_infos(sink).await?;
}
// resent requests if we have pending requests and more time has elapsed
// since the last request than the current timeout value
if !self.outgoing_requests.is_empty() {
Expand Down Expand Up @@ -687,25 +707,24 @@ impl Peer {

if elapsed_since_last_request > request_timeout {
warn!(
"{:?} timeout after {} ms, cancelling {} request(s) (timeouts: {})",
"{:?} timeout after {} ms, freeing {} request(s) (timeouts: {})",
self.addr,
elapsed_since_last_request.as_millis(),
self.outgoing_requests.len(),
self.session.timed_out_request_count + 1
);

// If in slow_start,
// cancel all requests and re-issue a single one (since we can
// only request a single block now). Start by freeing up the
// blocks in their piece download.
// Note that we're not telling the peer that we timed out the
// request so that if it arrives some time later and is not
// requested by another peer, we can still collect it.
if self.session.in_slow_start {
self.free_pending_blocks().await;
self.session.register_request_timeout();
self.request_block_infos(sink).await?;
}
warn!("inflight blocks {}", self.outgoing_requests.len());
warn!("can request {}", self.session.target_request_queue_len);

// note: im trying to spawn threads and rerequest individual block_infos,
// but, if this approach doesnt work, I should move the free_pending_blocks
// outside of this if.
// self.free_pending_blocks().await;
// self.session.register_request_timeout();
// if self.can_request() {
// self.request_block_infos(sink).await?;
// }
}
}

Expand Down Expand Up @@ -743,9 +762,13 @@ impl Peer {
where
T: SinkExt<Message> + Sized + std::marker::Unpin,
{
// the max blocks pending allowed for this peer
let target_request_queue_len =
self.session.target_request_queue_len.unwrap_or_default() as usize;
// on the first request, we only ask for 1 block,
// the first peer to answer is probably the fastest peer.
// when we receive the block, we request the max amount.
// this is useful in the scenario of downloading a small torrent,
// the first peer can easily request all blocks of the torrent.
// if the peer is a slow one, we wasted a lot of time.
let target_request_queue_len = self.session.target_request_queue_len as usize;

// the number of blocks we can request right now
let request_len = if self.outgoing_requests.len() >= target_request_queue_len {
Expand All @@ -755,6 +778,10 @@ impl Peer {
};

if request_len > 0 {
info!("inflight: {}", self.outgoing_requests.len());
info!("max to request: {}", target_request_queue_len);
info!("requesting len: {request_len}");

// get a list of unique block_infos from the Disk,
// those are already marked as requested on Torrent
let (otx, orx) = oneshot::channel();
Expand All @@ -771,7 +798,8 @@ impl Peer {
let r = orx.await?;

if r.is_empty() && !self.session.in_endgame && self.outgoing_requests.len() <= 20 {
self.start_endgame().await;
// endgame is probably slowing down the download_rate for some reason?
// self.start_endgame().await;
}

self.session.last_outgoing_request_time = Some(std::time::Instant::now());
Expand Down Expand Up @@ -810,7 +838,7 @@ impl Peer {
debug_assert!(id.is_some());

if let Some(id) = *id {
let outgoing: Vec<BlockInfo> = self.outgoing_requests.clone().into_iter().collect();
let outgoing: Vec<BlockInfo> = self.outgoing_requests.drain().into_iter().collect();

Check failure on line 841 in src/peer/mod.rs

View workflow job for this annotation

GitHub Actions / Lints

useless conversion to the same type: `hashbrown::hash_set::Drain<'_, tcp_wire::lib::BlockInfo>`
let _ = self
.torrent_ctx
.tx
Expand Down
Loading

0 comments on commit 8311e59

Please sign in to comment.