From 3ce7cd972a9e170a135f9231064db282831240c3 Mon Sep 17 00:00:00 2001 From: Emil Sayahi <97276123+emmyoh@users.noreply.github.com> Date: Tue, 7 May 2024 12:47:56 -0400 Subject: [PATCH] feat: Begin implementing multi-threading Multi-threading has not been tested yet. --- src/discovery.rs | 8 ++- src/fs.rs | 175 +++++++++++++++++++++++++++-------------------- src/main.rs | 2 +- 3 files changed, 106 insertions(+), 79 deletions(-) diff --git a/src/discovery.rs b/src/discovery.rs index ff09439..01b0191 100644 --- a/src/discovery.rs +++ b/src/discovery.rs @@ -27,7 +27,9 @@ pub const ANNOUNCE_PARALLELISM: usize = 10; /// # Arguments /// /// * `namespace_id` - The ID of the replica to announce. -pub async fn announce_replica(namespace_id: NamespaceId) -> Result<(), Box> { +pub async fn announce_replica( + namespace_id: NamespaceId, +) -> Result<(), Box> { let mut content = BTreeSet::new(); content.insert(HashAndFormat::raw(Hash::new(namespace_id))); let dht = mainline::Dht::default(); @@ -82,8 +84,8 @@ impl ContentRequest { } impl FromStr for ContentRequest { - type Err = Box; - fn from_str(s: &str) -> Result> { + type Err = Box; + fn from_str(s: &str) -> Result> { if let Ok(hash) = Hash::from_str(s) { Ok(hash.into()) } else if let Ok(haf) = HashAndFormat::from_str(s) { diff --git a/src/fs.rs b/src/fs.rs index 8e66500..616a910 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -20,10 +20,11 @@ use iroh_mainline_content_discovery::to_infohash; use iroh_pkarr_node_discovery::PkarrNodeDiscovery; use path_clean::PathClean; use rand_core::OsRng; -use std::io::{BufRead, BufReader, Read, Write}; -use std::net::TcpListener; -use std::net::{Ipv4Addr, SocketAddrV4, TcpStream}; +use std::net::{Ipv4Addr, SocketAddrV4}; use std::{error::Error, path::PathBuf}; +use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; +use tokio::net::TcpListener; +use tokio::net::TcpStream; /// The path on disk where the file system is stored. pub const FS_PATH: &str = ".oku"; @@ -70,7 +71,7 @@ impl OkuFs { /// # Returns /// /// A running instance of an Oku file system. - pub async fn start() -> Result> { + pub async fn start() -> Result> { let node_path = PathBuf::from(FS_PATH).join("node"); let node = FsNode::persistent(node_path).await?.spawn().await?; let authors = node.authors.list().await?; @@ -122,7 +123,9 @@ impl OkuFs { /// # Returns /// /// A discovery service for finding other node's addresses given their IDs. - pub async fn create_discovery_service(&self) -> Result> { + pub async fn create_discovery_service( + &self, + ) -> Result> { let node_addr = self.node.my_addr().await?; let addr_info = node_addr.info; let magic_endpoint = self.node.magic_endpoint(); @@ -144,7 +147,7 @@ impl OkuFs { /// # Returns /// /// The ID of the new replica, being its public key. - pub async fn create_replica(&self) -> Result> { + pub async fn create_replica(&self) -> Result> { let docs_client = &self.node.docs; let new_document = docs_client.create().await?; let document_id = new_document.id(); @@ -157,7 +160,10 @@ impl OkuFs { /// # Arguments /// /// * `namespace_id` - The ID of the replica to delete. - pub async fn delete_replica(&self, namespace_id: NamespaceId) -> Result<(), Box> { + pub async fn delete_replica( + &self, + namespace_id: NamespaceId, + ) -> Result<(), Box> { let docs_client = &self.node.docs; Ok(docs_client.drop_doc(namespace_id).await?) } @@ -167,7 +173,7 @@ impl OkuFs { /// # Returns /// /// A list of all replicas in the file system. - pub async fn list_replicas(&self) -> Result, Box> { + pub async fn list_replicas(&self) -> Result, Box> { let docs_client = &self.node.docs; let replicas = docs_client.list().await?; pin_mut!(replicas); @@ -188,7 +194,7 @@ impl OkuFs { pub async fn list_files( &self, namespace_id: NamespaceId, - ) -> Result, Box> { + ) -> Result, Box> { let docs_client = &self.node.docs; let document = docs_client .open(namespace_id) @@ -219,7 +225,7 @@ impl OkuFs { namespace_id: NamespaceId, path: PathBuf, data: impl Into, - ) -> Result> { + ) -> Result> { let file_key = path_to_entry_key(path); let data_bytes = data.into(); let docs_client = &self.node.docs; @@ -249,7 +255,7 @@ impl OkuFs { &self, namespace_id: NamespaceId, path: PathBuf, - ) -> Result> { + ) -> Result> { let file_key = path_to_entry_key(path); let docs_client = &self.node.docs; let document = docs_client @@ -275,7 +281,7 @@ impl OkuFs { &self, namespace_id: NamespaceId, path: PathBuf, - ) -> Result> { + ) -> Result> { let file_key = path_to_entry_key(path); let docs_client = &self.node.docs; let document = docs_client @@ -307,7 +313,7 @@ impl OkuFs { namespace_id: NamespaceId, from: PathBuf, to: PathBuf, - ) -> Result<(Hash, usize), Box> { + ) -> Result<(Hash, usize), Box> { let data = self.read_file(namespace_id, from.clone()).await?; let hash = self .create_or_modify_file(namespace_id, to.clone(), data) @@ -331,7 +337,7 @@ impl OkuFs { &self, namespace_id: NamespaceId, path: PathBuf, - ) -> Result> { + ) -> Result> { let path = normalise_path(path).join(""); // Ensure path ends with a slash let docs_client = &self.node.docs; let document = docs_client @@ -356,7 +362,7 @@ impl OkuFs { pub async fn respond_to_content_request( &self, request: PeerContentRequest, - ) -> Result> { + ) -> Result> { let docs_client = &self.node.docs; let document = docs_client .open(request.namespace_id) @@ -420,35 +426,41 @@ impl OkuFs { /// Handles incoming requests for document tickets. /// This function listens for incoming connections from peers and responds to requests for document tickets. - pub async fn listen_for_document_ticket_fetch_requests(&self) -> Result<(), Box> { + pub async fn listen_for_document_ticket_fetch_requests( + &self, + ) -> Result<(), Box> { let socket = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DISCOVERY_PORT); - let listener = TcpListener::bind(socket)?; - for stream in listener.incoming() { - let mut stream = stream?; - let mut buf_reader = BufReader::new(&mut stream); - let received: Vec = buf_reader.fill_buf()?.to_vec(); - buf_reader.consume(received.len()); - let mut incoming_lines = received.split(|x| *x == 10); - if let Some(first_line) = incoming_lines.next() { - if first_line == ALPN_DOCUMENT_TICKET_FETCH { - let remaining_lines: Vec> = - incoming_lines.map(|x| x.to_owned()).collect(); - let peer_content_request_bytes = remaining_lines.concat(); - let peer_content_request_str = - String::from_utf8_lossy(&peer_content_request_bytes).to_string(); - let peer_content_request = serde_json::from_str(&peer_content_request_str)?; - let peer_content_response = self - .respond_to_content_request(peer_content_request) - .await?; - let peer_content_response_string = - serde_json::to_string(&peer_content_response)?; - stream.write_all(peer_content_response_string.as_bytes())?; - stream.flush()?; + let listener = TcpListener::bind(socket).await?; + loop { + let (mut stream, _) = listener.accept().await?; + let self_clone = self.clone(); + tokio::spawn(async move { + let mut buf_reader = BufReader::new(&mut stream); + let received: Vec = buf_reader.fill_buf().await?.to_vec(); + buf_reader.consume(received.len()); + let mut incoming_lines = received.split(|x| *x == 10); + if let Some(first_line) = incoming_lines.next() { + if first_line == ALPN_DOCUMENT_TICKET_FETCH { + let remaining_lines: Vec> = + incoming_lines.map(|x| x.to_owned()).collect(); + let peer_content_request_bytes = remaining_lines.concat(); + let peer_content_request_str = + String::from_utf8_lossy(&peer_content_request_bytes).to_string(); + let peer_content_request = serde_json::from_str(&peer_content_request_str)?; + let peer_content_response = self_clone + .respond_to_content_request(peer_content_request) + .await?; + let peer_content_response_string = + serde_json::to_string(&peer_content_response)?; + stream + .write_all(peer_content_response_string.as_bytes()) + .await?; + stream.flush().await?; + } } - } + Ok::<(), Box>(()) + }); } - - Ok(()) } /// Joins a swarm to fetch the latest version of a replica and save it to the local machine. @@ -468,7 +480,7 @@ impl OkuFs { path: Option, partial: bool, verified: bool, - ) -> Result<(), Box> { + ) -> Result<(), Box> { let content = ContentRequest::Hash(Hash::new(namespace_id)); let dht = mainline::Dht::default(); let q = Query { @@ -481,44 +493,57 @@ impl OkuFs { let info_hash = to_infohash(q.content); let peer_content_request = PeerContentRequest { namespace_id, path }; let peer_content_request_string = serde_json::to_string(&peer_content_request)?; + let docs_client = &self.node.docs; let mut addrs = dht.get_peers(info_hash); for peer_response in &mut addrs { - let mut stream = TcpStream::connect(peer_response.peer)?; - let mut request = Vec::new(); - request.write_all(ALPN_DOCUMENT_TICKET_FETCH)?; - request.write_all(b"\n")?; - request.write_all(peer_content_request_string.as_bytes())?; - request.flush()?; - stream.write_all(&request)?; - stream.flush()?; - let mut response_bytes = Vec::new(); - stream.read_to_end(&mut response_bytes)?; - let response: PeerContentResponse = - serde_json::from_str(String::from_utf8_lossy(&response_bytes).as_ref())?; - match response.ticket_response { - PeerTicketResponse::Document(document_ticket) => { - if document_ticket.capability.id() != namespace_id { - continue; + if docs_client.open(namespace_id).await.is_ok() { + break; + } + let peer_content_request_string = peer_content_request_string.clone(); + let docs_client = docs_client.clone(); + let self_clone = self.clone(); + tokio::spawn(async move { + let mut stream = TcpStream::connect(peer_response.peer).await?; + let mut request = Vec::new(); + request.write_all(ALPN_DOCUMENT_TICKET_FETCH).await?; + request.write_all(b"\n").await?; + request + .write_all(peer_content_request_string.as_bytes()) + .await?; + request.flush().await?; + stream.write_all(&request).await?; + stream.flush().await?; + let mut response_bytes = Vec::new(); + stream.read_to_end(&mut response_bytes).await?; + let response: PeerContentResponse = + serde_json::from_str(String::from_utf8_lossy(&response_bytes).as_ref())?; + match response.ticket_response { + PeerTicketResponse::Document(document_ticket) => { + if document_ticket.capability.id() != namespace_id { + return Ok::<(), Box>(()); + } + // let docs_client = &self.node.docs; + docs_client.import(document_ticket).await?; + Ok::<(), Box>(()) } - let docs_client = &self.node.docs; - docs_client.import(document_ticket).await?; - } - PeerTicketResponse::Entries(entry_tickets) => { - let blobs_client = &self.node.blobs; - for blob_ticket in entry_tickets { - let ticket_parts = blob_ticket.into_parts(); - let blob_download_request = BlobDownloadRequest { - hash: ticket_parts.1, - format: ticket_parts.2, - peer: ticket_parts.0, - tag: iroh::rpc_protocol::SetTagOption::Auto, - }; - blobs_client.download(blob_download_request).await?; - break; + PeerTicketResponse::Entries(entry_tickets) => { + let blobs_client = &self_clone.node.blobs; + for blob_ticket in entry_tickets { + let ticket_parts = blob_ticket.into_parts(); + let blob_download_request = BlobDownloadRequest { + hash: ticket_parts.1, + format: ticket_parts.2, + peer: ticket_parts.0, + tag: iroh::rpc_protocol::SetTagOption::Auto, + }; + blobs_client.download(blob_download_request).await?; + break; + } + Ok::<(), Box>(()) } } - } + }); } Ok(()) @@ -534,7 +559,7 @@ impl OkuFs { /// # Returns /// /// The author credentials. -pub fn load_or_create_author() -> Result> { +pub fn load_or_create_author() -> Result> { let path = PathBuf::from(FS_PATH).join("author"); let author_file = std::fs::read(path.clone()); match author_file { diff --git a/src/main.rs b/src/main.rs index 80374d1..54c7cb2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -65,7 +65,7 @@ enum Commands { } #[tokio::main(flavor = "multi_thread")] -async fn main() -> Result<(), Box> { +async fn main() -> Result<(), Box> { let cli = Cli::parse(); let node = OkuFs::start().await?; match cli.command {