Skip to content

Commit

Permalink
Merge branch 'storage-server-tweaks'
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Jun 22, 2023
2 parents 552e853 + 18441e7 commit fdd4836
Show file tree
Hide file tree
Showing 11 changed files with 153 additions and 53 deletions.
53 changes: 51 additions & 2 deletions bridge/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use crate::config::{ConfigKey, ConfigStore};
use crate::{
config::{ConfigKey, ConfigStore},
error::{Error, Result},
};
use ouisync_lib::network::{peer_addr::PeerAddr, Network};
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::{io, net::SocketAddr, num::ParseIntError};
use tokio::net;

const BIND_KEY: ConfigKey<Vec<PeerAddr>> =
ConfigKey::new("bind", "Addresses to bind the network listeners to");
Expand Down Expand Up @@ -43,6 +47,8 @@ const LAST_USED_UDP_PORT_COMMENT: &str =
This, in turn, is mainly useful for users who can't or don't want to use UPnP and have to\n\
default to manually setting up port forwarding on their routers.";

pub const DEFAULT_STORAGE_SERVER_PORT: u16 = 20209;

#[derive(Eq, PartialEq, Debug, Serialize, Deserialize)]
pub struct NetworkDefaults {
pub port_forwarding_enabled: bool,
Expand Down Expand Up @@ -156,6 +162,40 @@ pub async fn remove_user_provided_peers(
}
}

/// Add a storage server. This adds it as a user provided peers so we can immediatelly connect to
/// it and don't have to wait for it to be discovered (e.g. on the DHT).
///
/// NOTE: Currently this is not persisted.
pub async fn add_storage_server(network: &Network, host: &str) -> Result<()> {
let (hostname, port) = split_port(host).map_err(|_| {
tracing::error!(host, "invalid storage server host");
Error::InvalidArgument
})?;
let port = port.unwrap_or(DEFAULT_STORAGE_SERVER_PORT);

let addrs = net::lookup_host((hostname, port))
.await
.map(|addrs| addrs.peekable())
.and_then(|mut addrs| {
if addrs.peek().is_some() {
Ok(addrs)
} else {
Err(io::Error::new(io::ErrorKind::Other, "no DNS records found"))
}
})
.map_err(|error| {
tracing::error!(host, ?error, "failed to lookup storage server host");
error
})?;

for addr in addrs {
network.add_user_provided_peer(&PeerAddr::Quic(addr));
tracing::info!(host, %addr, "storage server added");
}

Ok(())
}

/// Utility to help reuse bind ports across network restarts.
struct LastUsedPorts {
quic_v4: u16,
Expand Down Expand Up @@ -263,6 +303,15 @@ impl LastUsedPorts {
}
}

