From 606063190d9057bd82e9c360599ffeaf1a152033 Mon Sep 17 00:00:00 2001
From: Emil Sayahi <97276123+emmyoh@users.noreply.github.com>
Date: Fri, 5 Apr 2024 16:37:51 -0400
Subject: [PATCH] feat(discovery): Exchanging node tickets

---
 Cargo.toml       |   3 +
 src/discovery.rs |  49 +++++------
 src/error.rs     |   9 ++
 src/fs.rs        | 218 ++++++++++++++++++++++++++++++++++++++++++++---
 4 files changed, 239 insertions(+), 40 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index bb6094c..f6e5829 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,13 +11,16 @@ bincode = "1.3.3"
 bytes = "1.6.0"
 chrono = "0.4.37"
 derive_more = "0.99.17"
+flume = "0.11.0"
 futures = "0.3.30"
 hex = "0.4.3"
 iroh = "0.13.0"
 iroh-mainline-content-discovery = "0.5.0"
+iroh-pkarr-node-discovery = "0.2.0"
 mainline = "1.4.0"
 miette = { version = "7.2.0", features = ["fancy"] }
 path-clean = "1.0.1"
+postcard = "1.0.8"
 quic-rpc = "0.7.0"
 rand_core = "0.6.4"
 serde = "1.0.197"
diff --git a/src/discovery.rs b/src/discovery.rs
index b09eb42..22cbbc6 100644
--- a/src/discovery.rs
+++ b/src/discovery.rs
@@ -1,13 +1,14 @@
 use crate::error::OkuDiscoveryError;
-use futures::StreamExt;
+
 use iroh::{
     bytes::{Hash, HashAndFormat},
-    sync::NamespaceId,
+    net::NodeId,
     ticket::BlobTicket,
 };
 use iroh_mainline_content_discovery::protocol::{Query, QueryFlags};
 use iroh_mainline_content_discovery::to_infohash;
 use iroh_mainline_content_discovery::UdpDiscovery;
+
 use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
 use std::{error::Error, str::FromStr};
 
@@ -37,6 +38,13 @@ impl ContentRequest {
             },
         }
     }
+    pub fn hash(&self) -> Hash {
+        match self {
+            ContentRequest::Hash(hash) => *hash,
+            ContentRequest::HashAndFormat(haf) => haf.hash,
+            ContentRequest::Ticket(ticket) => ticket.hash(),
+        }
+    }
 }
 
 impl FromStr for ContentRequest {
@@ -54,12 +62,13 @@ impl FromStr for ContentRequest {
     }
 }
 
-async fn query_dht(
+pub async fn query_dht(
     content: ContentRequest,
     partial: bool,
     verified: bool,
     udp_port: Option<u16>,
 ) -> Result<(), Box<dyn Error>> {
+    let _providers: Vec<NodeId> = Vec::new();
     let bind_addr = SocketAddr::V4(SocketAddrV4::new(
         Ipv4Addr::UNSPECIFIED,
         udp_port.unwrap_or_default(),
@@ -75,28 +84,16 @@ pub async fn query_dht(
     };
     println!("content corresponds to infohash {}", to_infohash(q.content));
 
-    let mut stream = discovery.query_dht(dht, q).await?;
-    while let Some(announce) = stream.next().await {
-        if announce.verify().is_ok() {
-            println!("found verified provider {}", announce.host);
-        } else {
-            println!("got wrong signed announce!");
-        }
-    }
-    Ok(())
-}
+    let _stream = discovery.query_dht(dht, q).await?;
+    // while let Some(announce) = stream.next().await {
+    //     if announce.verify().is_ok() {
+    //         println!("found verified provider {}", announce.host);
+    //         providers.push(announce.host);
+    //     } else {
+    //         println!("got wrong signed announce!");
+    //     }
+    // }
+    // Ok(providers)
 
-pub async fn query_fs_entry(
-    id: NamespaceId,
-    partial: bool,
-    verified: bool,
-    udp_port: Option<u16>,
-) -> Result<(), Box<dyn Error>> {
-    query_dht(
-        ContentRequest::Hash(Hash::new(id)),
-        partial,
-        verified,
-        udp_port,
-    )
-    .await
+    Ok(())
 }
diff --git a/src/error.rs b/src/error.rs
index 1f4e5c9..a892b5f 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -47,4 +47,13 @@ pub enum OkuDiscoveryError {
     #[error("Invalid hash and format.")]
     #[diagnostic(code(discovery::invalid_hash_and_format), url(docsrs))]
     InvalidHashAndFormat,
+    #[error("Unable to discover node address for node ID.")]
+    #[diagnostic(code(discovery::node_address_discovery_failed), url(docsrs))]
+    NodeAddressDiscoveryFailed,
+    #[error("Unable to find nodes able to satisfy query")]
+    #[diagnostic(code(discovery::no_nodes_found), url(docsrs))]
+    NoNodesFound,
+    #[error("Unsupported protocol identifier: {0}")]
+    #[diagnostic(code(discovery::unsupported_alpn), url(docsrs))]
+    UnsupportedALPN(String),
 }
diff --git a/src/fs.rs b/src/fs.rs
index 55bc995..d592d74 100644
--- a/src/fs.rs
+++ b/src/fs.rs
@@ -1,19 +1,32 @@
+use crate::error::OkuDiscoveryError;
+use crate::{discovery::ContentRequest, error::OkuFsError};
 use bytes::Bytes;
 use futures::StreamExt;
+use iroh::base::ticket::Ticket;
+use iroh::net::magic_endpoint::accept_conn;
+use iroh::ticket::DocTicket;
 use iroh::{
-    bytes::Hash, node::FsNode, rpc_protocol::ShareMode, sync::{
-        store::{fs::StoreInstance, Store},
-        Author, AuthorId, NamespaceId, NamespaceSecret, Replica,
-    }
+    bytes::Hash,
+    net::{
+        discovery::{ConcurrentDiscovery, Discovery},
+        MagicEndpoint,
+    },
+    node::FsNode,
+    rpc_protocol::ShareMode,
+    sync::{Author, AuthorId, NamespaceId},
 };
+use iroh_mainline_content_discovery::protocol::{Query, QueryFlags};
+use iroh_mainline_content_discovery::to_infohash;
+use iroh_mainline_content_discovery::UdpDiscovery;
+use iroh_pkarr_node_discovery::PkarrNodeDiscovery;
 use path_clean::PathClean;
 use rand_core::OsRng;
 use sha3::{Digest, Sha3_256};
+use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
 use std::{error::Error, path::PathBuf};
 
-use crate::error::OkuFsError;
-
-const FS_PATH: &str = ".oku";
+pub const FS_PATH: &str = ".oku";
+pub const ALPN_DOCUMENT_TICKET_FETCH: &[u8] = b"oku/document-ticket/fetch/v0";
 
 fn normalise_path(path: PathBuf) -> PathBuf {
     PathBuf::from("/").join(path).clean()
@@ -45,9 +58,29 @@ impl OkuFs {
             let authors_list: Vec<AuthorId> = authors.map(|author| author.unwrap()).collect().await;
             authors_list[0]
         };
+        let node_addr = node.my_addr().await?;
+        let addr_info = node_addr.info;
+        let magic_endpoint = node.magic_endpoint();
+        let secret_key = magic_endpoint.secret_key();
+        let mut discovery_service = ConcurrentDiscovery::new();
+        let pkarr = PkarrNodeDiscovery::builder().secret_key(secret_key).build();
+        discovery_service.add(pkarr);
+        discovery_service.publish(&addr_info);
         Ok(OkuFs { node, author_id })
     }
 
+    pub async fn create_discovery_service(&self) -> Result<ConcurrentDiscovery, Box<dyn Error>> {
+        let node_addr = self.node.my_addr().await?;
+        let addr_info = node_addr.info;
+        let magic_endpoint = self.node.magic_endpoint();
+        let secret_key = magic_endpoint.secret_key();
+        let mut discovery_service = ConcurrentDiscovery::new();
+        let pkarr = PkarrNodeDiscovery::builder().secret_key(secret_key).build();
+        discovery_service.add(pkarr);
+        discovery_service.publish(&addr_info);
+        Ok(discovery_service)
+    }
+
     pub fn shutdown(self) {
         self.node.shutdown();
     }
@@ -85,7 +118,11 @@ impl OkuFs {
         Ok(entry_hash)
     }
 
-    pub async fn delete_file(&self, namespace: NamespaceId, path: PathBuf) -> Result<usize, Box<dyn Error>> {
+    pub async fn delete_file(
+        &self,
+        namespace: NamespaceId,
+        path: PathBuf,
+    ) -> Result<usize, Box<dyn Error>> {
         let file_key = path_to_entry_key(path);
         let docs_client = &self.node.docs;
         let document = docs_client
@@ -96,34 +133,187 @@ impl OkuFs {
         Ok(entries_deleted)
     }
 
-    pub async fn read_file(&self, namespace: NamespaceId, path: PathBuf) -> Result<Bytes, Box<dyn Error>> {
+    pub async fn read_file(
+        &self,
+        namespace: NamespaceId,
+        path: PathBuf,
+    ) -> Result<Bytes, Box<dyn Error>> {
         let file_key = path_to_entry_key(path);
         let docs_client = &self.node.docs;
         let document = docs_client
             .open(namespace)
             .await?
             .ok_or(OkuFsError::FsEntryNotFound)?;
-        let entry = document.get_exact(self.author_id, file_key, false).await?.ok_or(OkuFsError::FsEntryNotFound)?;
+        let entry = document
+            .get_exact(self.author_id, file_key, false)
+            .await?
+            .ok_or(OkuFsError::FsEntryNotFound)?;
         Ok(entry.content_bytes(self.node.client()).await?)
     }
 
-    pub async fn move_file(&self, namespace: NamespaceId, from: PathBuf, to: PathBuf) -> Result<(Hash, usize), Box<dyn Error>> {
+    pub async fn move_file(
+        &self,
+        namespace: NamespaceId,
+        from: PathBuf,
+        to: PathBuf,
+    ) -> Result<(Hash, usize), Box<dyn Error>> {
         let data = self.read_file(namespace, from.clone()).await?;
-        let hash = self.create_or_modify_file(namespace, to.clone(), data).await?;
+        let hash = self
+            .create_or_modify_file(namespace, to.clone(), data)
+            .await?;
         let entries_deleted = self.delete_file(namespace, from).await?;
         Ok((hash, entries_deleted))
     }
 
-    pub async fn delete_directory(&self, namespace: NamespaceId, path: PathBuf) -> Result<usize, Box<dyn Error>> {
+    pub async fn delete_directory(
+        &self,
+        namespace: NamespaceId,
+        path: PathBuf,
+    ) -> Result<usize, Box<dyn Error>> {
         let path = normalise_path(path).join(""); // Ensure path ends with a slash
         let docs_client = &self.node.docs;
         let document = docs_client
             .open(namespace)
             .await?
             .ok_or(OkuFsError::FsEntryNotFound)?;
-        let entries_deleted = document.del(self.author_id, format!("{}", path.display())).await?;
+        let entries_deleted = document
+            .del(self.author_id, format!("{}", path.display()))
+            .await?;
         Ok(entries_deleted)
     }
+
+    pub async fn listen_for_document_ticket_fetch_requests(&self) -> Result<(), Box<dyn Error>> {
+        let mut alpns: Vec<Vec<u8>> = Vec::new();
+        alpns.push(ALPN_DOCUMENT_TICKET_FETCH.to_vec());
+        let secret_key = self.node.magic_endpoint().secret_key();
+        let endpoint = MagicEndpoint::builder()
+            .alpns(alpns)
+            .secret_key(secret_key.clone())
+            .discovery(Box::new(self.create_discovery_service().await?))
+            .bind(0)
+            .await?;
+        while let Some(conn) = endpoint.clone().accept().await {
+            let (peer_id, alpn, conn) = accept_conn(conn).await?;
+            println!(
+                "new connection from {peer_id} with ALPN {alpn} (coming from {})",
+                conn.remote_address()
+            );
+            match alpn.as_bytes() {
+                ALPN_DOCUMENT_TICKET_FETCH => {
+                    let (mut send, mut recv) = conn.accept_bi().await?;
+                    let namespace_id_bytes: &[u8] = &recv.read_to_end(32).await?;
+                    let namespace_id_bytes: &[u8; 32] = namespace_id_bytes.try_into()?;
+                    let namespace = NamespaceId::from(namespace_id_bytes);
+                    let docs_client = &self.node.docs;
+                    let document = docs_client.open(namespace).await?;
+                    if let Some(document) = document {
+                        let ticket = document.share(ShareMode::Read).await?;
+                        send.write_all(&postcard::to_stdvec(&ticket.to_bytes())?)
+                            .await?;
+                    }
+                }
+                _ => Err(OkuDiscoveryError::UnsupportedALPN(alpn.to_string()))?,
+            }
+        }
+        Ok(())
+    }
+
+    pub async fn get_external_replica(
+        &self,
+        namespace: NamespaceId,
+        partial: bool,
+        verified: bool,
+        udp_port: Option<u16>,
+    ) -> Result<(), Box<dyn Error>> {
+        // let discovery_items_stream = self
+        //     .discovery_service
+        //     .resolve(self.node.magic_endpoint().clone(), node_id);
+        // return match discovery_items_stream {
+        //     None => None,
+        //     Some(discovery_items) => {
+        //         pin_mut!(discovery_items);
+        //         let node_addrs: Vec<NodeAddr> = discovery_items
+        //             .map(|item| NodeAddr {
+        //                 node_id,
+        //                 info: item.unwrap().addr_info,
+        //             })
+        //             .collect()
+        //             .await;
+        //         Some(node_addrs)
+        //     }
+        // };
+        let content = ContentRequest::Hash(Hash::new(namespace.clone()));
+        let secret_key = self.node.magic_endpoint().secret_key();
+        let endpoint = MagicEndpoint::builder()
+            .alpns(vec![])
+            .secret_key(secret_key.clone())
+            .discovery(Box::new(self.create_discovery_service().await?))
+            .bind(0)
+            .await?;
+        let bind_addr = SocketAddr::V4(SocketAddrV4::new(
+            Ipv4Addr::UNSPECIFIED,
+            udp_port.unwrap_or_default(),
+        ));
+        let discovery = UdpDiscovery::new(bind_addr).await?;
+        let dht = mainline::Dht::default();
+        let q = Query {
+            content: content.hash_and_format(),
+            flags: QueryFlags {
+                complete: !partial,
+                verified: verified,
+            },
+        };
+        println!("content corresponds to infohash {}", to_infohash(q.content));
+
+        let stream = discovery.query_dht(dht, q).await?;
+        let connections = stream
+            .map(move |announce| {
+                println!("got announce {:?}", announce);
+                let endpoint = endpoint.clone();
+                async move {
+                    endpoint
+                        .connect_by_node_id(&announce.host, ALPN_DOCUMENT_TICKET_FETCH)
+                        .await
+                }
+            })
+            .buffer_unordered(4)
+            .filter_map(|x| async {
+                match x {
+                    Ok(x) => Some(x),
+                    Err(e) => {
+                        eprintln!("error connecting to node: {:?}", e);
+                        None
+                    }
+                }
+            });
+        tokio::pin!(connections);
+        let connection = connections
+            .next()
+            .await
+            .ok_or(OkuDiscoveryError::NoNodesFound)?;
+        let (mut send, mut recv) = connection.open_bi().await?;
+        send.write_all(&postcard::to_stdvec(namespace.as_bytes())?)
+            .await?;
+        let ticket = DocTicket::from_bytes(&recv.read_to_end(256).await?)?;
+        let docs_client = &self.node.docs;
+        docs_client.import(ticket).await?;
+        Ok(())
+    }
+
+    // pub async fn get_external_replica(&self, namespace: NamespaceId) -> Result<(), Box<dyn Error>> {
+    //     // let providers: Vec<NodeId> =
+    //     //     discovery::query_dht(ContentRequest::Hash(Hash::new(namespace)), true, true, None)
+    //     //         .await?;
+    //     // for provider in providers {
+    //     //     let node_addrs = self.discover_node(provider).await;
+    //     //     if let Some(node_addrs) = node_addrs {
+    //     //         for node_addr in node_addrs {
+    //     //             self.node.inner.sync
+    //     //         }
+    //     //     }
+    //     // }
+    //     Ok(())
+    // }
 }
 
 /// Imports the author credentials of the file system from disk, or creates new credentials if none exist.
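
Not part of the patch: a minimal sketch of the round trip these changes enable, for reviewers. `OkuFs::start()` is a hypothetical constructor name (the patch only shows the tail of node construction), the `oku_fs::fs` import path is assumed, the namespace ID is obtained out of band from the replica's author, and `tokio` with the `macros` and `rt-multi-thread` features is assumed as the runtime.

    use iroh::sync::NamespaceId;
    use oku_fs::fs::OkuFs; // assumed import path
    use std::error::Error;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn Error>> {
        // Node A: publishes its address via pkarr and answers requests on the
        // `oku/document-ticket/fetch/v0` ALPN with read-only document tickets.
        let provider = OkuFs::start().await?; // hypothetical constructor
        tokio::spawn(async move {
            if let Err(e) = provider.listen_for_document_ticket_fetch_requests().await {
                eprintln!("ticket listener stopped: {e}");
            }
        });

        // Node B: looks the namespace up in the mainline DHT, dials an
        // announcing node by its node ID, and imports the replica from the
        // DocTicket it sends back.
        let fetcher = OkuFs::start().await?;
        let namespace: NamespaceId = todo!("namespace ID shared out of band");
        fetcher
            // partial = false (complete copies only), verified = true,
            // udp_port = None (OS-assigned port for the DHT socket)
            .get_external_replica(namespace, false, true, None)
            .await?;
        Ok(())
    }

One detail that may be worth double-checking in review: the responder frames the ticket with `postcard::to_stdvec(&ticket.to_bytes())`, which length-prefixes the byte vector, while the requester passes the raw response straight to `DocTicket::from_bytes` without a postcard decode, and its 256-byte read cap may be tight for tickets carrying several addresses.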