Skip to content

Commit

Permalink
rspc over P2P (spacedriveapp#2112)
Browse files Browse the repository at this point in the history
* wip: rspc over p2p

* wip

* rspc over P2P

* Cleanup + error handling

* slight cleanup

* Using Hyper for HTTP streaming + websockets
  • Loading branch information
oscartbeaumont authored Feb 26, 2024
1 parent f7a7b00 commit aa0b4ab
Show file tree
Hide file tree
Showing 16 changed files with 258 additions and 18 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 12 additions & 8 deletions apps/desktop/src-tauri/src/tauri_plugins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use axum::{
response::Response,
RequestPartsExt,
};
use http::Method;
use hyper::server::{accept::Accept, conn::AddrIncoming};
use rand::{distributions::Alphanumeric, Rng};
use sd_core::{custom_uri, Node, NodeError};
Expand Down Expand Up @@ -116,7 +117,7 @@ pub async fn sd_server_plugin<R: Runtime>(

#[derive(Deserialize)]
struct QueryParams {
token: String,
token: Option<String>,
}

async fn auth_middleware<B>(
Expand All @@ -128,16 +129,19 @@ async fn auth_middleware<B>(
where
B: Send,
{
let req = if query.token != auth_token {
let req = if query.token.as_ref() != Some(&auth_token) {
let (mut parts, body) = request.into_parts();

let auth: TypedHeader<Authorization<Bearer>> = parts
.extract()
.await
.map_err(|_| StatusCode::UNAUTHORIZED)?;
// We don't check auth for OPTIONS requests cause the CORS middleware will handle it
if parts.method != Method::OPTIONS {
let auth: TypedHeader<Authorization<Bearer>> = parts
.extract()
.await
.map_err(|_| StatusCode::UNAUTHORIZED)?;

if auth.token() != auth_token {
return Err(StatusCode::UNAUTHORIZED);
if auth.token() != auth_token {
return Err(StatusCode::UNAUTHORIZED);
}
}

Request::from_parts(parts, body)
Expand Down
6 changes: 6 additions & 0 deletions apps/desktop/src/platform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ export const platform = {
constructServerUrl(`/file/${libraryId}/${locationLocalId}/${filePathId}`),
getFileUrlByPath: (path) =>
constructServerUrl(`/local-file-by-path/${encodeURIComponent(path)}`),
getRemoteRspcEndpoint: (remote_identity) => ({
url: `${customUriServerUrl?.[0]}/remote/${encodeURIComponent(remote_identity)}/rspc`,
headers: {
authorization: `Bearer ${customUriAuthToken}`
}
}),
openLink: shell.open,
getOs,
openDirectoryPickerDialog: (opts) => {
Expand Down
3 changes: 3 additions & 0 deletions apps/web/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ const platform: Platform = {
locationLocalId
)}/${encodeURIComponent(filePathId)}`,
getFileUrlByPath: (path) => `${spacedriveURL}/local-file-by-path/${encodeURIComponent(path)}`,
getRemoteRspcEndpoint: (remote_identity) => ({
url: `${spacedriveURL}/remote/${encodeURIComponent(remote_identity)}/rspc`
}),
openLink: (url) => window.open(url, '_blank')?.focus(),
confirm: (message, cb) => cb(window.confirm(message)),
auth: {
Expand Down
3 changes: 3 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ regex = { workspace = true }
reqwest = { workspace = true, features = ["json", "native-tls-vendored"] }
rmp-serde = { workspace = true }
rspc = { workspace = true, features = [
"axum",
"uuid",
"chrono",
"tracing",
Expand Down Expand Up @@ -127,6 +128,8 @@ aws-config = "1.0.3"
aws-credential-types = "1.0.3"
base91 = "0.1.0"
sd-actors = { version = "0.1.0", path = "../crates/actors" }
tower-service = "0.3.2"
hyper = { version = "=0.14.28", features = ["http1", "server", "client"] }

# Override features of transitive dependencies
[dependencies.openssl]
Expand Down
4 changes: 3 additions & 1 deletion core/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,9 @@ pub(crate) fn mount() -> Arc<Router> {
<sd_prisma::prisma::object::Data as specta::NamedType>::SID,
def,
);
})
});

let r = r
.build(
#[allow(clippy::let_and_return)]
{
Expand Down
33 changes: 32 additions & 1 deletion core/src/custom_uri/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ use async_stream::stream;
use axum::{
body::{self, Body, BoxBody, Full, StreamBody},
extract::{self, State},
http::{HeaderValue, Request, Response, StatusCode},
http::{HeaderMap, HeaderValue, Request, Response, StatusCode},
middleware,
response::IntoResponse,
routing::get,
Router,
};
Expand Down Expand Up @@ -320,6 +321,36 @@ pub fn router(node: Arc<Node>) -> Router<()> {
},
),
)
.route(
"/remote/:identity/rspc/*path",
get(
|State(state): State<LocalState>,
extract::Path((identity, rest)): extract::Path<(String, String)>,
mut request: Request<Body>| async move {
let identity = match RemoteIdentity::from_str(&identity) {
Ok(identity) => identity,
Err(err) => {
error!("Error parsing identity '{}': {}", identity, err);
return (StatusCode::BAD_REQUEST, HeaderMap::new(), vec![])
.into_response();
}
};
*request.uri_mut() = format!("/{rest}")
.parse()
.expect("url was validated by Axum");

match operations::remote_rspc(state.node.p2p.p2p.clone(), identity, request)
.await
{
Ok(response) => response.into_response(),
Err(err) => {
error!("Error doing remote rspc query with '{identity}': {err:?}");
(StatusCode::INTERNAL_SERVER_ERROR, HeaderMap::new()).into_response()
}
}
},
),
)
.route_layer(middleware::from_fn(cors_middleware))
.with_state({
let file_metadata_cache = Arc::new(Cache::new(150));
Expand Down
16 changes: 13 additions & 3 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,23 @@ impl Node {
init_data.apply(&node.libraries, &node).await?;
}

let router = api::mount();

// Be REALLY careful about ordering here or you'll get unreliable deadlock's!
locations_actor.start(node.clone());
node.libraries.init(&node).await?;
jobs_actor.start(node.clone());
start_p2p(node.clone());

let router = api::mount();
start_p2p(
node.clone(),
router
.clone()
.endpoint({
let node = node.clone();
move |_| node.clone()
})
.axum::<()>()
.into_make_service(),
);

info!("Spacedrive online.");
Ok((node, router))
Expand Down
37 changes: 33 additions & 4 deletions core/src/p2p/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use crate::{
Node,
};

use axum::routing::IntoMakeService;

use sd_p2p2::{
flume::{bounded, Receiver},
Libp2pPeerId, Listener, Mdns, Peer, QuicTransport, RemoteIdentity, UnicastStream, P2P,
Expand All @@ -17,12 +19,15 @@ use serde_json::json;
use specta::Type;
use std::{
collections::{HashMap, HashSet},
convert::Infallible,
net::SocketAddr,
sync::{atomic::AtomicBool, Arc, Mutex, PoisonError},
};
use tower_service::Service;
use tracing::error;

use tokio::sync::oneshot;
use tracing::{error, info};
use tracing::info;
use uuid::Uuid;

use super::{P2PEvents, PeerMetadata};
Expand All @@ -44,7 +49,13 @@ impl P2PManager {
pub async fn new(
node_config: Arc<config::Manager>,
libraries: Arc<crate::library::Libraries>,
) -> Result<(Arc<P2PManager>, impl FnOnce(Arc<Node>)), String> {
) -> Result<
(
Arc<P2PManager>,
impl FnOnce(Arc<Node>, IntoMakeService<axum::Router<()>>),
),
String,
> {
let (tx, rx) = bounded(25);
let p2p = P2P::new(SPACEDRIVE_APP_ID, node_config.get().await.identity, tx);
let (quic, lp2p_peer_id) = QuicTransport::spawn(p2p.clone())?;
Expand All @@ -70,8 +81,8 @@ impl P2PManager {
this.p2p.listeners()
);

Ok((this.clone(), |node| {
tokio::spawn(start(this, node, rx));
Ok((this.clone(), |node, router| {
tokio::spawn(start(this, node, rx, router));
}))
}

Expand Down Expand Up @@ -220,10 +231,13 @@ async fn start(
this: Arc<P2PManager>,
node: Arc<Node>,
rx: Receiver<UnicastStream>,
mut service: IntoMakeService<axum::Router<()>>,
) -> Result<(), ()> {
while let Ok(mut stream) = rx.recv_async().await {
let this = this.clone();
let node = node.clone();
let mut service = unwrap_infallible(service.call(()).await);

tokio::spawn(async move {
println!("APPLICATION GOT STREAM: {:?}", stream); // TODO

Expand Down Expand Up @@ -286,6 +300,14 @@ async fn start(

error!("Failed to handle file request");
}
Header::Http => {
let remote = stream.remote_identity();
let Err(err) = operations::rspc::receiver(stream, &mut service).await else {
return;
};

error!("Failed to handling rspc request with '{remote}': {err:?}");
}
};
});
}
Expand All @@ -309,3 +331,10 @@ pub fn into_listener2(l: &[Listener]) -> Vec<Listener2> {
})
.collect()
}

fn unwrap_infallible<T>(result: Result<T, Infallible>) -> T {
match result {
Ok(value) => value,
Err(err) => match err {},
}
}
2 changes: 2 additions & 0 deletions core/src/p2p/operations/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
pub mod ping;
pub mod request_file;
pub mod rspc;
pub mod spacedrop;

pub use request_file::request_file;
pub use rspc::remote_rspc;
pub use spacedrop::spacedrop;
53 changes: 53 additions & 0 deletions core/src/p2p/operations/rspc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use std::{error::Error, sync::Arc};

use axum::{body::Body, http, Router};
use hyper::{server::conn::Http, Response};
use sd_p2p2::{RemoteIdentity, UnicastStream, P2P};
use tokio::io::AsyncWriteExt;
use tracing::debug;

use crate::p2p::Header;

/// Transfer an rspc query to a remote node.
#[allow(unused)]
pub async fn remote_rspc(
p2p: Arc<P2P>,
identity: RemoteIdentity,
request: http::Request<axum::body::Body>,
) -> Result<Response<Body>, Box<dyn Error>> {
let peer = p2p
.peers()
.get(&identity)
.ok_or("Peer not found, has it been discovered?")?
.clone();
let mut stream = peer.new_stream().await?;

stream.write_all(&Header::Http.to_bytes()).await?;

let (mut sender, conn) = hyper::client::conn::handshake(stream).await?;
tokio::task::spawn(async move {
if let Err(err) = conn.await {
println!("Connection error: {:?}", err);
}
});

sender.send_request(request).await.map_err(Into::into)
}

pub(crate) async fn receiver(
stream: UnicastStream,
service: &mut Router,
) -> Result<(), Box<dyn Error>> {
debug!(
"Received http request from peer '{}'",
stream.remote_identity(),
);

Http::new()
.http1_only(true)
.http1_keep_alive(true)
.serve_connection(stream, service)
.with_upgrades()
.await
.map_err(Into::into)
}
4 changes: 4 additions & 0 deletions core/src/p2p/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub enum Header {
Spacedrop(SpaceblockRequests),
Sync(Uuid),
File(HeaderFile),
// A HTTP server used for rspc requests and streaming files
Http,
}

#[derive(Debug, Error)]
Expand Down Expand Up @@ -86,6 +88,7 @@ impl Header {
i => return Err(HeaderError::HeaderFileDiscriminatorInvalid(i)),
},
})),
5 => Ok(Self::Http),
d => Err(HeaderError::DiscriminatorInvalid(d)),
}
}
Expand Down Expand Up @@ -116,6 +119,7 @@ impl Header {
buf.extend_from_slice(&range.to_bytes());
buf
}
Self::Http => vec![5],
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ export default function DebugSection() {
<Icon component={ShareNetwork} />
P2P
</SidebarLink>
<SidebarLink to="debug/p2p-rspc">
<Icon component={ShareNetwork} />
P2P (rspc)
</SidebarLink>
</div>
</Section>
);
Expand Down
3 changes: 2 additions & 1 deletion interface/app/$libraryId/debug/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ export const debugRoutes = [
{ path: 'cloud', lazy: () => import('./cloud') },
{ path: 'sync', lazy: () => import('./sync') },
{ path: 'actors', lazy: () => import('./actors') },
{ path: 'p2p', lazy: () => import('./p2p') }
{ path: 'p2p', lazy: () => import('./p2p') },
{ path: 'p2p-rspc', lazy: () => import('./p2p-rspc') }
] satisfies RouteObject[];
Loading

0 comments on commit aa0b4ab

Please sign in to comment.