feat(discovery): Exchanging node tickets
emmyoh committed Apr 5, 2024
1 parent c685ac5 commit 6060631
Showing 4 changed files with 239 additions and 40 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
@@ -11,13 +11,16 @@ bincode = "1.3.3"
bytes = "1.6.0"
chrono = "0.4.37"
derive_more = "0.99.17"
flume = "0.11.0"
futures = "0.3.30"
hex = "0.4.3"
iroh = "0.13.0"
iroh-mainline-content-discovery = "0.5.0"
iroh-pkarr-node-discovery = "0.2.0"
mainline = "1.4.0"
miette = { version = "7.2.0", features = ["fancy"] }
path-clean = "1.0.1"
postcard = "1.0.8"
quic-rpc = "0.7.0"
rand_core = "0.6.4"
serde = "1.0.197"
49 changes: 23 additions & 26 deletions src/discovery.rs
@@ -1,13 +1,14 @@
use crate::error::OkuDiscoveryError;
use futures::StreamExt;

use iroh::{
bytes::{Hash, HashAndFormat},
sync::NamespaceId,
net::NodeId,
ticket::BlobTicket,
};
use iroh_mainline_content_discovery::protocol::{Query, QueryFlags};
use iroh_mainline_content_discovery::to_infohash;
use iroh_mainline_content_discovery::UdpDiscovery;

use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::{error::Error, str::FromStr};

@@ -37,6 +38,13 @@ impl ContentRequest {
},
}
}
pub fn hash(&self) -> Hash {
match self {
ContentRequest::Hash(hash) => *hash,
ContentRequest::HashAndFormat(haf) => haf.hash,
ContentRequest::Ticket(ticket) => ticket.hash(),
}
}
}

impl FromStr for ContentRequest {
@@ -54,12 +62,13 @@ impl FromStr for ContentRequest {
}
}

async fn query_dht(
pub async fn query_dht(
content: ContentRequest,
partial: bool,
verified: bool,
udp_port: Option<u16>,
) -> Result<(), Box<dyn Error>> {
let _providers: Vec<NodeId> = Vec::new();
let bind_addr = SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::UNSPECIFIED,
udp_port.unwrap_or_default(),
@@ -75,28 +84,16 @@ async fn query_dht(
};
println!("content corresponds to infohash {}", to_infohash(q.content));

let mut stream = discovery.query_dht(dht, q).await?;
while let Some(announce) = stream.next().await {
if announce.verify().is_ok() {
println!("found verified provider {}", announce.host);
} else {
println!("got wrong signed announce!");
}
}
Ok(())
}
let _stream = discovery.query_dht(dht, q).await?;
// while let Some(announce) = stream.next().await {
// if announce.verify().is_ok() {
// println!("found verified provider {}", announce.host);
// providers.push(announce.host);
// } else {
// println!("got wrong signed announce!");
// }
// }
// Ok(providers)

pub async fn query_fs_entry(
id: NamespaceId,
partial: bool,
verified: bool,
udp_port: Option<u16>,
) -> Result<(), Box<dyn Error>> {
query_dht(
ContentRequest::Hash(Hash::new(id)),
partial,
verified,
udp_port,
)
.await
Ok(())
}
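
For context, query_dht is now public and can be driven from outside the module. A minimal caller sketch, assuming the crate is published as oku_fs with a public discovery module and a tokio runtime (the hash string below is a placeholder, not a real value):

use std::str::FromStr;

use oku_fs::discovery::{query_dht, ContentRequest};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // FromStr accepts a hash, a hash-and-format pair, or a blob ticket;
    // the value below is a placeholder, not a real hash.
    let content = ContentRequest::from_str("bafkr4iae...")?;
    // Request complete (non-partial), verified announces, binding an
    // OS-assigned UDP port for the DHT query.
    query_dht(content, false, true, None).await?;
    Ok(())
}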
9 changes: 9 additions & 0 deletions src/error.rs
@@ -47,4 +47,13 @@ pub enum OkuDiscoveryError {
#[error("Invalid hash and format.")]
#[diagnostic(code(discovery::invalid_hash_and_format), url(docsrs))]
InvalidHashAndFormat,
#[error("Unable to discover node address for node ID.")]
#[diagnostic(code(discovery::node_address_discovery_failed), url(docsrs))]
NodeAddressDiscoveryFailed,
#[error("Unable to find nodes able to satisfy query")]
#[diagnostic(code(discovery::no_nodes_found), url(docsrs))]
NoNodesFound,
#[error("Unsupported protocol identifier: {0}")]
#[diagnostic(code(discovery::unsupported_alpn), url(docsrs))]
UnsupportedALPN(String),
}
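
The new variants follow the file's existing thiserror/miette pattern. A brief sketch of how UnsupportedALPN might surface as a miette report; the reject_unknown_alpn helper and the crate path are assumptions:

use miette::Result;
use oku_fs::error::OkuDiscoveryError;

// Hypothetical guard: accept only the document-ticket ALPN.
fn reject_unknown_alpn(alpn: &str) -> Result<()> {
    if alpn != "oku/document-ticket/fetch/v0" {
        // The #[error] attribute supplies the Display message; #[diagnostic]
        // attaches the discovery::unsupported_alpn code to the report.
        return Err(OkuDiscoveryError::UnsupportedALPN(alpn.to_string()).into());
    }
    Ok(())
}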
218 changes: 204 additions & 14 deletions src/fs.rs
@@ -1,19 +1,32 @@
use crate::error::OkuDiscoveryError;
use crate::{discovery::ContentRequest, error::OkuFsError};
use bytes::Bytes;
use futures::StreamExt;
use iroh::base::ticket::Ticket;
use iroh::net::magic_endpoint::accept_conn;
use iroh::ticket::DocTicket;
use iroh::{
bytes::Hash, node::FsNode, rpc_protocol::ShareMode, sync::{
store::{fs::StoreInstance, Store},
Author, AuthorId, NamespaceId, NamespaceSecret, Replica,
}
bytes::Hash,
net::{
discovery::{ConcurrentDiscovery, Discovery},
MagicEndpoint,
},
node::FsNode,
rpc_protocol::ShareMode,
sync::{Author, AuthorId, NamespaceId},
};
use iroh_mainline_content_discovery::protocol::{Query, QueryFlags};
use iroh_mainline_content_discovery::to_infohash;
use iroh_mainline_content_discovery::UdpDiscovery;
use iroh_pkarr_node_discovery::PkarrNodeDiscovery;
use path_clean::PathClean;
use rand_core::OsRng;
use sha3::{Digest, Sha3_256};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::{error::Error, path::PathBuf};

use crate::error::OkuFsError;

const FS_PATH: &str = ".oku";
pub const FS_PATH: &str = ".oku";
pub const ALPN_DOCUMENT_TICKET_FETCH: &[u8] = b"oku/document-ticket/fetch/v0";

fn normalise_path(path: PathBuf) -> PathBuf {
PathBuf::from("/").join(path).clean()
@@ -45,9 +58,29 @@ impl OkuFs {
let authors_list: Vec<AuthorId> = authors.map(|author| author.unwrap()).collect().await;
authors_list[0]
};
let node_addr = node.my_addr().await?;
let addr_info = node_addr.info;
let magic_endpoint = node.magic_endpoint();
let secret_key = magic_endpoint.secret_key();
let mut discovery_service = ConcurrentDiscovery::new();
let pkarr = PkarrNodeDiscovery::builder().secret_key(secret_key).build();
discovery_service.add(pkarr);
discovery_service.publish(&addr_info);
Ok(OkuFs { node, author_id })
}

pub async fn create_discovery_service(&self) -> Result<ConcurrentDiscovery, Box<dyn Error>> {
let node_addr = self.node.my_addr().await?;
let addr_info = node_addr.info;
let magic_endpoint = self.node.magic_endpoint();
let secret_key = magic_endpoint.secret_key();
let mut discovery_service = ConcurrentDiscovery::new();
let pkarr = PkarrNodeDiscovery::builder().secret_key(secret_key).build();
discovery_service.add(pkarr);
discovery_service.publish(&addr_info);
Ok(discovery_service)
}

pub fn shutdown(self) {
self.node.shutdown();
}
@@ -85,7 +118,11 @@ impl OkuFs {
Ok(entry_hash)
}

pub async fn delete_file(&self, namespace: NamespaceId, path: PathBuf) -> Result<usize, Box<dyn Error>> {
pub async fn delete_file(
&self,
namespace: NamespaceId,
path: PathBuf,
) -> Result<usize, Box<dyn Error>> {
let file_key = path_to_entry_key(path);
let docs_client = &self.node.docs;
let document = docs_client
@@ -96,34 +133,187 @@ impl OkuFs {
Ok(entries_deleted)
}

pub async fn read_file(&self, namespace: NamespaceId, path: PathBuf) -> Result<Bytes, Box<dyn Error>> {
pub async fn read_file(
&self,
namespace: NamespaceId,
path: PathBuf,
) -> Result<Bytes, Box<dyn Error>> {
let file_key = path_to_entry_key(path);
let docs_client = &self.node.docs;
let document = docs_client
.open(namespace)
.await?
.ok_or(OkuFsError::FsEntryNotFound)?;
let entry = document.get_exact(self.author_id, file_key, false).await?.ok_or(OkuFsError::FsEntryNotFound)?;
let entry = document
.get_exact(self.author_id, file_key, false)
.await?
.ok_or(OkuFsError::FsEntryNotFound)?;
Ok(entry.content_bytes(self.node.client()).await?)
}

pub async fn move_file(&self, namespace: NamespaceId, from: PathBuf, to: PathBuf) -> Result<(Hash, usize), Box<dyn Error>> {
pub async fn move_file(
&self,
namespace: NamespaceId,
from: PathBuf,
to: PathBuf,
) -> Result<(Hash, usize), Box<dyn Error>> {
let data = self.read_file(namespace, from.clone()).await?;
let hash = self.create_or_modify_file(namespace, to.clone(), data).await?;
let hash = self
.create_or_modify_file(namespace, to.clone(), data)
.await?;
let entries_deleted = self.delete_file(namespace, from).await?;
Ok((hash, entries_deleted))
}

pub async fn delete_directory(&self, namespace: NamespaceId, path: PathBuf) -> Result<usize, Box<dyn Error>> {
pub async fn delete_directory(
&self,
namespace: NamespaceId,
path: PathBuf,
) -> Result<usize, Box<dyn Error>> {
let path = normalise_path(path).join(""); // Ensure path ends with a slash
let docs_client = &self.node.docs;
let document = docs_client
.open(namespace)
.await?
.ok_or(OkuFsError::FsEntryNotFound)?;
let entries_deleted = document.del(self.author_id, format!("{}", path.display())).await?;
let entries_deleted = document
.del(self.author_id, format!("{}", path.display()))
.await?;
Ok(entries_deleted)
}

pub async fn listen_for_document_ticket_fetch_requests(&self) -> Result<(), Box<dyn Error>> {
let mut alpns: Vec<Vec<u8>> = Vec::new();
alpns.push(ALPN_DOCUMENT_TICKET_FETCH.to_vec());
let secret_key = self.node.magic_endpoint().secret_key();
let endpoint = MagicEndpoint::builder()
.alpns(alpns)
.secret_key(secret_key.clone())
.discovery(Box::new(self.create_discovery_service().await?))
.bind(0)
.await?;
while let Some(conn) = endpoint.clone().accept().await {
let (peer_id, alpn, conn) = accept_conn(conn).await?;
println!(
"new connection from {peer_id} with ALPN {alpn} (coming from {})",
conn.remote_address()
);
match alpn.as_bytes() {
ALPN_DOCUMENT_TICKET_FETCH => {
let (mut send, mut recv) = conn.accept_bi().await?;
let namespace_id_bytes: &[u8] = &recv.read_to_end(32).await?;
let namespace_id_bytes: &[u8; 32] = namespace_id_bytes.try_into()?;
let namespace = NamespaceId::from(namespace_id_bytes);
let docs_client = &self.node.docs;
let document = docs_client.open(namespace).await?;
if let Some(document) = document {
let ticket = document.share(ShareMode::Read).await?;
send.write_all(&postcard::to_stdvec(&ticket.to_bytes())?)
.await?;
}
}
_ => Err(OkuDiscoveryError::UnsupportedALPN(alpn.to_string()))?,
}
}
Ok(())
}

pub async fn get_external_replica(
&self,
namespace: NamespaceId,
partial: bool,
verified: bool,
udp_port: Option<u16>,
) -> Result<(), Box<dyn Error>> {
// let discovery_items_stream = self
// .discovery_service
// .resolve(self.node.magic_endpoint().clone(), node_id);
// return match discovery_items_stream {
// None => None,
// Some(discovery_items) => {
// pin_mut!(discovery_items);
// let node_addrs: Vec<NodeAddr> = discovery_items
// .map(|item| NodeAddr {
// node_id,
// info: item.unwrap().addr_info,
// })
// .collect()
// .await;
// Some(node_addrs)
// }
// };
let content = ContentRequest::Hash(Hash::new(namespace.clone()));
let secret_key = self.node.magic_endpoint().secret_key();
let endpoint = MagicEndpoint::builder()
.alpns(vec![])
.secret_key(secret_key.clone())
.discovery(Box::new(self.create_discovery_service().await?))
.bind(0)
.await?;
let bind_addr = SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::UNSPECIFIED,
udp_port.unwrap_or_default(),
));
let discovery = UdpDiscovery::new(bind_addr).await?;
let dht = mainline::Dht::default();
let q = Query {
content: content.hash_and_format(),
flags: QueryFlags {
complete: !partial,
verified: verified,
},
};
println!("content corresponds to infohash {}", to_infohash(q.content));

let stream = discovery.query_dht(dht, q).await?;
let connections = stream
.map(move |announce| {
println!("got announce {:?}", announce);
let endpoint = endpoint.clone();
async move {
endpoint
.connect_by_node_id(&announce.host, ALPN_DOCUMENT_TICKET_FETCH)
.await
}
})
.buffer_unordered(4)
.filter_map(|x| async {
match x {
Ok(x) => Some(x),
Err(e) => {
eprintln!("error connecting to node: {:?}", e);
None
}
}
});
tokio::pin!(connections);
let connection = connections
.next()
.await
.ok_or(OkuDiscoveryError::NoNodesFound)?;
let (mut send, mut recv) = connection.open_bi().await?;
send.write_all(&postcard::to_stdvec(namespace.as_bytes())?)
.await?;
let ticket = DocTicket::from_bytes(&recv.read_to_end(256).await?)?;
let docs_client = &self.node.docs;
docs_client.import(ticket).await?;
Ok(())
}

// pub async fn get_external_replica(&self, namespace: NamespaceId) -> Result<(), Box<dyn Error>> {
// // let providers: Vec<NodeId> =
// // discovery::query_dht(ContentRequest::Hash(Hash::new(namespace)), true, true, None)
// // .await?;
// // for provider in providers {
// // let node_addrs = self.discover_node(provider).await;
// // if let Some(node_addrs) = node_addrs {
// // for node_addr in node_addrs {
// // self.node.inner.sync
// // }
// // }
// // }
// Ok(())
// }
}

/// Imports the author credentials of the file system from disk, or creates new credentials if none exist.
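
Together, the fs.rs changes implement both halves of the ticket exchange: listen_for_document_ticket_fetch_requests answers oku/document-ticket/fetch/v0 connections with a postcard-encoded read ticket, while get_external_replica locates providers through the mainline DHT, requests a ticket over a bidirectional stream, and imports the document. A usage sketch, assuming the crate is oku_fs and a constructor named OkuFs::start (the startup API is outside this diff):

use oku_fs::fs::OkuFs;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical constructor name; the actual startup API is not shown here.
    let node = OkuFs::start().await?;

    // Serve tickets: a peer connects with ALPN oku/document-ticket/fetch/v0,
    // sends a 32-byte namespace ID, and receives a read-capability DocTicket.
    let serve = node.listen_for_document_ticket_fetch_requests();

    // On another node, the same namespace could then be fetched and imported:
    // node.get_external_replica(namespace, false, true, None).await?;

    serve.await?;
    Ok(())
}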
