diff --git a/crates/bit_rev/src/protocol_udp.rs b/crates/bit_rev/src/protocol_udp.rs index 8b13789..8f65721 100644 --- a/crates/bit_rev/src/protocol_udp.rs +++ b/crates/bit_rev/src/protocol_udp.rs @@ -1 +1,256 @@ +use anyhow::{anyhow, Result}; +use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; +use rand::Rng; +use std::io::{Cursor, Read, Write}; +use std::net::{SocketAddr, ToSocketAddrs}; +use std::time::Duration; +use tokio::net::UdpSocket; +use tokio::time::{timeout, Instant}; +use tracing::{debug, info}; +use crate::file::TorrentMeta; + +const PROTOCOL_ID: u64 = 0x41727101980; +const ACTION_CONNECT: u32 = 0; +const ACTION_ANNOUNCE: u32 = 1; +const ACTION_SCRAPE: u32 = 2; +const ACTION_ERROR: u32 = 3; + +#[derive(Debug, Clone)] +pub struct UdpTracker { + pub url: String, + pub connection_id: Option, + pub last_connect: Option, +} + +#[derive(Debug, Clone)] +pub struct UdpPeer { + pub ip: [u8; 4], + pub port: u16, +} + +#[derive(Debug, Clone)] +pub struct UdpAnnounceResponse { + pub action: u32, + pub transaction_id: u32, + pub interval: u32, + pub leechers: u32, + pub seeders: u32, + pub peers: Vec, +} + +impl UdpTracker { + pub fn new(url: String) -> Self { + Self { + url, + connection_id: None, + last_connect: None, + } + } + + pub async fn announce( + &mut self, + torrent_meta: &TorrentMeta, + peer_id: &[u8; 20], + port: u16, + uploaded: u64, + downloaded: u64, + left: u64, + event: u32, + ) -> Result { + // Check if we need to connect/reconnect + if self.connection_id.is_none() + || self.last_connect.map_or(true, |t| t.elapsed() > Duration::from_secs(60)) + { + self.connect().await?; + } + + let connection_id = self.connection_id.ok_or_else(|| anyhow!("No connection ID"))?; + + let socket = UdpSocket::bind("0.0.0.0:0").await?; + let addr = self.parse_udp_url()?; + // info!("Using UDP tracker at {}", addr); + + let transaction_id: u32 = rand::thread_rng().gen(); + + // Build announce request + let mut request = Vec::new(); + request.write_u64::(connection_id)?; + request.write_u32::(ACTION_ANNOUNCE)?; + request.write_u32::(transaction_id)?; + request.write_all(&torrent_meta.info_hash)?; + request.write_all(peer_id)?; + request.write_u64::(downloaded)?; + request.write_u64::(left)?; + request.write_u64::(uploaded)?; + request.write_u32::(event)?; // 0: none, 1: completed, 2: started, 3: stopped + request.write_u32::(0)?; // IP address (0 = default) + request.write_u32::(rand::thread_rng().gen())?; // key + request.write_i32::(-1)?; // num_want (-1 = default) + request.write_u16::(port)?; + + debug!("Sending UDP announce request to {}", addr); + socket.send_to(&request, addr).await?; + + // Receive response with timeout + let mut buf = [0u8; 1024]; + let (len, _) = timeout(Duration::from_secs(15), socket.recv_from(&mut buf)).await??; + + self.parse_announce_response(&buf[..len], transaction_id) + } + + async fn connect(&mut self) -> Result<()> { + let socket = UdpSocket::bind("0.0.0.0:0").await?; + let addr = self.parse_udp_url()?; + + let transaction_id: u32 = rand::thread_rng().gen(); + + // Build connect request + let mut request = Vec::new(); + request.write_u64::(PROTOCOL_ID)?; + request.write_u32::(ACTION_CONNECT)?; + request.write_u32::(transaction_id)?; + + debug!("Sending UDP connect request to {}", addr); + socket.send_to(&request, addr).await?; + + // Receive response with timeout + let mut buf = [0u8; 16]; + let (len, _) = timeout(Duration::from_secs(15), socket.recv_from(&mut buf)).await??; + + if len < 16 { + return Err(anyhow!("Connect response too short: {} bytes", len)); + } + + let mut cursor = Cursor::new(&buf[..len]); + let action = cursor.read_u32::()?; + let response_transaction_id = cursor.read_u32::()?; + + if action == ACTION_ERROR { + let error_msg = String::from_utf8_lossy(&buf[8..len]); + return Err(anyhow!("Tracker error: {}", error_msg)); + } + + if action != ACTION_CONNECT { + return Err(anyhow!("Invalid action in connect response: {}", action)); + } + + if response_transaction_id != transaction_id { + return Err(anyhow!("Transaction ID mismatch in connect response")); + } + + self.connection_id = Some(cursor.read_u64::()?); + self.last_connect = Some(Instant::now()); + + debug!("UDP tracker connected with connection_id: {:?}", self.connection_id); + Ok(()) + } + + fn parse_udp_url(&self) -> Result { + if !self.url.starts_with("udp://") { + return Err(anyhow!("Invalid UDP tracker URL: {}", self.url)); + } + + let url_without_scheme = &self.url[6..]; // Remove "udp://" + + // Split at the first '/' to separate hostname:port from path + let host_port = url_without_scheme.split('/').next() + .ok_or_else(|| anyhow!("Invalid UDP tracker URL format: {}", self.url))?; + + let addr = host_port.to_socket_addrs()?.next() + .ok_or_else(|| anyhow!("Could not resolve UDP tracker address: {}", host_port))?; + + Ok(addr) + } + + fn parse_announce_response(&self, data: &[u8], expected_transaction_id: u32) -> Result { + if data.len() < 20 { + return Err(anyhow!("Announce response too short: {} bytes", data.len())); + } + + let mut cursor = Cursor::new(data); + let action = cursor.read_u32::()?; + let transaction_id = cursor.read_u32::()?; + + if action == ACTION_ERROR { + let error_msg = String::from_utf8_lossy(&data[8..]); + return Err(anyhow!("Tracker error: {}", error_msg)); + } + + if action != ACTION_ANNOUNCE { + return Err(anyhow!("Invalid action in announce response: {}", action)); + } + + if transaction_id != expected_transaction_id { + return Err(anyhow!("Transaction ID mismatch in announce response")); + } + + let interval = cursor.read_u32::()?; + let leechers = cursor.read_u32::()?; + let seeders = cursor.read_u32::()?; + + let mut peers = Vec::new(); + let remaining_bytes = data.len() - 20; + let peer_count = remaining_bytes / 6; // Each peer is 6 bytes (4 IP + 2 port) + + for _ in 0..peer_count { + let mut ip = [0u8; 4]; + cursor.read_exact(&mut ip)?; + let port = cursor.read_u16::()?; + + peers.push(UdpPeer { ip, port }); + } + + debug!("UDP announce response: {} seeders, {} leechers, {} peers", seeders, leechers, peers.len()); + + Ok(UdpAnnounceResponse { + action, + transaction_id, + interval, + leechers, + seeders, + peers, + }) + } +} + +impl UdpPeer { + pub fn to_socket_addr(&self) -> SocketAddr { + SocketAddr::from((self.ip, self.port)) + } +} + +pub async fn request_udp_peers( + tracker_url: &str, + torrent_meta: &TorrentMeta, + peer_id: &[u8; 20], + port: u16, +) -> Result { + let mut tracker = UdpTracker::new(tracker_url.to_string()); + + let uploaded = 0; + let downloaded = 0; + let left = torrent_meta.torrent_file.info.length.unwrap_or(0) as u64; + let event = 2; // started event + + tracker.announce(torrent_meta, peer_id, port, uploaded, downloaded, left, event).await +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_udp_url() { + let tracker = UdpTracker::new("udp://tracker.example.com:8080/announce".to_string()); + let result = tracker.parse_udp_url(); + assert!(result.is_ok()); + } + + #[test] + fn test_invalid_udp_url() { + let tracker = UdpTracker::new("http://tracker.example.com:8080/announce".to_string()); + let result = tracker.parse_udp_url(); + assert!(result.is_err()); + } +} \ No newline at end of file diff --git a/crates/bit_rev/src/tracker_peers.rs b/crates/bit_rev/src/tracker_peers.rs index 83d64f5..2ef17e0 100644 --- a/crates/bit_rev/src/tracker_peers.rs +++ b/crates/bit_rev/src/tracker_peers.rs @@ -1,7 +1,7 @@ use serde_bencode::de; use std::sync::{atomic::AtomicBool, Arc, Mutex}; use tokio::{select, sync::Semaphore}; -use tracing::debug; +use tracing::{debug, error}; use crate::{ file::{self, TorrentMeta}, @@ -10,6 +10,7 @@ use crate::{ FullPiece, PeerConnection, PeerHandler, PieceWorkState, TorrentDownloadedState, }, peer_state::PeerStates, + protocol_udp::request_udp_peers, session::PieceWork, }; @@ -46,12 +47,19 @@ impl TrackerPeers { let info_hash = self.torrent_meta.info_hash; let peer_id = self.peer_id; - let tcp_trackers = all_trackers(&self.torrent_meta.clone()) - .into_iter() - .filter(|t| !t.starts_with("udp://")); - - //TODO: support udp trackers - let tcp_trackers = tcp_trackers.clone(); + let all_tracker_urls = all_trackers(&self.torrent_meta.clone()); + let tcp_trackers: Vec = all_tracker_urls + .iter() + .filter(|t| !t.starts_with("udp://")) + .cloned() + .collect(); + let udp_trackers: Vec = all_tracker_urls + .iter() + .filter(|t| t.starts_with("udp://")) + .cloned() + .collect(); + + debug!("Connecting to trackers: TCP: {:?}, UDP: {:?}", tcp_trackers, udp_trackers); let torrent_meta = self.torrent_meta.clone(); let peer_states = self.peer_states.clone(); let piece_tx = self.piece_tx.clone(); @@ -70,89 +78,76 @@ impl TrackerPeers { }); tokio::spawn(async move { loop { + // Handle TCP trackers for tracker in tcp_trackers.clone() { let torrent_meta = torrent_meta.clone(); let peer_states = peer_states.clone(); let piece_tx = piece_tx.clone(); let have_broadcast = have_broadcast.clone(); let torrent_downloaded_state = torrent_downloaded_state.clone(); - //let pieces_of_work = pieces_of_work.clone(); tokio::spawn(async move { let url = file::build_tracker_url(&torrent_meta, &peer_id, 6881, &tracker); - let request_peers_res = request_peers(&url).await.unwrap(); - let new_peers = request_peers_res.clone().get_peers().unwrap(); - let peer_states = peer_states.clone(); - - for peer in new_peers { - let peer_states = peer_states.clone(); - //let pieces_of_work = pieces_of_work.clone(); - - if peer_states.clone().states.contains_key(&peer) { - continue; + match request_peers(&url).await { + Ok(request_peers_res) => { + match request_peers_res.clone().get_peers() { + Ok(new_peers) => { + process_peers( + new_peers, + info_hash, + peer_id, + peer_states.clone(), + piece_tx.clone(), + have_broadcast.clone(), + torrent_downloaded_state.clone(), + ).await; + + //sleep interval + tokio::time::sleep(std::time::Duration::from_millis( + request_peers_res.interval, + )).await; + } + Err(e) => debug!("Failed to parse peers from TCP tracker {}: {}", tracker, e), + } } + Err(e) => debug!("Failed to request peers from TCP tracker {}: {}", tracker, e), + } + }); + } - //let peers = peers.clone(); - let piece_tx = piece_tx.clone(); - let have_broadcast = have_broadcast.clone(); - let torrent_downloaded_state = torrent_downloaded_state.clone(); - - tokio::spawn(async move { - let unchoke_notify = tokio::sync::Notify::new(); - let (peer_writer_tx, peer_writer_rx) = flume::unbounded(); - - let peer_handler = Arc::new(PeerHandler::new( - peer, - unchoke_notify, - piece_tx.clone(), - peer_writer_tx.clone(), - peer_states.clone(), - //pieces_of_work.clone(), - torrent_downloaded_state.clone(), - )); - - let peer_connection = PeerConnection::new( - peer, + // Handle UDP trackers + for tracker in udp_trackers.clone() { + let torrent_meta = torrent_meta.clone(); + let peer_states = peer_states.clone(); + let piece_tx = piece_tx.clone(); + let have_broadcast = have_broadcast.clone(); + let torrent_downloaded_state = torrent_downloaded_state.clone(); + tokio::spawn(async move { + match request_udp_peers(&tracker, &torrent_meta, &peer_id, 6881).await { + Ok(udp_response) => { + debug!("Received UDP response from tracker {}: {:?}", tracker, udp_response); + let new_peers: Vec<_> = udp_response.peers + .into_iter() + .map(|p| p.to_socket_addr()) + .collect(); + + process_peers( + new_peers, info_hash, peer_id, - peer_handler.clone(), - ); - - let task_peer_chunk_req_fut = - peer_handler.task_peer_chunk_requester(); - let connect_peer_fut = peer_connection.manage_peer_incoming( - peer_writer_rx, - have_broadcast.subscribe(), - ); - - let req = select! { - r = connect_peer_fut => { - debug!("connect_peer_fut: {:#?}", r); - r - } - r = task_peer_chunk_req_fut => { - debug!("task_peer_chunk_req_fut: {:#?}", r); - r - } - }; - - match req { - Ok(_) => { - // We disconnected the peer ourselves as we don't need it - peer_handler.on_peer_died(); - } - Err(e) => { - debug!("error managing peer: {:#}", e); - peer_handler.on_peer_died(); - } - } - }); + peer_states.clone(), + piece_tx.clone(), + have_broadcast.clone(), + torrent_downloaded_state.clone(), + ).await; + + //sleep interval + tokio::time::sleep(std::time::Duration::from_secs( + udp_response.interval as u64, + )).await; + } + Err(e) => error!("Failed to request peers from UDP tracker {}: {}", tracker, e), } - //sleep interval - tokio::time::sleep(std::time::Duration::from_millis( - request_peers_res.interval, - )) - .await }); } @@ -162,6 +157,76 @@ impl TrackerPeers { } } +async fn process_peers( + new_peers: Vec, + info_hash: [u8; 20], + peer_id: [u8; 20], + peer_states: Arc, + piece_tx: flume::Sender, + have_broadcast: Arc>, + torrent_downloaded_state: Arc, +) { + for peer in new_peers { + if peer_states.clone().states.contains_key(&peer) { + continue; + } + + let piece_tx = piece_tx.clone(); + let have_broadcast = have_broadcast.clone(); + let torrent_downloaded_state = torrent_downloaded_state.clone(); + let peer_states = peer_states.clone(); + + tokio::spawn(async move { + let unchoke_notify = tokio::sync::Notify::new(); + let (peer_writer_tx, peer_writer_rx) = flume::unbounded(); + + let peer_handler = Arc::new(PeerHandler::new( + peer, + unchoke_notify, + piece_tx.clone(), + peer_writer_tx.clone(), + peer_states.clone(), + torrent_downloaded_state.clone(), + )); + + let peer_connection = PeerConnection::new( + peer, + info_hash, + peer_id, + peer_handler.clone(), + ); + + let task_peer_chunk_req_fut = peer_handler.task_peer_chunk_requester(); + let connect_peer_fut = peer_connection.manage_peer_incoming( + peer_writer_rx, + have_broadcast.subscribe(), + ); + + let req = select! { + r = connect_peer_fut => { + debug!("connect_peer_fut: {:#?}", r); + r + } + r = task_peer_chunk_req_fut => { + debug!("task_peer_chunk_req_fut: {:#?}", r); + r + } + }; + + match req { + Ok(_) => { + // We disconnected the peer ourselves as we don't need it + peer_handler.on_peer_died(); + } + Err(e) => { + debug!("error managing peer: {:#}", e); + peer_handler.on_peer_died(); + } + } + }); + } +} + fn all_trackers(torrent_meta: &TorrentMeta) -> Vec { match ( &torrent_meta.torrent_file.announce,