feat: Begin implementing multi-threading
Multi-threading has not been tested yet.
emmyoh committed May 7, 2024
1 parent 68dd8dc commit 3ce7cd9
Showing 3 changed files with 106 additions and 79 deletions.
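
The change running through this diff is the widening of error types from Box<dyn Error> to Box<dyn Error + Send + Sync>. Tokio's multi-threaded runtime may resume a spawned task on any worker thread, so a task's output, including any error it returns, must be Send. A minimal sketch of the constraint (standalone, assuming the tokio crate with the "macros" and "rt-multi-thread" features enabled):

    use std::error::Error;

    #[tokio::main(flavor = "multi_thread")]
    async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
        let handle = tokio::spawn(async {
            // `?` boxes the ParseIntError into the task's error type. With
            // plain `Box<dyn Error>` the task's output would not be `Send`,
            // and `tokio::spawn` would reject the future at compile time.
            let n: u32 = "42".parse()?;
            Ok::<u32, Box<dyn Error + Send + Sync>>(n)
        });
        // The outer `?` propagates a JoinError; the inner one, the task's error.
        println!("parsed: {}", handle.await??);
        Ok(())
    }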
src/discovery.rs: 8 changes (5 additions, 3 deletions)
@@ -27,7 +27,9 @@ pub const ANNOUNCE_PARALLELISM: usize = 10;
/// # Arguments
///
/// * `namespace_id` - The ID of the replica to announce.
pub async fn announce_replica(namespace_id: NamespaceId) -> Result<(), Box<dyn Error>> {
pub async fn announce_replica(
namespace_id: NamespaceId,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut content = BTreeSet::new();
content.insert(HashAndFormat::raw(Hash::new(namespace_id)));
let dht = mainline::Dht::default();
@@ -82,8 +84,8 @@ impl ContentRequest {
}

impl FromStr for ContentRequest {
type Err = Box<dyn Error>;
fn from_str(s: &str) -> Result<Self, Box<dyn Error>> {
type Err = Box<dyn Error + Send + Sync>;
fn from_str(s: &str) -> Result<Self, Box<dyn Error + Send + Sync>> {
if let Ok(hash) = Hash::from_str(s) {
Ok(hash.into())
} else if let Ok(haf) = HashAndFormat::from_str(s) {
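
With Err = Box<dyn Error + Send + Sync>, parse failures from ContentRequest::from_str can now cross thread boundaries like any other error in the crate. A sketch of the pattern with a stand-in type (the real parser, which tries Hash and HashAndFormat in turn, is in the hunk above):

    use std::{error::Error, str::FromStr};

    // Stand-in for the crate's ContentRequest; for illustration only.
    struct ContentRequest(String);

    impl FromStr for ContentRequest {
        // As in the commit: the associated error type is Send + Sync.
        type Err = Box<dyn Error + Send + Sync>;
        fn from_str(s: &str) -> Result<Self, Self::Err> {
            if s.is_empty() {
                // A &str converts into the boxed error type via From.
                return Err("empty content request".into());
            }
            Ok(ContentRequest(s.to_owned()))
        }
    }

    fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
        let request: ContentRequest = "some-hash".parse()?;
        println!("parsed request for {}", request.0);
        Ok(())
    }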
src/fs.rs: 175 changes (100 additions, 75 deletions)
@@ -20,10 +20,11 @@ use iroh_mainline_content_discovery::to_infohash;
use iroh_pkarr_node_discovery::PkarrNodeDiscovery;
use path_clean::PathClean;
use rand_core::OsRng;
use std::io::{BufRead, BufReader, Read, Write};
use std::net::TcpListener;
use std::net::{Ipv4Addr, SocketAddrV4, TcpStream};
use std::net::{Ipv4Addr, SocketAddrV4};
use std::{error::Error, path::PathBuf};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;
use tokio::net::TcpStream;

/// The path on disk where the file system is stored.
pub const FS_PATH: &str = ".oku";
@@ -70,7 +71,7 @@ impl OkuFs {
/// # Returns
///
/// A running instance of an Oku file system.
pub async fn start() -> Result<OkuFs, Box<dyn Error>> {
pub async fn start() -> Result<OkuFs, Box<dyn Error + Send + Sync>> {
let node_path = PathBuf::from(FS_PATH).join("node");
let node = FsNode::persistent(node_path).await?.spawn().await?;
let authors = node.authors.list().await?;
@@ -122,7 +123,9 @@ impl OkuFs {
/// # Returns
///
/// A discovery service for finding other nodes' addresses given their IDs.
pub async fn create_discovery_service(&self) -> Result<ConcurrentDiscovery, Box<dyn Error>> {
pub async fn create_discovery_service(
&self,
) -> Result<ConcurrentDiscovery, Box<dyn Error + Send + Sync>> {
let node_addr = self.node.my_addr().await?;
let addr_info = node_addr.info;
let magic_endpoint = self.node.magic_endpoint();
@@ -144,7 +147,7 @@ impl OkuFs {
/// # Returns
///
/// The ID of the new replica, being its public key.
pub async fn create_replica(&self) -> Result<NamespaceId, Box<dyn Error>> {
pub async fn create_replica(&self) -> Result<NamespaceId, Box<dyn Error + Send + Sync>> {
let docs_client = &self.node.docs;
let new_document = docs_client.create().await?;
let document_id = new_document.id();
@@ -157,7 +160,10 @@ impl OkuFs {
/// # Arguments
///
/// * `namespace_id` - The ID of the replica to delete.
pub async fn delete_replica(&self, namespace_id: NamespaceId) -> Result<(), Box<dyn Error>> {
pub async fn delete_replica(
&self,
namespace_id: NamespaceId,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let docs_client = &self.node.docs;
Ok(docs_client.drop_doc(namespace_id).await?)
}
@@ -167,7 +173,7 @@ impl OkuFs {
/// # Returns
///
/// A list of all replicas in the file system.
pub async fn list_replicas(&self) -> Result<Vec<NamespaceId>, Box<dyn Error>> {
pub async fn list_replicas(&self) -> Result<Vec<NamespaceId>, Box<dyn Error + Send + Sync>> {
let docs_client = &self.node.docs;
let replicas = docs_client.list().await?;
pin_mut!(replicas);
@@ -188,7 +194,7 @@ impl OkuFs {
pub async fn list_files(
&self,
namespace_id: NamespaceId,
) -> Result<Vec<Entry>, Box<dyn Error>> {
) -> Result<Vec<Entry>, Box<dyn Error + Send + Sync>> {
let docs_client = &self.node.docs;
let document = docs_client
.open(namespace_id)
@@ -219,7 +225,7 @@ impl OkuFs {
namespace_id: NamespaceId,
path: PathBuf,
data: impl Into<Bytes>,
) -> Result<Hash, Box<dyn Error>> {
) -> Result<Hash, Box<dyn Error + Send + Sync>> {
let file_key = path_to_entry_key(path);
let data_bytes = data.into();
let docs_client = &self.node.docs;
@@ -249,7 +255,7 @@ impl OkuFs {
&self,
namespace_id: NamespaceId,
path: PathBuf,
) -> Result<usize, Box<dyn Error>> {
) -> Result<usize, Box<dyn Error + Send + Sync>> {
let file_key = path_to_entry_key(path);
let docs_client = &self.node.docs;
let document = docs_client
@@ -275,7 +281,7 @@ impl OkuFs {
&self,
namespace_id: NamespaceId,
path: PathBuf,
) -> Result<Bytes, Box<dyn Error>> {
) -> Result<Bytes, Box<dyn Error + Send + Sync>> {
let file_key = path_to_entry_key(path);
let docs_client = &self.node.docs;
let document = docs_client
@@ -307,7 +313,7 @@ impl OkuFs {
namespace_id: NamespaceId,
from: PathBuf,
to: PathBuf,
) -> Result<(Hash, usize), Box<dyn Error>> {
) -> Result<(Hash, usize), Box<dyn Error + Send + Sync>> {
let data = self.read_file(namespace_id, from.clone()).await?;
let hash = self
.create_or_modify_file(namespace_id, to.clone(), data)
@@ -331,7 +337,7 @@ impl OkuFs {
&self,
namespace_id: NamespaceId,
path: PathBuf,
) -> Result<usize, Box<dyn Error>> {
) -> Result<usize, Box<dyn Error + Send + Sync>> {
let path = normalise_path(path).join(""); // Ensure path ends with a slash
let docs_client = &self.node.docs;
let document = docs_client
@@ -356,7 +362,7 @@ impl OkuFs {
pub async fn respond_to_content_request(
&self,
request: PeerContentRequest,
) -> Result<PeerContentResponse, Box<dyn Error>> {
) -> Result<PeerContentResponse, Box<dyn Error + Send + Sync>> {
let docs_client = &self.node.docs;
let document = docs_client
.open(request.namespace_id)
@@ -420,35 +426,41 @@ impl OkuFs {

/// Handles incoming requests for document tickets.
/// This function listens for incoming connections from peers and responds to requests for document tickets.
pub async fn listen_for_document_ticket_fetch_requests(&self) -> Result<(), Box<dyn Error>> {
pub async fn listen_for_document_ticket_fetch_requests(
&self,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let socket = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DISCOVERY_PORT);
let listener = TcpListener::bind(socket)?;
for stream in listener.incoming() {
let mut stream = stream?;
let mut buf_reader = BufReader::new(&mut stream);
let received: Vec<u8> = buf_reader.fill_buf()?.to_vec();
buf_reader.consume(received.len());
let mut incoming_lines = received.split(|x| *x == 10);
if let Some(first_line) = incoming_lines.next() {
if first_line == ALPN_DOCUMENT_TICKET_FETCH {
let remaining_lines: Vec<Vec<u8>> =
incoming_lines.map(|x| x.to_owned()).collect();
let peer_content_request_bytes = remaining_lines.concat();
let peer_content_request_str =
String::from_utf8_lossy(&peer_content_request_bytes).to_string();
let peer_content_request = serde_json::from_str(&peer_content_request_str)?;
let peer_content_response = self
.respond_to_content_request(peer_content_request)
.await?;
let peer_content_response_string =
serde_json::to_string(&peer_content_response)?;
stream.write_all(peer_content_response_string.as_bytes())?;
stream.flush()?;
let listener = TcpListener::bind(socket).await?;
loop {
let (mut stream, _) = listener.accept().await?;
let self_clone = self.clone();
tokio::spawn(async move {
let mut buf_reader = BufReader::new(&mut stream);
let received: Vec<u8> = buf_reader.fill_buf().await?.to_vec();
buf_reader.consume(received.len());
let mut incoming_lines = received.split(|x| *x == 10);
if let Some(first_line) = incoming_lines.next() {
if first_line == ALPN_DOCUMENT_TICKET_FETCH {
let remaining_lines: Vec<Vec<u8>> =
incoming_lines.map(|x| x.to_owned()).collect();
let peer_content_request_bytes = remaining_lines.concat();
let peer_content_request_str =
String::from_utf8_lossy(&peer_content_request_bytes).to_string();
let peer_content_request = serde_json::from_str(&peer_content_request_str)?;
let peer_content_response = self_clone
.respond_to_content_request(peer_content_request)
.await?;
let peer_content_response_string =
serde_json::to_string(&peer_content_response)?;
stream
.write_all(peer_content_response_string.as_bytes())
.await?;
stream.flush().await?;
}
}
}
Ok::<(), Box<dyn Error + Send + Sync>>(())
});
}

Ok(())
}
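
The hunk above swaps the blocking std::net accept loop for tokio::net::TcpListener and hands each accepted connection to its own task, so one slow peer can no longer stall the listener. The skeleton of that pattern, reduced to an echo server (the port and buffer size here are arbitrary stand-ins, not the crate's DISCOVERY_PORT):

    use std::error::Error;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::net::TcpListener;

    #[tokio::main(flavor = "multi_thread")]
    async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
        let listener = TcpListener::bind(("0.0.0.0", 4938)).await?;
        loop {
            let (mut stream, _) = listener.accept().await?;
            // Each connection is handled concurrently; the accept loop
            // immediately moves on to the next peer.
            tokio::spawn(async move {
                let mut buf = vec![0u8; 1024];
                let n = stream.read(&mut buf).await?;
                stream.write_all(&buf[..n]).await?;
                // Name the task's error type explicitly, as the commit does.
                Ok::<(), Box<dyn Error + Send + Sync>>(())
            });
        }
    }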

/// Joins a swarm to fetch the latest version of a replica and save it to the local machine.
Expand All @@ -468,7 +480,7 @@ impl OkuFs {
path: Option<PathBuf>,
partial: bool,
verified: bool,
) -> Result<(), Box<dyn Error>> {
) -> Result<(), Box<dyn Error + Send + Sync>> {
let content = ContentRequest::Hash(Hash::new(namespace_id));
let dht = mainline::Dht::default();
let q = Query {
@@ -481,44 +493,57 @@ impl OkuFs {
let info_hash = to_infohash(q.content);
let peer_content_request = PeerContentRequest { namespace_id, path };
let peer_content_request_string = serde_json::to_string(&peer_content_request)?;
let docs_client = &self.node.docs;

let mut addrs = dht.get_peers(info_hash);
for peer_response in &mut addrs {
let mut stream = TcpStream::connect(peer_response.peer)?;
let mut request = Vec::new();
request.write_all(ALPN_DOCUMENT_TICKET_FETCH)?;
request.write_all(b"\n")?;
request.write_all(peer_content_request_string.as_bytes())?;
request.flush()?;
stream.write_all(&request)?;
stream.flush()?;
let mut response_bytes = Vec::new();
stream.read_to_end(&mut response_bytes)?;
let response: PeerContentResponse =
serde_json::from_str(String::from_utf8_lossy(&response_bytes).as_ref())?;
match response.ticket_response {
PeerTicketResponse::Document(document_ticket) => {
if document_ticket.capability.id() != namespace_id {
continue;
if docs_client.open(namespace_id).await.is_ok() {
break;
}
let peer_content_request_string = peer_content_request_string.clone();
let docs_client = docs_client.clone();
let self_clone = self.clone();
tokio::spawn(async move {
let mut stream = TcpStream::connect(peer_response.peer).await?;
let mut request = Vec::new();
request.write_all(ALPN_DOCUMENT_TICKET_FETCH).await?;
request.write_all(b"\n").await?;
request
.write_all(peer_content_request_string.as_bytes())
.await?;
request.flush().await?;
stream.write_all(&request).await?;
stream.flush().await?;
let mut response_bytes = Vec::new();
stream.read_to_end(&mut response_bytes).await?;
let response: PeerContentResponse =
serde_json::from_str(String::from_utf8_lossy(&response_bytes).as_ref())?;
match response.ticket_response {
PeerTicketResponse::Document(document_ticket) => {
if document_ticket.capability.id() != namespace_id {
return Ok::<(), Box<dyn Error + Send + Sync>>(());
}
// let docs_client = &self.node.docs;
docs_client.import(document_ticket).await?;
Ok::<(), Box<dyn Error + Send + Sync>>(())
}
let docs_client = &self.node.docs;
docs_client.import(document_ticket).await?;
}
PeerTicketResponse::Entries(entry_tickets) => {
let blobs_client = &self.node.blobs;
for blob_ticket in entry_tickets {
let ticket_parts = blob_ticket.into_parts();
let blob_download_request = BlobDownloadRequest {
hash: ticket_parts.1,
format: ticket_parts.2,
peer: ticket_parts.0,
tag: iroh::rpc_protocol::SetTagOption::Auto,
};
blobs_client.download(blob_download_request).await?;
break;
PeerTicketResponse::Entries(entry_tickets) => {
let blobs_client = &self_clone.node.blobs;
for blob_ticket in entry_tickets {
let ticket_parts = blob_ticket.into_parts();
let blob_download_request = BlobDownloadRequest {
hash: ticket_parts.1,
format: ticket_parts.2,
peer: ticket_parts.0,
tag: iroh::rpc_protocol::SetTagOption::Auto,
};
blobs_client.download(blob_download_request).await?;
break;
}
Ok::<(), Box<dyn Error + Send + Sync>>(())
}
}
}
});
}

Ok(())
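
get_external_replica gets the same treatment: each peer returned by the DHT is contacted on its own task, and the loop stops once the replica opens locally. Note that the diff detaches its JoinHandles, so a failed task's error is dropped silently; a joined variant that surfaces errors would look like this sketch (peer addresses and payload are placeholders):

    use std::error::Error;
    use tokio::io::AsyncWriteExt;
    use tokio::net::TcpStream;

    #[tokio::main(flavor = "multi_thread")]
    async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
        let peers = ["203.0.113.7:4938", "203.0.113.9:4938"]; // hypothetical
        let mut handles = Vec::new();
        for peer in peers {
            handles.push(tokio::spawn(async move {
                let mut stream = TcpStream::connect(peer).await?;
                stream.write_all(b"hello\n").await?;
                Ok::<(), Box<dyn Error + Send + Sync>>(())
            }));
        }
        for handle in handles {
            // The first ? propagates a panicked or cancelled task
            // (JoinError); the second, the task's own boxed error.
            handle.await??;
        }
        Ok(())
    }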
@@ -534,7 +559,7 @@ impl OkuFs {
/// # Returns
///
/// The author credentials.
pub fn load_or_create_author() -> Result<Author, Box<dyn Error>> {
pub fn load_or_create_author() -> Result<Author, Box<dyn Error + Send + Sync>> {
let path = PathBuf::from(FS_PATH).join("author");
let author_file = std::fs::read(path.clone());
match author_file {
src/main.rs: 2 changes (1 addition, 1 deletion)
@@ -65,7 +65,7 @@ enum Commands {
}

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let cli = Cli::parse();
let node = OkuFs::start().await?;
match cli.command {
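
flavor = "multi_thread" is already the default for #[tokio::main], so spelling it out mainly documents intent; with it, the tasks spawned above can run in parallel across worker threads (one per CPU core unless overridden). A sketch, with an arbitrary worker_threads value:

    use std::error::Error;

    #[tokio::main(flavor = "multi_thread", worker_threads = 4)]
    async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
        // These two tasks may execute on different worker threads.
        let a = tokio::spawn(async { 2 + 2 });
        let b = tokio::spawn(async { 3 * 3 });
        println!("{} {}", a.await?, b.await?);
        Ok(())
    }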
