From 4d45cda518fb7448085ae6c24787d8e699ba0847 Mon Sep 17 00:00:00 2001
From: Emil Sayahi <97276123+emmyoh@users.noreply.github.com>
Date: Mon, 6 May 2024 00:36:22 -0400
Subject: [PATCH] feat: Retrieve replicas by ID

---
 Cargo.lock       |   6 +-
 Cargo.toml       |   2 +
 src/discovery.rs |  22 +++-
 src/fs.rs        | 286 ++++++++++++++++++++++++++++++++++-------------
 src/main.rs      |   9 +-
 5 files changed, 242 insertions(+), 83 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 0b98fbb..24f37a0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2516,8 +2516,10 @@ dependencies = [
  "path-clean",
  "postcard",
  "quic-rpc",
+ "quinn",
  "rand_core",
  "serde",
+ "serde_json",
  "sha3",
  "thiserror",
  "tokio",
@@ -3697,9 +3699,9 @@ dependencies = [
 
 [[package]]
 name = "serde_json"
-version = "1.0.115"
+version = "1.0.116"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "12dc5c46daa8e9fdf4f5e71b6cf9a53f2487da0e86e55808e2d35539666497dd"
+checksum = "3e17db7126d17feb94eb3fad46bf1a96b034e8aacbc2e775fe81505f8b0b2813"
 dependencies = [
  "itoa",
  "ryu",
diff --git a/Cargo.toml b/Cargo.toml
index 2e8d9fa..93c7a56 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,8 +29,10 @@ miette = { version = "7.2.0", features = ["fancy"] }
 path-clean = "1.0.1"
 postcard = "1.0.8"
 quic-rpc = "0.7.0"
+quinn = "0.10.2"
 rand_core = "0.6.4"
 serde = "1.0.197"
+serde_json = "1.0.116"
 sha3 = "0.10.8"
 thiserror = "1.0.58"
 tokio = "1.37.0"
diff --git a/src/discovery.rs b/src/discovery.rs
index 9922252..c7867d9 100644
--- a/src/discovery.rs
+++ b/src/discovery.rs
@@ -3,9 +3,11 @@ use futures::StreamExt;
 use iroh::{
     bytes::{Hash, HashAndFormat},
     sync::NamespaceId,
-    ticket::BlobTicket,
+    ticket::{BlobTicket, DocTicket},
 };
 use iroh_mainline_content_discovery::announce_dht;
+use serde::{Deserialize, Serialize};
+use std::path::PathBuf;
 use std::{collections::BTreeSet, error::Error, str::FromStr, time::Duration};
 
 /// The delay between republishing content to the mainline DHT.
@@ -149,3 +151,21 @@ impl FromStr for ContentRequest {
 //     Ok(())
 // }
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum PeerTicketResponse {
+    Document(DocTicket),
+    Entries(Vec<BlobTicket>),
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct PeerContentRequest {
+    pub namespace_id: NamespaceId,
+    pub path: Option<PathBuf>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct PeerContentResponse {
+    pub ticket_response: PeerTicketResponse,
+    pub content_size: u64,
+}
diff --git a/src/fs.rs b/src/fs.rs
index 3e8d9ba..580ee90 100644
--- a/src/fs.rs
+++ b/src/fs.rs
@@ -1,30 +1,28 @@
-use crate::discovery::DISCOVERY_PORT;
 use crate::discovery::{announce_replica, INITIAL_PUBLISH_DELAY, REPUBLISH_DELAY};
-use crate::error::OkuDiscoveryError;
+use crate::discovery::{
+    PeerContentRequest, PeerContentResponse, PeerTicketResponse, DISCOVERY_PORT,
+};
 use crate::{discovery::ContentRequest, error::OkuFsError};
 use bytes::Bytes;
 use futures::{pin_mut, StreamExt};
-use iroh::base::ticket::Ticket;
 use iroh::client::Entry;
-use iroh::net::magic_endpoint::accept_conn;
-use iroh::ticket::DocTicket;
+use iroh::rpc_protocol::BlobDownloadRequest;
+use iroh::ticket::BlobTicket;
 use iroh::{
     bytes::Hash,
-    net::{
-        discovery::{ConcurrentDiscovery, Discovery},
-        MagicEndpoint,
-    },
+    net::discovery::{ConcurrentDiscovery, Discovery},
     node::FsNode,
     rpc_protocol::ShareMode,
     sync::{Author, AuthorId, NamespaceId},
 };
 use iroh_mainline_content_discovery::protocol::{Query, QueryFlags};
 use iroh_mainline_content_discovery::to_infohash;
-use iroh_mainline_content_discovery::UdpDiscovery;
 use iroh_pkarr_node_discovery::PkarrNodeDiscovery;
 use path_clean::PathClean;
 use rand_core::OsRng;
-use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
+use std::io::{BufRead, BufReader, Read, Write};
+use std::net::TcpListener;
+use std::net::{Ipv4Addr, SocketAddrV4, TcpStream};
 use std::{error::Error, path::PathBuf};
 
 /// The path on disk where the file system is stored.
@@ -60,6 +58,7 @@ pub fn path_to_entry_key(path: PathBuf) -> Bytes {
 /// An instance of an Oku file system.
 ///
 /// The `OkuFs` struct is the primary interface for interacting with an Oku file system.
+#[derive(Clone, Debug)]
 pub struct OkuFs {
     /// An Iroh node responsible for storing replicas on the local machine, as well as joining swarms to fetch replicas from other nodes.
     node: FsNode,
@@ -90,7 +89,9 @@ impl OkuFs {
             authors_list[0]
         };
         let oku_fs = OkuFs { node, author_id };
+        let oku_fs_clone = oku_fs.clone();
         let node_addr = oku_fs.node.my_addr().await?;
+        println!("{:#?}", node_addr);
         let addr_info = node_addr.info;
         let magic_endpoint = oku_fs.node.magic_endpoint();
         let secret_key = magic_endpoint.secret_key();
@@ -100,6 +101,12 @@ impl OkuFs {
         discovery_service.publish(&addr_info);
         let docs_client = &oku_fs.node.docs;
         let docs_client = docs_client.clone();
+        tokio::spawn(async move {
+            oku_fs_clone
+                .listen_for_document_ticket_fetch_requests()
+                .await
+                .unwrap()
+        });
         tokio::spawn(async move {
             loop {
                 tokio::time::sleep(INITIAL_PUBLISH_DELAY).await;
@@ -342,41 +349,115 @@ impl OkuFs {
         Ok(entries_deleted)
     }
 
+    pub async fn respond_to_content_request(
+        &self,
+        request: PeerContentRequest,
+    ) -> Result<PeerContentResponse, Box<dyn Error>> {
+        let docs_client = &self.node.docs;
+        let document = docs_client
+            .open(request.namespace_id)
+            .await?
+            .ok_or(OkuFsError::FsEntryNotFound)?;
+        match request.path {
+            None => {
+                let document_ticket = document.share(ShareMode::Read).await?;
+                let query = iroh::sync::store::Query::single_latest_per_key().build();
+                let entries = document.get_many(query).await?;
+                pin_mut!(entries);
+                let file_sizes: Vec<u64> = entries
+                    .map(|entry| entry.unwrap().content_len())
+                    .collect()
+                    .await;
+                let content_length = file_sizes.iter().sum();
+                Ok(PeerContentResponse {
+                    ticket_response: PeerTicketResponse::Document(document_ticket),
+                    content_size: content_length,
+                })
+            }
+            Some(blob_path) => {
+                let blobs_client = &self.node.blobs;
+                let entry_prefix = path_to_entry_key(blob_path);
+                let query = iroh::sync::store::Query::single_latest_per_key()
+                    .key_prefix(entry_prefix)
+                    .build();
+                let entries = document.get_many(query).await?;
+                pin_mut!(entries);
+                let entry_hashes_and_sizes: Vec<(Hash, u64)> = entries
+                    .map(|entry| {
+                        (
+                            entry.as_ref().unwrap().content_hash(),
+                            entry.unwrap().content_len(),
+                        )
+                    })
+                    .collect()
+                    .await;
+                let entry_tickets: Vec<BlobTicket> =
+                    futures::future::try_join_all(entry_hashes_and_sizes.iter().map(|entry| {
+                        blobs_client.share(
+                            entry.0,
+                            iroh::bytes::BlobFormat::Raw,
+                            iroh::client::ShareTicketOptions::RelayAndAddresses,
+                        )
+                    }))
+                    .await?;
+                let content_length = entry_hashes_and_sizes
+                    .iter()
+                    .map(|entry| entry.1)
+                    .collect::<Vec<u64>>()
+                    .iter()
+                    .sum();
+                Ok(PeerContentResponse {
+                    ticket_response: PeerTicketResponse::Entries(entry_tickets),
+                    content_size: content_length,
+                })
+            }
+        }
+    }
+
     /// Handles incoming requests for document tickets.
     /// This function listens for incoming connections from peers and responds to requests for document tickets.
     pub async fn listen_for_document_ticket_fetch_requests(&self) -> Result<(), Box<dyn Error>> {
-        let mut alpns: Vec<Vec<u8>> = Vec::new();
-        alpns.push(ALPN_DOCUMENT_TICKET_FETCH.to_vec());
-        let secret_key = self.node.magic_endpoint().secret_key();
-        let endpoint = MagicEndpoint::builder()
-            .alpns(alpns)
-            .secret_key(secret_key.clone())
-            .discovery(Box::new(self.create_discovery_service().await?))
-            .bind(0)
-            .await?;
-        while let Some(conn) = endpoint.clone().accept().await {
-            let (peer_id, alpn, conn) = accept_conn(conn).await?;
+        let socket = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DISCOVERY_PORT);
+        let listener = TcpListener::bind(socket)?;
+        for stream in listener.incoming() {
+            let mut stream = stream?;
+            let mut buf_reader = BufReader::new(&mut stream);
+            let received: Vec<u8> = buf_reader.fill_buf()?.to_vec();
+            buf_reader.consume(received.len());
+            let mut incoming_lines = received.split(|x| *x == 10);
             println!(
-                "new connection from {peer_id} with ALPN {alpn} (coming from {})",
-                conn.remote_address()
+                "Received: {:#?}",
+                String::from_utf8_lossy(&received).to_string()
             );
-            match alpn.as_bytes() {
-                ALPN_DOCUMENT_TICKET_FETCH => {
-                    let (mut send, mut recv) = conn.accept_bi().await?;
-                    let namespace_id_bytes: &[u8] = &recv.read_to_end(32).await?;
-                    let namespace_id_bytes: &[u8; 32] = namespace_id_bytes.try_into()?;
-                    let namespace = NamespaceId::from(namespace_id_bytes);
-                    let docs_client = &self.node.docs;
-                    let document = docs_client.open(namespace).await?;
-                    if let Some(document) = document {
-                        let ticket = document.share(ShareMode::Read).await?;
-                        send.write_all(&postcard::to_stdvec(&ticket.to_bytes())?)
-                            .await?;
-                    }
+            if let Some(first_line) = incoming_lines.next() {
+                println!(
+                    "First: {:#?}",
+                    String::from_utf8_lossy(&first_line).to_string()
+                );
+                if first_line == ALPN_DOCUMENT_TICKET_FETCH {
+                    let remaining_lines: Vec<Vec<u8>> =
+                        incoming_lines.map(|x| x.to_owned()).collect();
+                    let peer_content_request_bytes = remaining_lines.concat();
+                    let peer_content_request_str =
+                        String::from_utf8_lossy(&peer_content_request_bytes).to_string();
+                    println!(
+                        "Second: {:#?}",
+                        String::from_utf8_lossy(&peer_content_request_bytes).to_string()
+                    );
+                    let peer_content_request = serde_json::from_str(&peer_content_request_str)?;
+                    println!("Request: {:#?}", peer_content_request);
+                    let peer_content_response = self
+                        .respond_to_content_request(peer_content_request)
+                        .await?;
+                    println!("Response: {:#?}", peer_content_response);
+                    let peer_content_response_string =
+                        serde_json::to_string(&peer_content_response)?;
+                    stream.write_all(&peer_content_response_string.as_bytes())?;
+                    stream.flush()?;
                 }
-                _ => Err(OkuDiscoveryError::UnsupportedALPN(alpn.to_string()))?,
             }
         }
+
         Ok(())
     }
 
@@ -392,6 +473,7 @@ impl OkuFs {
     pub async fn get_external_replica(
         &self,
         namespace_id: NamespaceId,
+        path: Option<PathBuf>,
         partial: bool,
         verified: bool,
     ) -> Result<(), Box<dyn Error>> {
@@ -413,15 +495,15 @@ impl OkuFs {
         //     }
         // };
         let content = ContentRequest::Hash(Hash::new(namespace_id));
-        let secret_key = self.node.magic_endpoint().secret_key();
-        let endpoint = MagicEndpoint::builder()
-            .alpns(vec![])
-            .secret_key(secret_key.clone())
-            .discovery(Box::new(self.create_discovery_service().await?))
-            .bind(0)
-            .await?;
-        let bind_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DISCOVERY_PORT));
-        let discovery = UdpDiscovery::new(bind_addr).await?;
+        // let secret_key = self.node.magic_endpoint().secret_key();
+        // let endpoint = MagicEndpoint::builder()
+        //     .alpns(vec![])
+        //     .secret_key(secret_key.clone())
+        //     .discovery(Box::new(self.create_discovery_service().await?))
+        //     .bind(0)
+        //     .await?;
+        // let bind_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DISCOVERY_PORT));
+        // let discovery = UdpDiscovery::new(bind_addr).await?;
         let dht = mainline::Dht::default();
         let q = Query {
             content: content.hash_and_format(),
@@ -430,40 +512,90 @@ impl OkuFs {
                 verified,
             },
         };
-        println!("content corresponds to infohash {}", to_infohash(q.content));
+        let info_hash = to_infohash(q.content);
+        println!("content corresponds to infohash {}", info_hash);
+        let peer_content_request = PeerContentRequest {
+            namespace_id: namespace_id,
+            path: path,
+        };
+        let peer_content_request_string = serde_json::to_string(&peer_content_request)?;
+        println!(
+            "peer_content_request_string: {:#?}",
+            peer_content_request_string
+        );
 
-        let stream = discovery.query_dht(dht, q).await?;
-        let connections = stream
-            .map(move |announce| {
-                println!("got announce {:?}", announce);
-                let endpoint = endpoint.clone();
-                async move {
-                    endpoint
-                        .connect_by_node_id(&announce.host, ALPN_DOCUMENT_TICKET_FETCH)
-                        .await
+        let mut addrs = dht.get_peers(info_hash);
+        for peer_response in &mut addrs {
+            println!(
+                "Got peer: {:?} | from: {:?}",
+                peer_response.peer, peer_response.from
+            );
+            // println!("{:#?}", peer_response);
+            // let client_endpoint = quinn::Endpoint::client("0.0.0.0:0".parse()?)?;
+            // let client_config = {
+            //     let alpn = vec![ALPN_DOCUMENT_TICKET_FETCH.to_vec()];
+            //     let tls_client_config = iroh::net::tls::make_client_config(
+            //         endpoint.secret_key(),
+            //         Some(self.node.node_id()),
+            //         alpn,
+            //         false,
+            //     )?;
+            //     let mut client_config = quinn::ClientConfig::new(Arc::new(tls_client_config));
+            //     let mut transport_config = quinn::TransportConfig::default();
+            //     transport_config.keep_alive_interval(Some(Duration::from_secs(1)));
+            //     client_config.transport_config(Arc::new(transport_config));
+            //     client_config
+            // };
+            // let connection = client_endpoint.connect_with(client_config, peer_response.peer, "localhost")?.await?;
+            // // let connection = client_endpoint
+            // //     .connect(peer_response.peer, std::str::from_utf8(ALPN_DOCUMENT_TICKET_FETCH)?)
+            // //     ?
+            // //     .await
+            // //     ?;
+            // println!("[client] connected: addr={}", connection.remote_address());
+            // let (mut send, mut recv) = connection.open_bi().await?;
+            // send.write_all(&postcard::to_stdvec(namespace_id.as_bytes())?)
+            //     .await?;
+            let mut stream = TcpStream::connect(peer_response.peer)?;
+            let mut request = Vec::new();
+            request.write_all(ALPN_DOCUMENT_TICKET_FETCH)?;
+            request.write_all(b"\n")?;
+            request.write_all(&peer_content_request_string.as_bytes())?;
+            request.flush()?;
+            // stream.write_all(ALPN_DOCUMENT_TICKET_FETCH)?;
+            // stream.write_all(&peer_content_request_bytes)?;
+            stream.write_all(&request)?;
+            stream.flush()?;
+            let mut response_bytes = Vec::new();
+            stream.read_to_end(&mut response_bytes)?;
+            let response: PeerContentResponse =
+                serde_json::from_str(&String::from_utf8_lossy(&response_bytes).to_string())?;
+            println!("Response: {:#?}", response);
+            match response.ticket_response {
+                PeerTicketResponse::Document(document_ticket) => {
+                    if document_ticket.capability.id() != namespace_id {
+                        continue;
+                    }
+                    let docs_client = &self.node.docs;
+                    docs_client.import(document_ticket).await?;
                 }
-            })
-            .buffer_unordered(4)
-            .filter_map(|x| async {
-                match x {
-                    Ok(x) => Some(x),
-                    Err(e) => {
-                        eprintln!("error connecting to node: {:?}", e);
-                        None
+                PeerTicketResponse::Entries(entry_tickets) => {
+                    let blobs_client = &self.node.blobs;
+                    for blob_ticket in entry_tickets {
+                        let ticket_parts = blob_ticket.into_parts();
+                        let blob_download_request = BlobDownloadRequest {
+                            hash: ticket_parts.1,
+                            format: ticket_parts.2,
+                            peer: ticket_parts.0,
+                            tag: iroh::rpc_protocol::SetTagOption::Auto,
+                        };
+                        blobs_client.download(blob_download_request).await?;
+                        break;
+                    }
+                }
             }
-        });
-        tokio::pin!(connections);
-        let connection = connections
-            .next()
-            .await
-            .ok_or(OkuDiscoveryError::NoNodesFound)?;
-        let (mut send, mut recv) = connection.open_bi().await?;
-        send.write_all(&postcard::to_stdvec(namespace_id.as_bytes())?)
-            .await?;
-        let ticket = DocTicket::from_bytes(&recv.read_to_end(256).await?)?;
-        let docs_client = &self.node.docs;
-        docs_client.import(ticket).await?;
+        }
+
         Ok(())
     }
 
diff --git a/src/main.rs b/src/main.rs
index 7cca31e..80374d1 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -57,8 +57,10 @@ enum Commands {
         new_path: PathBuf,
     },
     GetReplica {
-        #[arg(value_name = "REPLICA_ID")]
+        #[arg(short, long, value_name = "REPLICA_ID")]
         replica_id: NamespaceId,
+        #[arg(short, long, value_name = "PATH", default_missing_value = None)]
+        path: Option<PathBuf>,
     },
 }
 
@@ -117,8 +119,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
             .await?;
             println!("Moved file from {:?} to {:?}", old_path, new_path);
         }
-        Some(Commands::GetReplica { replica_id }) => {
-            node.get_external_replica(replica_id, true, true).await?;
+        Some(Commands::GetReplica { replica_id, path }) => {
+            node.get_external_replica(replica_id, path, true, true)
+                .await?;
             let files = node.list_files(replica_id).await?;
             for file in files {
                 println!("{:#?}", file);
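
Note on the wire format this patch introduces in src/fs.rs: the exchange is a one-shot TCP request/response. A fetching node writes the ALPN identifier, a newline, and a JSON-encoded PeerContentRequest, then reads a JSON-encoded PeerContentResponse until the responder closes the connection; the responder replies with a DocTicket for the whole replica when path is None, or per-entry BlobTickets when a path is given. The sketch below mirrors that framing from the client side. It is illustrative only: the ALPN and port values are stand-ins (the real ALPN_DOCUMENT_TICKET_FETCH and DISCOVERY_PORT constants live in src/discovery.rs and are not shown in this patch), the request struct is simplified to avoid the iroh types, and fetch_tickets is a hypothetical helper, not part of this change.

    use serde::{Deserialize, Serialize};
    use std::error::Error;
    use std::io::{Read, Write};
    use std::net::TcpStream;
    use std::path::PathBuf;

    // Stand-ins for the real constants in src/discovery.rs; these values are
    // assumptions for illustration, not the project's actual constants.
    const ALPN_DOCUMENT_TICKET_FETCH: &[u8] = b"oku/document-ticket/fetch/v0";
    const DISCOVERY_PORT: u16 = 4938;

    // Simplified stand-in for discovery::PeerContentRequest; the real struct
    // uses iroh's NamespaceId, replaced by a string here to stay dependency-free.
    #[derive(Debug, Serialize, Deserialize)]
    struct PeerContentRequest {
        namespace_id: String,
        path: Option<PathBuf>,
    }

    // Hypothetical helper: send one request, return the raw JSON response.
    fn fetch_tickets(peer: &str, request: &PeerContentRequest) -> Result<String, Box<dyn Error>> {
        let mut stream = TcpStream::connect((peer, DISCOVERY_PORT))?;
        // First line: the protocol identifier; remaining bytes: the JSON request.
        stream.write_all(ALPN_DOCUMENT_TICKET_FETCH)?;
        stream.write_all(b"\n")?;
        stream.write_all(serde_json::to_string(request)?.as_bytes())?;
        stream.flush()?;
        // Close the write half so a responder reading to end-of-stream also works.
        stream.shutdown(std::net::Shutdown::Write)?;
        // The responder writes one JSON document and closes the connection.
        let mut response = String::new();
        stream.read_to_string(&mut response)?;
        Ok(response)
    }

One caveat on the framing: the listener added in src/fs.rs reads a single buffered chunk per connection via fill_buf(), so a request larger than one read could be truncated; a length prefix, or reading to end-of-stream on the responder side, would make the framing fully robust. With the src/main.rs change, GetReplica also accepts an optional path argument, which flows through get_external_replica into the PeerContentRequest to fetch a single entry's blobs instead of the whole replica.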