Skip to content

Commit

Permalink
feat: Begin implementing relays
Browse files Browse the repository at this point in the history
  • Loading branch information
emmyoh committed May 9, 2024
1 parent 3ce7cd9 commit a5649fb
Show file tree
Hide file tree
Showing 8 changed files with 340 additions and 8 deletions.
54 changes: 52 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@ crate-type=["rlib", "dylib", "staticlib"]

[[bin]]
name = "oku-fs"
path = "src/main.rs"
path = "src/cli.rs"
doc = false
required-features = ["cli"]

[[bin]]
name = "oku-fs-relay"
path = "src/relay.rs"
doc = false

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
ahash = "0.8.11"
bytes = "1.6.0"
chrono = "0.4.37"
clap = { version = "4.5.4", features = ["derive"], optional = true }
Expand All @@ -31,6 +37,7 @@ futures = "0.3.30"
iroh = "0.13.0"
iroh-mainline-content-discovery = "0.5.0"
iroh-pkarr-node-discovery = "0.2.0"
lazy_static = "1.4.0"
mainline = "1.4.0"
miette = { version = "7.2.0", features = ["fancy"] }
path-clean = "1.0.1"
Expand All @@ -41,7 +48,8 @@ serde = "1.0.197"
serde_json = "1.0.116"
thiserror = "1.0.58"
tokio = "1.37.0"
toml = "0.8.12"

[features]
default = []
cli = ["dep:clap"]
cli = ["dep:clap"]
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

A distributed file system for use with the Oku browser.

## Technical Design

Files and directories are stored in replicas implemented as Iroh documents, allowing them to be shared publicly over the mainline DHT or directly between Oku file system nodes.

