diff --git a/src/frontend/mod.rs b/src/frontend/mod.rs
index 2b2f02d..d95dc7b 100644
--- a/src/frontend/mod.rs
+++ b/src/frontend/mod.rs
@@ -79,10 +79,10 @@ pub struct TorrentInfo {
 pub struct Frontend<'a> {
     pub style: AppStyle,
     pub ctx: Arc,
+    pub torrent_list: TorrentList<'a>,
     torrent_txs: HashMap<[u8; 20], mpsc::Sender>,
     disk_tx: mpsc::Sender,
     terminal: Terminal>,
-    torrent_list: TorrentList<'a>,
     config: Config,
 }

@@ -204,6 +204,10 @@ impl<'a> Frontend<'a> {
             .torrent_infos
             .insert(info_hash, torrent_info_l);

+        if self.torrent_list.state.selected() == None {
+            self.torrent_list.state.select(Some(0));
+        }
+
         let args = Args::parse();

         let mut listen = self.config.listen;
diff --git a/src/peer/mod.rs b/src/peer/mod.rs
index 94cbe5d..37a07f9 100644
--- a/src/peer/mod.rs
+++ b/src/peer/mod.rs
@@ -55,7 +55,8 @@ pub enum PeerMsg {
     /// Tell this peer that we are not interested,
     /// update the local state and send a message to the peer
     NotInterested,
-    Cancel(BlockInfo),
+    CancelBlock(BlockInfo),
+    CancelMetadata(u32),
     /// Sometimes a peer either takes too long to answer,
     /// or simply does not answer at all. In both cases
     /// we need to request the block again.
@@ -233,11 +234,7 @@ impl Peer {
             socket.send(Message::Bitfield(bitfield.clone())).await?;
         }

-        // todo: implement choke & interested algorithms
-        // send Interested
-        self.session.state.am_interested = true;
-        socket.send(Message::Interested).await?;
-
+        // todo: implement choke algorithm
         // send Unchoke
         self.session.state.am_choking = false;
         socket.send(Message::Unchoke).await?;
@@ -259,8 +256,6 @@ impl Peer {
         );
         let (mut sink, mut stream) = socket.split();

-        let torrent_tx = self.torrent_ctx.tx.clone();
-
         self.session.state.connection = ConnectionState::Connected;

         loop {
@@ -290,11 +285,21 @@ impl Peer {
                     *b = bitfield.clone();
                     drop(b);

-                    // update the bitfield of the `Torrent`
-                    // will create a new, empty bitfield, with
-                    // the same len
-                    let _ = torrent_tx.send(TorrentMsg::UpdateBitfield(bitfield.len_bytes()))
-                        .await;
+                    for x in bitfield.into_iter() {
+                        if x.bit == 0 {
+                            info!("{:?} we are interested due to Bitfield", self.addr);
+
+                            self.session.state.am_interested = true;
+                            sink.send(Message::Interested).await?;
+
+                            if self.can_request() {
+                                self.request_block_infos(&mut sink).await?;
+                            }
+
+                            break;
+                        }
+                    }
+                    info!("------------------------------\n");
                 }
                 Message::Unchoke => {
@@ -342,28 +347,30 @@ impl Peer {
                     pieces.set(piece);
                    drop(pieces);

-                    // maybe become interested in peer
                     let torrent_ctx = self.torrent_ctx.clone();
                     let torrent_p = torrent_ctx.pieces.read().await;
-                    let has_piece = torrent_p.get(piece);
-
-                    match has_piece {
-                        Some(_) => {
-                            info!("Already have this piece, not sending interested");
-                        }
-                        None => {
-                            info!("We do not have this piece, sending interested");
-                            // maybe request the incoming piece
-                            if self.can_request() {
-                                let bit_item = torrent_p.get(piece);
-
-                                if let Some(a) = bit_item {
-                                    if a.bit == 0 {
-                                        info!("requesting piece {piece}");
-                                        self.request_block_infos(&mut sink).await?;
-                                    }
+                    let bit_item = torrent_p.get(piece);
+
+                    // maybe become interested in peer and request blocks
+                    if !self.session.state.am_interested {
+                        match bit_item {
+                            Some(a) => {
+                                if a.bit == 1 {
+                                    info!("already have this piece, ignoring");
+                                } else {
+                                    info!("We do not have this piece, sending interested");
+                                    info!("{:?} we are interested due to Have", self.addr);
+
+                                    self.session.state.am_interested = true;
+                                    sink.send(Message::Interested).await?;
                                 }
                             }
+                            None => {
+                                info!("We do not have `info` downloaded yet, sending interested");
+                                info!("{:?} we are interested due to Have", self.addr);
+                                self.session.state.am_interested = true;
+                                sink.send(Message::Interested).await?;
+                            }
                         }
                     }
                 }
@@ -378,7 +385,6 @@ impl Peer {
                     info!("--");

                     self.handle_piece_msg(block).await?;
-
                     if self.can_request() {
                         self.request_block_infos(&mut sink).await?;
                     }
@@ -514,6 +520,11 @@ impl Peer {
                         info!("{metadata:?}");

                         self.torrent_ctx.tx.send(TorrentMsg::DownloadedInfoPiece(t, metadata.piece, info)).await?;
+                        self.torrent_ctx.tx.send(TorrentMsg::SendCancelMetadata {
+                            from: self.ctx.id.read().await.unwrap(),
+                            index: metadata.piece
+                        })
+                        .await?;
                     }
                     _ => {}
                 }
@@ -527,19 +538,18 @@ impl Peer {
                 match msg {
                     PeerMsg::ReRequest(block_info) => {
                         if self.outgoing_requests.get(&block_info).is_some() && self.can_request() {
-                            if let Some(max) = self.session.target_request_queue_len {
-                                if self.outgoing_requests.len() + 1 < max as usize {
-                                    info!("rerequesting block_info due to timeout {block_info:?}");
-                                    sink.send(Message::Request(block_info.clone())).await?;
-
-                                    let timeout = self.session.request_timeout();
-                                    let tx = self.ctx.tx.clone();
-                                    spawn(async move {
-                                        tokio::time::sleep(timeout).await;
-                                        let _ = tx.send(PeerMsg::ReRequest(block_info)).await;
-                                    });
-                                }
-                            }
+                            info!("{} rerequesting block_info due to timeout {block_info:?}", self.addr);
+
+                            sink.send(Message::Request(block_info.clone())).await?;
+
+                            let timeout = self.session.request_timeout();
+                            let tx = self.ctx.tx.clone();
+
+                            spawn(async move {
+                                info!("rerequest timeout is {timeout:?}");
+                                tokio::time::sleep(timeout).await;
+                                let _ = tx.send(PeerMsg::ReRequest(block_info)).await;
+                            });
                         }
                     }
                     PeerMsg::HavePiece(piece) => {
@@ -561,15 +571,14 @@ impl Peer {
                     }
                     PeerMsg::RequestBlockInfo(block_info) => {
                         info!("{:?} RequestBlockInfo {block_info:#?}", self.addr);
-                        if let Some(max) = self.session.target_request_queue_len {
-                            if self.outgoing_requests.len() + 1 < max as usize && self.can_request() {
-                                sink.send(Message::Request(block_info)).await?;
-                            }
+                        if self.outgoing_requests.len() + 1 <= self.session.target_request_queue_len as usize && self.can_request() {
+                            self.outgoing_requests.insert(block_info);
+                            self.request_block_infos(&mut sink).await?;
                         }
                     }
                     PeerMsg::RequestBlockInfos(block_infos) => {
                         info!("{:?} RequestBlockInfos len {}", self.addr, block_infos.len());
-                        let max = self.session.target_request_queue_len.unwrap_or(0) as usize - self.outgoing_requests.len();
+                        let max = self.session.target_request_queue_len as usize - self.outgoing_requests.len();
                         for info in block_infos.into_iter().take(max) {
                             sink.send(Message::Request(info)).await?;
                         }
@@ -578,10 +587,17 @@ impl Peer {
                         self.session.state.am_interested = false;
                         sink.send(Message::NotInterested).await?;
                     }
-                    PeerMsg::Cancel(block_info) => {
+                    PeerMsg::CancelBlock(block_info) => {
                         info!("{:?} sending Cancel", self.addr);
                         sink.send(Message::Cancel(block_info)).await?;
                     }
+                    PeerMsg::CancelMetadata(index) => {
+                        info!("{:?} sending CancelMetadata", self.addr);
+                        let metadata_reject = Metadata::reject(index);
+                        let metadata_reject = metadata_reject.to_bencode().unwrap();
+
+                        sink.send(Message::Extended((3, metadata_reject))).await?;
+                    }
                     PeerMsg::Quit => {
                         info!("{:?} quitting", self.addr);
                         self.session.state.connection = ConnectionState::Quitting;
@@ -621,7 +637,7 @@ impl Peer {
                 let _ = self
                     .torrent_ctx
                     .tx
-                    .send(TorrentMsg::SendCancel {
+                    .send(TorrentMsg::SendCancelBlock {
                         from,
                         block_info: block_info.clone(),
                     })
@@ -649,6 +665,10 @@ impl Peer {
     where
         T: SinkExt + Sized + std::marker::Unpin,
     {
+        // request blocks if we can
+        if self.can_request() {
+            self.request_block_infos(sink).await?;
+        }
         // resent requests if we have pending requests and more time has elapsed
         // since the last request than the current timeout value
         if !self.outgoing_requests.is_empty() {
@@ -687,25 +707,24 @@ impl Peer {
            if elapsed_since_last_request > request_timeout {
                 warn!(
-                    "{:?} timeout after {} ms, cancelling {} request(s) (timeouts: {})",
+                    "{:?} timeout after {} ms, freeing {} request(s) (timeouts: {})",
                     self.addr,
                     elapsed_since_last_request.as_millis(),
                     self.outgoing_requests.len(),
                     self.session.timed_out_request_count + 1
                 );

-                // If in slow_start,
-                // cancel all requests and re-issue a single one (since we can
-                // only request a single block now). Start by freeing up the
-                // blocks in their piece download.
-                // Note that we're not telling the peer that we timed out the
-                // request so that if it arrives some time later and is not
-                // requested by another peer, we can still collect it.
-                if self.session.in_slow_start {
-                    self.free_pending_blocks().await;
-                    self.session.register_request_timeout();
-                    self.request_block_infos(sink).await?;
-                }
+                warn!("inflight blocks {}", self.outgoing_requests.len());
+                warn!("can request {}", self.session.target_request_queue_len);
+
+                // note: I'm trying to spawn threads and rerequest individual block_infos,
+                // but if this approach doesn't work, I should move the free_pending_blocks
+                // outside of this if.
+                // self.free_pending_blocks().await;
+                // self.session.register_request_timeout();
+                // if self.can_request() {
+                //     self.request_block_infos(sink).await?;
+                // }
             }
         }

@@ -743,9 +762,13 @@ impl Peer {
     where
         T: SinkExt + Sized + std::marker::Unpin,
     {
-        // the max blocks pending allowed for this peer
-        let target_request_queue_len =
-            self.session.target_request_queue_len.unwrap_or_default() as usize;
+        // on the first request we only ask for 1 block;
+        // the first peer to answer is probably the fastest peer.
+        // when we receive the block, we request the max amount.
+        // this is useful when downloading a small torrent:
+        // the first peer could end up being asked for every block of the torrent,
+        // and if that peer is slow, we would waste a lot of time.
+        let target_request_queue_len = self.session.target_request_queue_len as usize;

        // the number of blocks we can request right now
        let request_len = if self.outgoing_requests.len() >= target_request_queue_len {
@@ -755,6 +778,10 @@ impl Peer {
         };

         if request_len > 0 {
+            info!("inflight: {}", self.outgoing_requests.len());
+            info!("max to request: {}", target_request_queue_len);
+            info!("requesting len: {request_len}");
+
             // get a list of unique block_infos from the Disk,
             // those are already marked as requested on Torrent
             let (otx, orx) = oneshot::channel();
@@ -771,7 +798,8 @@ impl Peer {
             let r = orx.await?;

             if r.is_empty() && !self.session.in_endgame && self.outgoing_requests.len() <= 20 {
-                self.start_endgame().await;
+                // endgame is probably slowing down the download_rate for some reason?
+                // self.start_endgame().await;
             }

             self.session.last_outgoing_request_time = Some(std::time::Instant::now());
@@ -810,7 +838,7 @@ impl Peer {
             debug_assert!(id.is_some());

             if let Some(id) = *id {
-                let outgoing: Vec<BlockInfo> = self.outgoing_requests.clone().into_iter().collect();
+                let outgoing: Vec<BlockInfo> = self.outgoing_requests.drain().into_iter().collect();
                 let _ = self
                     .torrent_ctx
                     .tx
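
A note on the re-request flow in the peer/mod.rs hunks above: the nested `target_request_queue_len` checks in `PeerMsg::ReRequest` are replaced by a simpler pattern where every request schedules its own follow-up, spawning a task that sleeps for the session's request timeout and then sends `ReRequest` back over the peer's own channel; the block is only re-sent if it is still pending. A minimal sketch of that pattern, with a plain `u32` block id, a local mpsc channel, and a fixed 100 ms timeout standing in for `BlockInfo`, `ctx.tx`, and `session.request_timeout()` (all stand-ins, not the crate's real types):

    use std::{collections::HashSet, time::Duration};
    use tokio::{spawn, sync::mpsc, time::sleep};

    // Simplified stand-ins for the Request/ReRequest messages and the peer's ctx.tx.
    #[derive(Debug)]
    enum Msg {
        Request(u32),
        ReRequest(u32),
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel::<Msg>(64);
        let mut outgoing: HashSet<u32> = HashSet::new();
        let timeout = Duration::from_millis(100); // stand-in for session.request_timeout()

        tx.send(Msg::Request(1)).await.unwrap();

        while let Some(msg) = rx.recv().await {
            match msg {
                Msg::Request(block) => {
                    outgoing.insert(block);
                    // "send" the Request to the remote peer here, then schedule a
                    // re-request in case no answer arrives before the timeout.
                    let tx = tx.clone();
                    spawn(async move {
                        sleep(timeout).await;
                        let _ = tx.send(Msg::ReRequest(block)).await;
                    });
                }
                Msg::ReRequest(block) => {
                    // only re-request blocks that are still pending
                    if outgoing.contains(&block) {
                        println!("re-requesting block {block}");
                    }
                    break; // end the example after one round trip
                }
            }
        }
    }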
diff --git a/src/peer/session.rs b/src/peer/session.rs
index 65dd1c6..57b84a5 100644
--- a/src/peer/session.rs
+++ b/src/peer/session.rs
@@ -91,10 +91,7 @@ pub struct Session {
     /// ```
     ///
     /// Only set once we start downloading.
-    // TODO: consider changing this to just usize starting at 0 and reset to
-    // 0 once download finishes so that it's easier to deal with it (not having
-    // to match on it all the time)
-    pub target_request_queue_len: Option<u16>,
+    pub target_request_queue_len: u16,

     /// The last time some requests were sent to the peer.
     pub last_outgoing_request_time: Option<Instant>,
@@ -147,8 +144,8 @@ impl Session {
     pub fn register_request_timeout(&mut self) {
         // peer has timed out, only allow a single outstanding request
         // from now until peer hasn't timed out
-        self.target_request_queue_len = Some(1);
-        self.timed_out_request_count += 1;
+        // self.target_request_queue_len -= 1;
+        // self.timed_out_request_count -= 1;
         self.request_timed_out = true;
         self.in_slow_start = false;
     }
@@ -164,7 +161,7 @@ impl Session {
         self.in_slow_start = reqq.is_none();
         // reset the target request queue size, which will be adjusted as the
         // download progresses
-        self.target_request_queue_len = Some(reqq.unwrap_or(Self::START_REQUEST_QUEUE_LEN));
+        self.target_request_queue_len = reqq.unwrap_or(Self::START_REQUEST_QUEUE_LEN);
     }

     /// Updates various statistics around a block download.
@@ -200,9 +197,7 @@ impl Session {
         // if we're in slow-start mode, we need to increase the target queue
         // size every time a block is received
         if self.in_slow_start {
-            if let Some(target_request_queue_len) = &mut self.target_request_queue_len {
-                *target_request_queue_len += 1;
-            }
+            self.target_request_queue_len += 1;
         }
     }

@@ -244,7 +239,7 @@ impl Session {
         // this only makes sense if we're not choked
         if !self.state.am_choking
             && self.in_slow_start
-            && self.target_request_queue_len.is_some()
+            && self.target_request_queue_len > 0
             && self.counters.payload.down.round() > 0
             && self.counters.payload.down.round() + Self::SLOW_START_ERROR_MARGIN
                 < self.counters.payload.down.avg()
@@ -256,33 +251,31 @@ impl Session {
     /// Adjusts the target request queue size based on the current download
     /// statistics.
     fn update_target_request_queue_len(&mut self) {
-        if let Some(target_request_queue_len) = &mut self.target_request_queue_len {
-            let prev_queue_len = *target_request_queue_len;
-
-            // this is only applicable if we're not in slow start, as in slow
-            // start mode the request queue is increased with each incoming
-            // block
-            if !self.in_slow_start {
-                let download_rate = self.counters.payload.down.avg();
-                // guard against integer truncation and round up as
-                // overestimating the link capacity is cheaper than
-                // underestimating it
-                *target_request_queue_len =
-                    ((download_rate + (BLOCK_LEN - 1) as u64) / BLOCK_LEN as u64) as u16;
-            }
+        let prev_queue_len = self.target_request_queue_len;
+
+        // this is only applicable if we're not in slow start, as in slow
+        // start mode the request queue is increased with each incoming
+        // block
+        if !self.in_slow_start {
+            let download_rate = self.counters.payload.down.avg();
+            // guard against integer truncation and round up as
+            // overestimating the link capacity is cheaper than
+            // underestimating it
+            self.target_request_queue_len =
+                ((download_rate + (BLOCK_LEN - 1) as u64) / BLOCK_LEN as u64) as u16;
+        }

-        // make sure the target doesn't go below 1
-        // TODO: make this configurable and also enforce an upper bound
-        if *target_request_queue_len < 1 {
-            *target_request_queue_len = 1;
-        }
+        // make sure the target doesn't go below 1
+        // TODO: make this configurable and also enforce an upper bound
+        if self.target_request_queue_len < 1 {
+            self.target_request_queue_len = 1;
+        }

-        if prev_queue_len != *target_request_queue_len {
-            info!(
-                "Request queue changed from {} to {}",
-                prev_queue_len, *target_request_queue_len
-            );
-        }
+        if prev_queue_len != self.target_request_queue_len {
+            info!(
+                "Request queue changed from {} to {}",
+                prev_queue_len, self.target_request_queue_len
+            );
         }
     }
 }
@@ -300,7 +293,7 @@ mod tests {
         s.prepare_for_download(None);

-        assert!(s.target_request_queue_len > Some(0));
+        assert!(s.target_request_queue_len > 0);
         assert!(s.in_slow_start);
     }

@@ -311,7 +304,7 @@ mod tests {
         s.state.am_interested = true;
         s.state.am_choking = false;
         s.in_slow_start = true;
-        s.target_request_queue_len = Some(1);
+        s.target_request_queue_len = 1;

         // rate increasing
         s.counters.payload.down += 10 * BLOCK_LEN as u64;
@@ -350,7 +343,7 @@ mod tests {
         s.state.am_interested = true;
         s.state.am_choking = false;
         s.in_slow_start = true;
-        s.target_request_queue_len = Some(1);
+        s.target_request_queue_len = 1;

         // rate increasing
         s.counters.payload.down += 2 * BLOCK_LEN as u64;
@@ -360,7 +353,7 @@ mod tests {
         // this should be a noop
         s.update_target_request_queue_len();

-        assert_eq!(s.target_request_queue_len, Some(1));
+        assert_eq!(s.target_request_queue_len, 1);
     }

     #[test]
@@ -370,7 +363,7 @@ mod tests {
         s.state.am_interested = true;
         s.state.am_choking = false;
         s.in_slow_start = false;
-        s.target_request_queue_len = Some(1);
+        s.target_request_queue_len = 1;

         // rate increasing (make it more than a multiple of the block
         // length to be able to test against integer truncation)
         s.counters.payload.down += 2 * BLOCK_LEN as u64 + 1000;
@@ -384,7 +377,7 @@ mod tests {
         // queue size based on bandwidth-delay product:
         // (33768 + (16384 - 1)) / 16384 = 3.06 ~ 3
         s.update_target_request_queue_len();
-        assert_eq!(s.target_request_queue_len, Some(3));
+        assert_eq!(s.target_request_queue_len, 3);
     }

     #[test]
@@ -394,12 +387,12 @@ mod tests {
         s.state.am_interested = true;
         s.state.am_choking = false;
         s.in_slow_start = true;
-        s.target_request_queue_len = Some(1);
+        s.target_request_queue_len = 1;

         s.update_download_stats(BLOCK_LEN);

         // request queue length should be increased by one in slow start
-        assert_eq!(s.target_request_queue_len, Some(2));
+        assert_eq!(s.target_request_queue_len, 2);
         // incoming request time should be set
         assert!(s.last_incoming_block_time.is_some());
         // download stat should be increased
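
The session.rs hunks above drop the `Option` around `target_request_queue_len` but keep the sizing rule itself: outside of slow start, the queue length is the average download rate ceil-divided by the block length, clamped to at least 1 (the `(33768 + (16384 - 1)) / 16384 = 3.06 ~ 3` comment in the test). A standalone sketch of just that calculation, with a local `BLOCK_LEN` constant standing in for the crate's (16 KiB, matching the numbers in the test):

    // Ceil-divide the average download rate (bytes/s) by the block length so the
    // queue size is rounded up rather than truncated, and never drops below 1.
    const BLOCK_LEN: u32 = 16384; // 16 KiB, stand-in for the crate's BLOCK_LEN

    fn target_queue_len(download_rate: u64) -> u16 {
        let len = ((download_rate + (BLOCK_LEN - 1) as u64) / BLOCK_LEN as u64) as u16;
        len.max(1)
    }

    fn main() {
        // mirrors the test above: (33768 + 16383) / 16384 truncates to 3
        assert_eq!(target_queue_len(33768), 3);
        // a stalled peer still keeps a queue of one outstanding request
        assert_eq!(target_queue_len(0), 1);
        println!("ok");
    }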
diff --git a/src/torrent.rs b/src/torrent.rs
index eaa1033..257617b 100644
--- a/src/torrent.rs
+++ b/src/torrent.rs
@@ -35,12 +35,6 @@ use tracing::{info, warn};

 #[derive(Debug)]
 pub enum TorrentMsg {
-    /// Message to update the torrent's Bitfield,
-    /// Torrent will start with a blank bitfield
-    /// because it cannot know it from a magnet link
-    /// once a peer send the first bitfield message,
-    /// we will update it.
-    UpdateBitfield(usize),
     /// Message when one of the peers have downloaded
     /// an entire piece. We send Have messages to peers
     /// that don't have it and update the UI with stats.
@@ -49,10 +43,17 @@ pub enum TorrentMsg {
     DownloadComplete,
     /// When in endgame mode, the first peer that receives this info,
     /// sends this message to send Cancel's to all other peers.
-    SendCancel {
+    SendCancelBlock {
         from: [u8; 20],
         block_info: BlockInfo,
     },
+    /// When a peer downloads a piece of the metadata,
+    /// send cancels to all other peers so that we don't receive
+    /// pieces that we already have
+    SendCancelMetadata {
+        from: [u8; 20],
+        index: u32,
+    },
     StartEndgame([u8; 20], Vec<BlockInfo>),
     /// When a peer downloads an info piece,
     /// we need to mutate `info_dict` and maybe
@@ -123,6 +124,7 @@ pub struct Stats {
 }

 impl Torrent {
+    #[tracing::instrument(skip(disk_tx, fr_tx), name = "torrent::new")]
     pub fn new(disk_tx: mpsc::Sender, fr_tx: mpsc::Sender, magnet: &str) -> Self {
         let magnet = get_magnet(magnet).unwrap_or_else(|_| {
             eprintln!("The magnet link is invalid, try another one");
@@ -349,19 +351,6 @@ impl Torrent {
         select! {
             Some(msg) = self.rx.recv() => {
                 match msg {
-                    TorrentMsg::UpdateBitfield(len) => {
-                        // create an empty bitfield with the same
-                        // len as the bitfield from the peer
-                        let ctx = Arc::clone(&self.ctx);
-                        let mut pieces = ctx.pieces.write().await;
-
-                        // only create the bitfield if we don't have one
-                        // pieces.len() will start at 0
-                        if pieces.len() < len {
-                            let inner = vec![0_u8; len];
-                            *pieces = Bitfield::from(inner);
-                        }
-                    }
                     TorrentMsg::DownloadedPiece(piece) => {
                         // send Have messages to peers that dont have our pieces
                         for peer in self.peer_ctxs.values() {
@@ -391,6 +380,7 @@ impl Torrent {

                         if let Ok(Ok(r)) = orx.await {
                             info!("announced completion with success {r:#?}");
+                            self.stats = r.into();
                         }

                         // tell all peers that we are not interested,
@@ -406,10 +396,16 @@ impl Torrent {
                     }
                     // The peer "from" was the first one to receive the "info".
                     // Send Cancel messages to everyone else.
-                    TorrentMsg::SendCancel { from, block_info } => {
+                    TorrentMsg::SendCancelBlock { from, block_info } => {
+                        for (k, peer) in self.peer_ctxs.iter() {
+                            if *k == from { continue };
+                            let _ = peer.tx.send(PeerMsg::CancelBlock(block_info.clone())).await;
+                        }
+                    }
+                    TorrentMsg::SendCancelMetadata { from, index } => {
                         for (k, peer) in self.peer_ctxs.iter() {
                             if *k == from { continue };
-                            let _ = peer.tx.send(PeerMsg::Cancel(block_info.clone())).await;
+                            let _ = peer.tx.send(PeerMsg::CancelMetadata(index)).await;
                         }
                     }
                     TorrentMsg::StartEndgame(_peer_id, block_infos) => {
@@ -450,9 +446,13 @@ impl Torrent {
                         let hash = hex::encode(hash);

                         if hash.to_uppercase() == m_info.to_uppercase() {
-                            self.status = TorrentStatus::Downloading;
                             info!("the hash of the downloaded info matches the hash of the magnet link");

+                            // with the info fully downloaded, we now know the pieces length;
+                            // this will update the bitfield of the torrent
+                            let mut pieces = self.ctx.pieces.write().await;
+                            *pieces = Bitfield::from(vec![0_u8; info.pieces() as usize * 8]);
+
                             self.size = info.get_size();
                             self.have_info = true;

@@ -461,6 +461,8 @@ impl Torrent {
                             *info_l = info;
                             drop(info_l);

+                            self.status = TorrentStatus::Downloading;
+
                             self.disk_tx.send(DiskMsg::NewTorrent(self.ctx.clone())).await?;
                         } else {
                             warn!("a peer sent a valid Info, but the hash does not match the hash of the provided magnet link, panicking");
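
For context on the two cancel paths above: `SendCancelBlock` and `SendCancelMetadata` do the same fan-out, forwarding a cancel to every peer except the one the data came from. A minimal sketch of that fan-out with tokio channels standing in for the `PeerCtx` senders kept in `peer_ctxs` (types simplified; only the metadata case shown):

    use std::collections::HashMap;
    use tokio::sync::mpsc;

    type PeerId = [u8; 20];

    // Simplified stand-in for the crate's PeerMsg.
    #[derive(Debug)]
    enum PeerMsg {
        CancelMetadata(u32),
    }

    // Forward a cancel for metadata piece `index` to every peer except `from`,
    // the peer that already delivered that piece.
    async fn send_cancel_metadata(
        peers: &HashMap<PeerId, mpsc::Sender<PeerMsg>>,
        from: PeerId,
        index: u32,
    ) {
        for (k, tx) in peers.iter() {
            if *k == from {
                continue;
            }
            let _ = tx.send(PeerMsg::CancelMetadata(index)).await;
        }
    }

    #[tokio::main]
    async fn main() {
        let (tx_a, mut rx_a) = mpsc::channel(8);
        let (tx_b, mut rx_b) = mpsc::channel(8);
        let mut peers = HashMap::new();
        peers.insert([0u8; 20], tx_a);
        peers.insert([1u8; 20], tx_b);

        // peer [0; 20] delivered metadata piece 2 first, so only the other
        // peer should receive the cancel
        send_cancel_metadata(&peers, [0u8; 20], 2).await;
        assert!(rx_a.try_recv().is_err());
        println!("peer b got {:?}", rx_b.try_recv().unwrap());
    }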