fn split_port(s: &str) -> Result<(&str, Option<u16>), ParseIntError> {
if let Some(index) = s.rfind(':') {
let port = s[index..].parse()?;
Ok((&s[..index], Some(port)))
} else {
Ok((s, None))
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
60 changes: 60 additions & 0 deletions bridge/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ use crate::{
config::{ConfigError, ConfigKey, ConfigStore},
device_id,
error::Result,
protocol::remote::{Request, Response},
transport::{ClientConfig, RemoteClient},
};
use camino::Utf8PathBuf;
use futures_util::future;
use ouisync_lib::{
crypto::Password, Access, AccessMode, AccessSecrets, LocalSecret, ReopenToken, Repository,
RepositoryParams, ShareToken, StateMonitor, StorageSize,
Expand Down Expand Up @@ -213,3 +216,60 @@ pub async fn get_default_quota(config: &ConfigStore) -> Result<Option<StorageSiz
Err(error) => Err(error.into()),
}
}

/// Mirror the repository to the storage servers
pub async fn mirror(
repository: &Repository,
client_config: ClientConfig,
hosts: &[String],
) -> Result<()> {
let share_token = repository.secrets().with_mode(AccessMode::Blind);

let tasks = hosts.iter().map(|host| {
let client_config = client_config.clone();
let share_token = share_token.clone();

// Stip port, if any.
let host = strip_port(host);

async move {
let client = RemoteClient::connect(host, client_config)
.await
.map_err(|error| {
tracing::error!(host, ?error, "failed to connect to the storage server");
error
})?;

let request = Request::Mirror {
share_token: share_token.into(),
};

match client.invoke(request).await {
Ok(Response::None) => {
tracing::info!(host, "mirror request successfull");
Ok(())
}
Err(error) => {
tracing::error!(host, ?error, "mirror request failed");
Err(error)
}
}
}
});

let results = future::join_all(tasks).await;

if results.iter().any(|result| result.is_ok()) {
Ok(())
} else {
results.into_iter().next().unwrap_or(Ok(()))
}
}

fn strip_port(s: &str) -> &str {
if let Some(index) = s.rfind(':') {
&s[..index]
} else {
s
}
}
4 changes: 2 additions & 2 deletions cli/src/handler/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ impl ouisync_bridge::transport::Handler for LocalHandler {

match request {
Request::Start => Err(Error::ForbiddenRequest),
Request::BindRpc { addrs, delay } => Ok(self
Request::BindRpc { addrs } => Ok(self
.state
.servers
.set(self.state.clone(), &addrs, delay)
.set(self.state.clone(), &addrs)
.await?
.into()),
Request::Create {
Expand Down
6 changes: 0 additions & 6 deletions cli/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,6 @@ pub(crate) enum Request {
/// disables the remote API.
#[arg(value_name = "IP:PORT")]
addrs: Vec<SocketAddr>,

/// Store the bind addresses in the config but delay starting the listeners until the next
/// time the server is started. (This option is useful if you want to specify the RPC
/// bind address before you have the SSL certificate, e.g. when building a Docker image)
#[arg(long)]
delay: bool,
},
/// Create a new repository
Create {
Expand Down
14 changes: 4 additions & 10 deletions cli/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,13 @@ impl ServerContainer {
&self,
state: Arc<State>,
addrs: &[SocketAddr],
delay_start: bool,
) -> Result<Vec<SocketAddr>, Error> {
let entry = state.config.entry(BIND_RPC_KEY);

if delay_start {
entry.set(addrs).await?;
Ok(Vec::new())
} else {
let (handles, addrs) = start(state, addrs).await?;
*self.handles.lock().unwrap() = handles;
entry.set(&addrs).await?;
Ok(addrs)
}
let (handles, addrs) = start(state, addrs).await?;
*self.handles.lock().unwrap() = handles;
entry.set(&addrs).await?;
Ok(addrs)
}

pub fn close(&self) {
Expand Down
5 changes: 2 additions & 3 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
FROM scratch as runtime

EXPOSE 443/tcp
EXPOSE 2000/tcp 2000/udp
EXPOSE 20209/tcp 20209/udp

ENV PATH=/ \
OUISYNC_CONFIG_DIR=/config \
Expand All @@ -23,8 +23,7 @@ ENV PATH=/ \

COPY --from=builder /usr/bin/ouisync /

RUN ["ouisync", "bind", "quic/0.0.0.0:2000", "quic/[::]:2000", "tcp/[::]:2000"]
RUN ["ouisync", "bind-rpc", "--delay", "[::]:443"]
RUN ["ouisync", "bind", "quic/0.0.0.0:20209", "quic/[::]:20209", "tcp/[::]:20209"]

ENTRYPOINT ["ouisync"]
CMD ["start"]
6 changes: 6 additions & 0 deletions docker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,9 @@ one is the binary name. It's recommented to create an alias for this (e.g. in `.

alias ouisync=docker exec ouisync ouisync

## Storage server

To setup a storage server, enable the RPC endpoint:

ouisync bind-rpc 0.0.0.0:443

11 changes: 7 additions & 4 deletions ffi/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,8 @@ impl ouisync_bridge::transport::Handler for Handler {
.await?
.into()
}
Request::RepositoryMirror { repository, host } => {
repository::mirror(&self.state, repository, host)
.await?
.into()
Request::RepositoryMirror { repository } => {
repository::mirror(&self.state, repository).await?.into()
}
Request::ShareTokenMode(token) => share_token::mode(token).into(),
Request::ShareTokenInfoHash(token) => share_token::info_hash(token).into(),
Expand Down Expand Up @@ -313,6 +311,11 @@ impl ouisync_bridge::transport::Handler for Handler {
.await;
().into()
}
Request::NetworkAddStorageServer(host) => {
ouisync_bridge::network::add_storage_server(&self.state.network, &host).await?;
self.state.storage_servers.lock().unwrap().insert(host);
().into()
}
Request::NetworkShutdown => {
self.state.network.shutdown().await;
().into()
Expand Down
2 changes: 1 addition & 1 deletion ffi/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ pub(crate) enum Request {
RepositorySyncProgress(Handle<RepositoryHolder>),
RepositoryMirror {
repository: Handle<RepositoryHolder>,
host: String,
},
ShareTokenMode(#[serde(with = "as_str")] ShareToken),
ShareTokenInfoHash(#[serde(with = "as_str")] ShareToken),
Expand Down Expand Up @@ -155,6 +154,7 @@ pub(crate) enum Request {
NetworkSetPortForwardingEnabled(bool),
NetworkIsLocalDiscoveryEnabled,
NetworkSetLocalDiscoveryEnabled(bool),
NetworkAddStorageServer(String),
NetworkShutdown,
StateMonitorGet(Vec<MonitorId>),
StateMonitorSubscribe(Vec<MonitorId>),
Expand Down
37 changes: 13 additions & 24 deletions ffi/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@ use camino::Utf8PathBuf;
use ouisync_bridge::{
constants::{ENTRY_TYPE_DIRECTORY, ENTRY_TYPE_FILE},
error::Result,
protocol::{
remote::{Request, Response},
Notification,
},
protocol::Notification,
repository,
transport::{NotificationSender, RemoteClient},
transport::NotificationSender,
};
use ouisync_lib::{
network::{self, Registration},
Expand Down Expand Up @@ -344,27 +341,19 @@ pub(crate) async fn sync_progress(
.await?)
}

/// Mirror the repository to the specified server
pub(crate) async fn mirror(
state: &State,
handle: Handle<RepositoryHolder>,
host: String,
) -> Result<()> {
/// Mirror the repository to the storage servers
pub(crate) async fn mirror(state: &State, handle: Handle<RepositoryHolder>) -> Result<()> {
let holder = state.get_repository(handle);
let share_token = holder
.repository
.secrets()
.with_mode(AccessMode::Blind)
.into();

let config = state.get_remote_client_config()?;
let client = RemoteClient::connect(&host, config).await?;
let request = Request::Mirror { share_token };
let response = client.invoke(request).await?;

match response {
Response::None => Ok(()),
}
let hosts: Vec<_> = state
.storage_servers
.lock()
.unwrap()
.iter()
.cloned()
.collect();

ouisync_bridge::repository::mirror(&holder.repository, config, &hosts).await
}

pub(crate) fn entry_type_to_num(entry_type: EntryType) -> u8 {
Expand Down
8 changes: 7 additions & 1 deletion ffi/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ use ouisync_lib::{
};
use ouisync_vfs::MultiRepoVFS;
use scoped_task::ScopedJoinHandle;
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use std::{
collections::{BTreeSet, HashMap},
path::PathBuf,
sync::Arc,
};

pub(crate) struct State {
pub root_monitor: StateMonitor,
Expand All @@ -22,6 +26,7 @@ pub(crate) struct State {
repositories: Registry<RepositoryHolder>,
pub files: Registry<FileHolder>,
pub tasks: Registry<ScopedJoinHandle<()>>,
pub storage_servers: BlockingMutex<BTreeSet<String>>,
pub remote_client_config: OnceCell<ClientConfig>,
pub mounter: BlockingMutex<Option<MultiRepoVFS>>,
}
Expand All @@ -45,6 +50,7 @@ impl State {
repositories: Registry::new(),
files: Registry::new(),
tasks: Registry::new(),
storage_servers: BlockingMutex::new(BTreeSet::new()),
remote_client_config: OnceCell::new(),
mounter: BlockingMutex::new(None),
}
Expand Down

0 comments on commit fdd4836

Please sign in to comment.