An Oku file system node consists of three parts:
Expand All @@ -20,4 +22,14 @@ Content discovery occurs over [the mainline DHT](https://en.wikipedia.org/wiki/M
- Port forwarding is necessary to both (1) announce content on the DHT and (2) respond with document tickets when behind NAT.
4. The node uses the swarm ticket to connect to the document swarm and download the document.

As a reminder, 'documents' are in fact entire replicas, with multiple files and directories. Fetching individual files or directories has not yet been implemented.
### NAT

Nodes behind NAT (eg, devices on a home network using IPv4) are unable to listen for incoming connections. This means address and content announcements on the DHT will be meaningless; external nodes will be unable to initiate connections to a local address. Consequently, the node will be unable to serve external requests for content (ie, perform ticket exchanges), as no external nodes will be able to reach it.

To solve this, a relay node is used. These relay nodes:
- Are port-forwarded to route traffic to-and-from the local network.
- Perform DHT announcements on behalf of the connected nodes behind NAT.
- Facilitate ticket exchanges between the appropriate connected nodes and external nodes.

To enable this functionality, relay nodes maintain a list of which replicas are held by which nodes behind NAT.
When an external node requests a replica, said external node connects to the relay node, and the relay node finds the appropriate connected node and begins acting as a middleman during the ticket exchange.
File renamed without changes.
2 changes: 1 addition & 1 deletion src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub enum PeerTicketResponse {
Entries(Vec<BlobTicket>),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
/// A request for content from a peer.
pub struct PeerContentRequest {
/// The ID of a requested replica.
Expand Down
9 changes: 9 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,12 @@ pub enum OkuDiscoveryError {
/// Problem announcing content.
ProblemAnnouncingContent(String, String),
}

#[derive(Error, Debug, Diagnostic)]
/// Relay errors.
pub enum OkuRelayError {
#[error("No connected node can satisfy {0}.")]
#[diagnostic(code(relay::cannot_satisfy_request), url(docsrs))]
/// No connected node can satisfy request.
CannotSatisfyRequest(String),
}
90 changes: 88 additions & 2 deletions src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use iroh_mainline_content_discovery::to_infohash;
use iroh_pkarr_node_discovery::PkarrNodeDiscovery;
use path_clean::PathClean;
use rand_core::OsRng;
use std::net::{Ipv4Addr, SocketAddrV4};
use serde::{Deserialize, Serialize};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::{error::Error, path::PathBuf};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;
Expand All @@ -32,6 +33,12 @@ pub const FS_PATH: &str = ".oku";
/// The protocol identifier for exchanging document tickets.
pub const ALPN_DOCUMENT_TICKET_FETCH: &[u8] = b"oku/document-ticket/fetch/v0";

/// The protocol identifier for initially connecting to relays.
pub const ALPN_INITIAL_RELAY_CONNECTION: &[u8] = b"oku/relay/connect/v0";

/// The protocol identifier for fetching its list of replicas.
pub const ALPN_RELAY_FETCH: &[u8] = b"oku/relay/fetch/v0";

fn normalise_path(path: PathBuf) -> PathBuf {
PathBuf::from("/").join(path).clean()
}
Expand All @@ -52,6 +59,13 @@ pub fn path_to_entry_key(path: PathBuf) -> Bytes {
path_bytes.into()
}

#[derive(Clone, Debug, Serialize, Deserialize)]
/// The configuration of the file system.
pub struct OkuFsConfig {
/// An optional address to facilitate communication behind NAT.
pub relay_address: Option<String>,
}

/// An instance of an Oku file system.
///
/// The `OkuFs` struct is the primary interface for interacting with an Oku file system.
Expand All @@ -61,6 +75,8 @@ pub struct OkuFs {
node: FsNode,
/// The public key of the author of the file system.
author_id: AuthorId,
/// The configuration of the file system.
config: OkuFsConfig,
}

impl OkuFs {
Expand All @@ -85,7 +101,12 @@ impl OkuFs {
let authors_list: Vec<AuthorId> = authors.map(|author| author.unwrap()).collect().await;
authors_list[0]
};
let oku_fs = OkuFs { node, author_id };
let config = load_or_create_config()?;
let oku_fs = OkuFs {
node,
author_id,
config,
};
let oku_fs_clone = oku_fs.clone();
let node_addr = oku_fs.node.my_addr().await?;
let addr_info = node_addr.info;
Expand All @@ -97,6 +118,16 @@ impl OkuFs {
discovery_service.publish(&addr_info);
let docs_client = &oku_fs.node.docs;
let docs_client = docs_client.clone();
if let Some(relay_address) = oku_fs_clone.config.relay_address {
let oku_fs_clone = oku_fs.clone();
tokio::spawn(async move {
oku_fs_clone
.connect_to_relay(relay_address.to_string())
.await
.unwrap();
});
}
let oku_fs_clone = oku_fs.clone();
tokio::spawn(async move {
oku_fs_clone
.listen_for_document_ticket_fetch_requests()
Expand Down Expand Up @@ -548,6 +579,40 @@ impl OkuFs {

Ok(())
}

/// Connects to a relay to facilitate communication behind NAT.
/// Upon connecting, the file system will send a list of all replicas to the relay. Periodically, the relay will request the list of replicas again using the same connection.
///
/// # Arguments
///
/// * `relay_address` - The address of the relay to connect to.
pub async fn connect_to_relay(
&self,
relay_address: String,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let relay_addr = relay_address.parse::<SocketAddr>()?;
let mut stream = TcpStream::connect(relay_addr).await?;
let all_replicas = self.list_replicas().await?;
let all_replicas_str = serde_json::to_string(&all_replicas)?;
let mut request = Vec::new();
request.write_all(ALPN_INITIAL_RELAY_CONNECTION).await?;
request.write_all(b"\n").await?;
request.write_all(all_replicas_str.as_bytes()).await?;
request.flush().await?;
stream.write_all(&request).await?;
stream.flush().await?;
loop {
let mut response_bytes = Vec::new();
stream.read_to_end(&mut response_bytes).await?;
if response_bytes == ALPN_RELAY_FETCH {
let all_replicas = self.list_replicas().await?;
let all_replicas_str = serde_json::to_string(&all_replicas)?;
stream.write_all(all_replicas_str.as_bytes()).await?;
stream.flush().await?;
}
}
Ok(())
}
}

/// Imports the author credentials of the file system from disk, or creates new credentials if none exist.
Expand All @@ -573,3 +638,24 @@ pub fn load_or_create_author() -> Result<Author, Box<dyn Error + Send + Sync>> {
}
}
}

/// Loads the configuration of the file system from disk, or creates a new configuration if none exists.
///
/// # Returns
///
/// The configuration of the file system.
pub fn load_or_create_config() -> Result<OkuFsConfig, Box<dyn Error + Send + Sync>> {
let path = PathBuf::from(FS_PATH).join("config");
let config_file_contents = std::fs::read_to_string(path.clone());
match config_file_contents {
Ok(config_file_toml) => Ok(toml::from_str(&config_file_toml)?),
Err(_) => {
let config = OkuFsConfig {
relay_address: None,
};
let config_toml = toml::to_string(&config)?;
std::fs::write(path, config_toml)?;
Ok(config)
}
}
}
Loading

0 comments on commit a5649fb

Please sign in to comment.