diff --git a/kinode/src/http/server.rs b/kinode/src/http/server.rs index 0a7a2b9d1..7481d9791 100644 --- a/kinode/src/http/server.rs +++ b/kinode/src/http/server.rs @@ -1,21 +1,28 @@ -use crate::http::server_types::*; -use crate::http::utils::*; +use crate::http::server_types::{ + HttpResponse, HttpServerAction, HttpServerError, HttpServerRequest, IncomingHttpRequest, + MessageType, RpcResponseBody, WsMessageType, +}; +use crate::http::utils; use crate::keygen; -use anyhow::Result; use base64::{engine::general_purpose::STANDARD as base64_standard, Engine}; use dashmap::DashMap; use futures::{SinkExt, StreamExt}; use http::uri::Authority; -use lib::types::core::*; +use lib::types::core::{ + Address, KernelCommand, KernelMessage, LazyLoadBlob, LoginInfo, Message, MessageReceiver, + MessageSender, PrintSender, Printout, ProcessId, Request, Response, HTTP_SERVER_PROCESS_ID, +}; use route_recognizer::Router; use sha2::{Digest, Sha256}; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; use tokio::sync::RwLock; -use warp::http::{header::HeaderValue, StatusCode}; -use warp::ws::{WebSocket, Ws}; -use warp::{Filter, Reply}; +use warp::{ + http::{header::HeaderValue, StatusCode}, + ws::{WebSocket, Ws}, + Filter, Reply, +}; #[cfg(not(feature = "simulation-mode"))] const HTTP_SELF_IMPOSED_TIMEOUT: u64 = 15; @@ -180,39 +187,37 @@ pub async fn http_server( mut recv_in_server: MessageReceiver, send_to_loop: MessageSender, print_tx: PrintSender, -) -> Result<()> { - let our_name = Arc::new(our_name); - let encoded_keyfile = Arc::new(encoded_keyfile); - let jwt_secret_bytes = Arc::new(jwt_secret_bytes); +) -> anyhow::Result<()> { let http_response_senders: HttpResponseSenders = Arc::new(DashMap::new()); let ws_senders: WebSocketSenders = Arc::new(DashMap::new()); - let path = format!("/rpc:distro:sys/message"); - // add RPC path let mut bindings_map: Router = Router::new(); - let rpc_bound_path = BoundPath { - app: Some(ProcessId::new(Some("rpc"), "distro", "sys")), - path: path.clone(), - secure_subdomain: None, - authenticated: false, - local_only: true, - static_content: None, - }; - bindings_map.add(&path, rpc_bound_path); - let path_bindings: PathBindings = Arc::new(RwLock::new(bindings_map)); - // ws path bindings + // add local-only RPC path + bindings_map.add( + "/rpc:distro:sys/message", + BoundPath { + app: Some(ProcessId::new(Some("rpc"), "distro", "sys")), + path: "/rpc:distro:sys/message".to_string(), + secure_subdomain: None, + authenticated: false, + local_only: true, + static_content: None, + }, + ); + + let path_bindings: PathBindings = Arc::new(RwLock::new(bindings_map)); let ws_path_bindings: WsPathBindings = Arc::new(RwLock::new(Router::new())); tokio::spawn(serve( - our_name.clone(), + Arc::new(our_name), our_port, http_response_senders.clone(), path_bindings.clone(), ws_path_bindings.clone(), ws_senders.clone(), - encoded_keyfile.clone(), - jwt_secret_bytes.clone(), + Arc::new(encoded_keyfile), + Arc::new(jwt_secret_bytes), send_to_loop.clone(), print_tx.clone(), )); @@ -246,17 +251,14 @@ async fn serve( send_to_loop: MessageSender, print_tx: PrintSender, ) { - let _ = print_tx - .send(Printout { - verbosity: 0, - content: format!("http_server: running on port {our_port}"), - }) + Printout::new(0, format!("http_server: running on port {our_port}")) + .send(&print_tx) .await; // filter to receive websockets - let cloned_msg_tx = send_to_loop.clone(); let cloned_our = our.clone(); let cloned_jwt_secret_bytes = jwt_secret_bytes.clone(); + let cloned_msg_tx = send_to_loop.clone(); let cloned_print_tx = print_tx.clone(); let ws_route = warp::ws() .and(warp::addr::remote()) @@ -394,22 +396,19 @@ async fn ws_handler( send_to_loop: MessageSender, print_tx: PrintSender, ) -> Result { - let original_path = normalize_path(path.as_str()).to_string(); - let _ = print_tx - .send(Printout { - verbosity: 2, - content: format!("http_server: got ws request for {original_path}"), - }) + let original_path = utils::normalize_path(path.as_str()); + Printout::new(2, format!("http_server: ws request for {original_path}")) + .send(&print_tx) .await; - let serialized_headers = serialize_headers(&headers); - let ws_path_bindings = ws_path_bindings.read().await; + let serialized_headers = utils::serialize_headers(&headers); - let Ok(route) = ws_path_bindings.recognize(&original_path) else { + let ws_path_bindings = ws_path_bindings.read().await; + let Ok(route) = ws_path_bindings.recognize(original_path) else { return Err(warp::reject::not_found()); }; - let bound_path = route.handler(); + let Some(app) = bound_path.app.clone() else { return Err(warp::reject::not_found()); }; @@ -420,14 +419,6 @@ async fn ws_handler( }; if let Some(ref subdomain) = bound_path.secure_subdomain { - let _ = print_tx - .send(Printout { - verbosity: 2, - content: format!( - "http_server: ws request for {original_path} bound by subdomain {subdomain}" - ), - }) - .await; // assert that host matches what this app wants it to be let host = match host { Some(host) => host, @@ -436,12 +427,12 @@ async fn ws_handler( // parse out subdomain from host (there can only be one) let request_subdomain = host.host().split('.').next().unwrap_or(""); if request_subdomain != subdomain - || !auth_cookie_valid(&our, Some(&app), auth_token, &jwt_secret_bytes) + || !utils::auth_cookie_valid(&our, Some(&app), auth_token, &jwt_secret_bytes) { return Err(warp::reject::not_found()); } } else { - if !auth_cookie_valid(&our, None, auth_token, &jwt_secret_bytes) { + if !utils::auth_cookie_valid(&our, None, auth_token, &jwt_secret_bytes) { return Err(warp::reject::not_found()); } } @@ -501,25 +492,30 @@ async fn http_handler( print_tx: PrintSender, login_html: Arc, ) -> Result { - // trim trailing "/" - let original_path = normalize_path(path.as_str()); - let _ = print_tx - .send(Printout { - verbosity: 2, - content: format!("http_server: got request for path {original_path}"), - }) + let original_path = utils::normalize_path(path.as_str()); + let base_path = original_path.split('/').skip(1).next().unwrap_or(""); + Printout::new(2, format!("http_server: request for {original_path}")) + .send(&print_tx) .await; + let id: u64 = rand::random(); - let serialized_headers = serialize_headers(&headers); - let path_bindings = path_bindings.read().await; + let serialized_headers = utils::serialize_headers(&headers); - let Ok(route) = path_bindings.recognize(&original_path) else { - let _ = print_tx - .send(Printout { - verbosity: 2, - content: format!("http_server: no route found for {original_path}"), - }) - .await; + let path_bindings = path_bindings.read().await; + let route = if let Ok(route) = path_bindings.recognize(&original_path) { + route + } else if let Ok(base_route) = path_bindings.recognize(base_path) { + // if the specific path isn't found, try the base path which should + // be just the process ID. use the base path configuration to handle + // paths that have not been specifically bound by that process. + base_route + } else { + Printout::new( + 2, + format!("http_server: no route found for {original_path}"), + ) + .send(&print_tx) + .await; return Ok(warp::reply::with_status(vec![], StatusCode::NOT_FOUND).into_response()); }; let bound_path = route.handler(); @@ -532,14 +528,6 @@ async fn http_handler( if bound_path.authenticated { if let Some(ref subdomain) = bound_path.secure_subdomain { - let _ = print_tx - .send(Printout { - verbosity: 2, - content: format!( - "http_server: request for {original_path} bound by subdomain {subdomain}" - ), - }) - .await; let request_subdomain = host.host().split('.').next().unwrap_or(""); // assert that host matches what this app wants it to be if request_subdomain.is_empty() { @@ -579,7 +567,7 @@ async fn http_handler( .body(vec![]) .into_response()); } - if !auth_cookie_valid( + if !utils::auth_cookie_valid( &our, Some(&app), serialized_headers.get("cookie").unwrap_or(&"".to_string()), @@ -592,7 +580,7 @@ async fn http_handler( .into_response()); } } else { - if !auth_cookie_valid( + if !utils::auth_cookie_valid( &our, None, serialized_headers.get("cookie").unwrap_or(&"".to_string()), @@ -694,29 +682,14 @@ async fn http_handler( drop(path_bindings); if is_fire_and_forget { - match send_to_loop.send(message).await { - Ok(_) => {} - Err(_) => { - return Ok( - warp::reply::with_status(vec![], StatusCode::INTERNAL_SERVER_ERROR) - .into_response(), - ); - } - } + message.send(&send_to_loop).await; return Ok(warp::reply::with_status(vec![], StatusCode::OK).into_response()); } let (response_sender, response_receiver) = tokio::sync::oneshot::channel(); http_response_senders.insert(id, (original_path.to_string(), response_sender)); - match send_to_loop.send(message).await { - Ok(_) => {} - Err(_) => { - return Ok( - warp::reply::with_status(vec![], StatusCode::INTERNAL_SERVER_ERROR).into_response(), - ); - } - } + message.send(&send_to_loop).await; let timeout_duration = tokio::time::Duration::from_secs(HTTP_SELF_IMPOSED_TIMEOUT); let result = tokio::time::timeout(timeout_duration, response_receiver).await; @@ -743,7 +716,7 @@ async fn http_handler( // Merge the deserialized headers into the existing headers let existing_headers = response.headers_mut(); - for (header_name, header_value) in deserialize_headers(http_response.headers).iter() { + for (header_name, header_value) in utils::deserialize_headers(http_response.headers).iter() { if header_name == "set-cookie" || header_name == "Set-Cookie" { if let Ok(cookie) = header_value.to_str() { let cookie_headers: Vec<&str> = cookie @@ -768,7 +741,7 @@ async fn handle_rpc_message( body: warp::hyper::body::Bytes, print_tx: PrintSender, ) -> Result<(KernelMessage, bool), StatusCode> { - let Ok(rpc_message) = serde_json::from_slice::(&body) else { + let Ok(rpc_message) = serde_json::from_slice::(&body) else { return Err(StatusCode::BAD_REQUEST); }; @@ -776,12 +749,12 @@ async fn handle_rpc_message( return Err(StatusCode::BAD_REQUEST); }; - let _ = print_tx - .send(Printout { - verbosity: 2, - content: format!("http_server: passing on RPC message to {target_process}"), - }) - .await; + Printout::new( + 2, + format!("http_server: passing on RPC message to {target_process}"), + ) + .send(&print_tx) + .await; let blob: Option = match rpc_message.data { None => None, @@ -801,19 +774,12 @@ async fn handle_rpc_message( Ok(( KernelMessage { id, - source: Address { - node: our.to_string(), - process: HTTP_SERVER_PROCESS_ID.clone(), - }, - target: Address { - node: rpc_message.node.unwrap_or(our.to_string()), - process: target_process, - }, + source: Address::new(&*our, HTTP_SERVER_PROCESS_ID.clone()), + target: Address::new(rpc_message.node.unwrap_or(our.to_string()), target_process), rsvp, message: Message::Request(Request { inherit: false, - expects_response: rpc_message.expects_response.clone(), - //expects_response: Some(15), // NB: no effect on runtime + expects_response: rpc_message.expects_response, body: match rpc_message.body { Some(body_string) => body_string.into_bytes(), None => Vec::new(), @@ -836,14 +802,8 @@ fn make_websocket_message( ) -> Option { Some(KernelMessage { id: rand::random(), - source: Address { - node: our.to_string(), - process: HTTP_SERVER_PROCESS_ID.clone(), - }, - target: Address { - node: our.to_string(), - process: app, - }, + source: Address::new(&our, HTTP_SERVER_PROCESS_ID.clone()), + target: Address::new(&our, app), rsvp: None, message: Message::Request(Request { inherit: false, @@ -937,14 +897,8 @@ fn make_ext_websocket_message( Some(KernelMessage { id, - source: Address { - node: our.to_string(), - process: HTTP_SERVER_PROCESS_ID.clone(), - }, - target: Address { - node: our.to_string(), - process: app, - }, + source: Address::new(&our, HTTP_SERVER_PROCESS_ID.clone()), + target: Address::new(&our, app), rsvp: None, message, lazy_load_blob: blob, @@ -968,35 +922,28 @@ async fn maintain_websocket( let (ws_sender, mut ws_receiver) = tokio::sync::mpsc::channel(100); ws_senders.insert(channel_id, (app.clone(), ws_sender)); - let _ = print_tx - .send(Printout { - verbosity: 2, - content: format!("http_server: new websocket connection to {app} with id {channel_id}"), - }) - .await; - - let _ = send_to_loop - .send(KernelMessage { - id: rand::random(), - source: Address { - node: our.to_string(), - process: HTTP_SERVER_PROCESS_ID.clone(), - }, - target: Address { - node: our.clone().to_string(), - process: app.clone(), - }, - rsvp: None, - message: Message::Request(Request { - inherit: false, - expects_response: None, - body: serde_json::to_vec(&HttpServerRequest::WebSocketOpen { path, channel_id }) - .unwrap(), - metadata: None, - capabilities: vec![], - }), - lazy_load_blob: None, - }) + Printout::new( + 2, + format!("http_server: new websocket connection to {app} with id {channel_id}"), + ) + .send(&print_tx) + .await; + + KernelMessage::builder() + .id(rand::random()) + .source(Address::new(&*our, HTTP_SERVER_PROCESS_ID.clone())) + .target(Address::new(&*our, app.clone())) + .message(Message::Request(Request { + inherit: false, + expects_response: None, + body: serde_json::to_vec(&HttpServerRequest::WebSocketOpen { path, channel_id }) + .unwrap(), + metadata: None, + capabilities: vec![], + })) + .build() + .unwrap() + .send(&send_to_loop) .await; let make_ws_message = if extension { @@ -1049,12 +996,12 @@ async fn maintain_websocket( } } } - let _ = print_tx - .send(Printout { - verbosity: 2, - content: format!("http_server: websocket connection {channel_id} closed"), - }) - .await; + Printout::new( + 2, + format!("http_server: websocket connection {channel_id} closed"), + ) + .send(&print_tx) + .await; let stream = write_stream.reunite(read_stream).unwrap(); let _ = stream.close().await; } @@ -1066,34 +1013,20 @@ async fn websocket_close( send_to_loop: &MessageSender, ) { ws_senders.remove(&channel_id); - let _ = send_to_loop - .send(KernelMessage { - id: rand::random(), - source: Address { - node: "our".to_string(), - process: HTTP_SERVER_PROCESS_ID.clone(), - }, - target: Address { - node: "our".to_string(), - process, - }, - rsvp: None, - message: Message::Request(Request { - inherit: false, - expects_response: None, - body: serde_json::to_vec(&HttpServerRequest::WebSocketClose(channel_id)).unwrap(), - metadata: None, - capabilities: vec![], - }), - lazy_load_blob: Some(LazyLoadBlob { - mime: None, - bytes: serde_json::to_vec(&RpcResponseBody { - body: Vec::new(), - lazy_load_blob: None, - }) - .unwrap(), - }), - }) + KernelMessage::builder() + .id(rand::random()) + .source(Address::new("our", HTTP_SERVER_PROCESS_ID.clone())) + .target(Address::new("our", process)) + .message(Message::Request(Request { + inherit: false, + expects_response: None, + body: serde_json::to_vec(&HttpServerRequest::WebSocketClose(channel_id)).unwrap(), + metadata: None, + capabilities: vec![], + })) + .build() + .unwrap() + .send(send_to_loop) .await; } @@ -1180,25 +1113,24 @@ async fn handle_app_message( local_only, cache, } => { - let path = format_path_with_process(&km.source.process, &path); + let path = utils::format_path_with_process(&km.source.process, &path); let mut path_bindings = path_bindings.write().await; - let _ = print_tx - .send(Printout { - verbosity: 2, - content: format!( - "http: binding {path}, {}, {}, {}", - if authenticated { - "authenticated" - } else { - "unauthenticated" - }, - if local_only { "local only" } else { "open" }, - if cache { "cached" } else { "dynamic" }, - ), - }) - .await; + Printout::new( + 2, + format!( + "http: binding {path}, {}, {}, {}", + if authenticated { + "authenticated" + } else { + "unauthenticated" + }, + if local_only { "local only" } else { "open" }, + if cache { "cached" } else { "dynamic" }, + ), + ) + .send(&print_tx) + .await; if !cache { - // trim trailing "/" path_bindings.add( &path, BoundPath { @@ -1221,7 +1153,6 @@ async fn handle_app_message( .await; return; }; - // trim trailing "/" path_bindings.add( &path, BoundPath { @@ -1236,18 +1167,18 @@ async fn handle_app_message( } } HttpServerAction::SecureBind { path, cache } => { - let path = format_path_with_process(&km.source.process, &path); - let subdomain = generate_secure_subdomain(&km.source.process); + let path = utils::format_path_with_process(&km.source.process, &path); + let subdomain = utils::generate_secure_subdomain(&km.source.process); let mut path_bindings = path_bindings.write().await; - let _ = print_tx - .send(Printout { - verbosity: 2, - content: format!( - "http: binding subdomain {subdomain} with path {path}, {}", - if cache { "cached" } else { "dynamic" }, - ), - }) - .await; + Printout::new( + 2, + format!( + "http: binding subdomain {subdomain} with path {path}, {}", + if cache { "cached" } else { "dynamic" }, + ), + ) + .send(&print_tx) + .await; if !cache { path_bindings.add( &path, @@ -1271,7 +1202,6 @@ async fn handle_app_message( .await; return; }; - // trim trailing "/" path_bindings.add( &path, BoundPath { @@ -1286,7 +1216,7 @@ async fn handle_app_message( } } HttpServerAction::Unbind { path } => { - let path = format_path_with_process(&km.source.process, &path); + let path = utils::format_path_with_process(&km.source.process, &path); let mut path_bindings = path_bindings.write().await; path_bindings.add( &path, @@ -1306,7 +1236,7 @@ async fn handle_app_message( encrypted, extension, } => { - let path = format_path_with_process(&km.source.process, &path); + let path = utils::format_path_with_process(&km.source.process, &path); let mut ws_path_bindings = ws_path_bindings.write().await; ws_path_bindings.add( &path, @@ -1324,8 +1254,8 @@ async fn handle_app_message( encrypted, extension, } => { - let path = format_path_with_process(&km.source.process, &path); - let subdomain = generate_secure_subdomain(&km.source.process); + let path = utils::format_path_with_process(&km.source.process, &path); + let subdomain = utils::generate_secure_subdomain(&km.source.process); let mut ws_path_bindings = ws_path_bindings.write().await; ws_path_bindings.add( &path, @@ -1339,7 +1269,7 @@ async fn handle_app_message( ); } HttpServerAction::WebSocketUnbind { mut path } => { - let path = format_path_with_process(&km.source.process, &path); + let path = utils::format_path_with_process(&km.source.process, &path); let mut ws_path_bindings = ws_path_bindings.write().await; ws_path_bindings.add( &path, @@ -1448,25 +1378,21 @@ pub async fn send_action_response( send_to_loop: &MessageSender, result: Result<(), HttpServerError>, ) { - let _ = send_to_loop - .send(KernelMessage { - id, - source: Address { - node: "our".to_string(), - process: HTTP_SERVER_PROCESS_ID.clone(), + KernelMessage::builder() + .id(id) + .source(Address::new("our", HTTP_SERVER_PROCESS_ID.clone())) + .target(target) + .message(Message::Response(( + Response { + inherit: false, + body: serde_json::to_vec(&result).unwrap(), + metadata: None, + capabilities: vec![], }, - target, - rsvp: None, - message: Message::Response(( - Response { - inherit: false, - body: serde_json::to_vec(&result).unwrap(), - metadata: None, - capabilities: vec![], - }, - None, - )), - lazy_load_blob: None, - }) + None, + ))) + .build() + .unwrap() + .send(send_to_loop) .await; } diff --git a/kinode/src/http/utils.rs b/kinode/src/http/utils.rs index 0dfb86ae8..ec3090e3e 100644 --- a/kinode/src/http/utils.rs +++ b/kinode/src/http/utils.rs @@ -1,13 +1,12 @@ use hmac::{Hmac, Mac}; use jwt::VerifyWithKey; +use lib::{core::ProcessId, types::http_server}; use serde::{Deserialize, Serialize}; use sha2::Sha256; use std::collections::HashMap; use tokio::net::TcpListener; use warp::http::{header::HeaderName, header::HeaderValue, HeaderMap}; -use lib::{core::ProcessId, types::http_server::*}; - #[derive(Serialize, Deserialize)] pub struct RpcMessage { pub node: Option, @@ -27,7 +26,7 @@ pub fn _verify_auth_token(auth_token: &str, jwt_secret: &[u8]) -> Result = auth_token.verify_with_key(&secret); + let claims: Result = auth_token.verify_with_key(&secret); match claims { Ok(data) => Ok(data.username), @@ -66,7 +65,7 @@ pub fn auth_cookie_valid( return false; }; - let claims: Result = auth_token.verify_with_key(&secret); + let claims: Result = auth_token.verify_with_key(&secret); match claims { Ok(data) => data.username == our_node && data.subdomain == subdomain.map(|s| s.to_string()),