diff --git a/rust-native/Cargo.lock b/rust-native/Cargo.lock index 70e1e32..d640ca7 100644 --- a/rust-native/Cargo.lock +++ b/rust-native/Cargo.lock @@ -331,6 +331,7 @@ dependencies = [ "napi-build", "napi-derive", "parking_lot", + "rustc-hash", "serde", "serde_json", "sha2", diff --git a/rust-native/Cargo.toml b/rust-native/Cargo.toml index 9f4f445..296ef3b 100644 --- a/rust-native/Cargo.toml +++ b/rust-native/Cargo.toml @@ -14,6 +14,7 @@ httparse = "1.9" itoa = "1.0" json5 = "0.4" memchr = "2.7" +rustc-hash = "2" getrandom = "0.2" hmac = "0.12" monoio = { version = "0.2", features = ["sync", "legacy"] } diff --git a/rust-native/src/lib.rs b/rust-native/src/lib.rs index 6123b22..7c82b9a 100644 --- a/rust-native/src/lib.rs +++ b/rust-native/src/lib.rs @@ -15,6 +15,7 @@ use std::borrow::Cow; use std::cell::RefCell; use std::net::{SocketAddr, ToSocketAddrs}; use std::sync::atomic::{AtomicBool, Ordering}; +use std::rc::Rc; use std::sync::{mpsc, Arc, Mutex}; use url::form_urlencoded; @@ -518,6 +519,14 @@ async fn run_server( shutdown_flag: Arc, session_store: Option>, ) -> Result<()> { + // Wrap Arc in Rc for cheap per-connection cloning within this single-threaded + // worker — avoids atomic ref-count operations on every accepted connection. + let router: Rc> = Rc::new(router); + let dispatcher: Rc> = Rc::new(dispatcher); + let server_config: Rc> = Rc::new(server_config); + let session_store: Option>> = + session_store.map(Rc::new); + let active_connections: std::cell::Cell = std::cell::Cell::new(0); loop { @@ -541,9 +550,9 @@ async fn run_server( eprintln!("[http-native] failed to enable TCP_NODELAY: {error}"); } - let router = Arc::clone(&router); - let dispatcher = Arc::clone(&dispatcher); - let server_config = Arc::clone(&server_config); + let router = Rc::clone(&router); + let dispatcher = Rc::clone(&dispatcher); + let server_config = Rc::clone(&server_config); let session_store = session_store.clone(); active_connections.set(active_connections.get() + 1); @@ -599,16 +608,15 @@ use std::time::Duration; const TIMEOUT_HEADER_READ: Duration = Duration::from_secs(30); const TIMEOUT_IDLE_KEEPALIVE: Duration = Duration::from_secs(120); const TIMEOUT_BODY_READ: Duration = Duration::from_secs(60); -const TIMEOUT_WRITE: Duration = Duration::from_secs(30); // ─── Connection Handler with Buffer Pool async fn handle_connection( mut stream: TcpStream, - router: Arc, - dispatcher: Arc, - server_config: Arc, - session_store: Option>, + router: Rc>, + dispatcher: Rc>, + server_config: Rc>, + session_store: Option>>, ) -> Result<()> { let mut buffer = acquire_buffer(); @@ -618,7 +626,7 @@ async fn handle_connection( &router, &dispatcher, &server_config, - session_store.as_deref(), + session_store.as_deref().map(|arc| arc.as_ref()), ) .await; @@ -689,10 +697,8 @@ async fn handle_connection_inner( b"{\"error\":\"Request Header Fields Too Large\"}", false, ); - let timeout_result = timeout(TIMEOUT_WRITE, stream.write_all(response)).await; - if let Ok((write_result, _)) = timeout_result { - write_result?; - } + let (write_result, _) = stream.write_all(response).await; + write_result?; stream.shutdown().await?; return Ok(()); } @@ -714,10 +720,8 @@ async fn handle_connection_inner( (501u16, &b"{\"error\":\"Not Implemented: chunked transfer encoding is not supported\"}"[..]) }; let response = build_error_response_bytes(status, body, false); - let timeout_result = timeout(TIMEOUT_WRITE, stream.write_all(response)).await; - if let Ok((write_result, _)) = timeout_result { - write_result?; - } + let (write_result, _) = stream.write_all(response).await; + write_result?; stream.shutdown().await?; return Ok(()); } @@ -766,16 +770,12 @@ async fn handle_connection_inner( .await?; } DispatchDecision::SpecializedResponse(response) => { - let timeout_result = timeout(TIMEOUT_WRITE, stream.write_all(response)).await; - if let Ok((write_result, _)) = timeout_result { - write_result?; - } + let (write_result, _) = stream.write_all(response).await; + write_result?; } DispatchDecision::CachedResponse(response) => { - let timeout_result = timeout(TIMEOUT_WRITE, stream.write_all(response)).await; - if let Ok((write_result, _)) = timeout_result { - write_result?; - } + let (write_result, _) = stream.write_all(response).await; + write_result?; } } @@ -805,10 +805,8 @@ async fn handle_connection_inner( None => { let response = build_error_response_bytes(411, b"{\"error\":\"Length Required\"}", false); - let timeout_result = timeout(TIMEOUT_WRITE, stream.write_all(response)).await; - if let Ok((write_result, _)) = timeout_result { - write_result?; - } + let (write_result, _) = stream.write_all(response).await; + write_result?; stream.shutdown().await?; return Ok(()); } @@ -817,10 +815,8 @@ async fn handle_connection_inner( if content_length > MAX_BODY_BYTES { let response = build_error_response_bytes(413, b"{\"error\":\"Payload Too Large\"}", false); - let timeout_result = timeout(TIMEOUT_WRITE, stream.write_all(response)).await; - if let Ok((write_result, _)) = timeout_result { - write_result?; - } + let (write_result, _) = stream.write_all(response).await; + write_result?; stream.shutdown().await?; return Ok(()); } @@ -875,16 +871,12 @@ async fn handle_connection_inner( write_dynamic_dispatch_response(stream, dispatcher, request, keep_alive, cache_insertion, handler_id, &url_bytes, session_store, session_id_body, is_new_session_body).await?; } DispatchDecision::SpecializedResponse(response) => { - let timeout_result = timeout(TIMEOUT_WRITE, stream.write_all(response)).await; - if let Ok((write_result, _)) = timeout_result { - write_result?; - } + let (write_result, _) = stream.write_all(response).await; + write_result?; } DispatchDecision::CachedResponse(response) => { - let timeout_result = timeout(TIMEOUT_WRITE, stream.write_all(response)).await; - if let Ok((write_result, _)) = timeout_result { - write_result?; - } + let (write_result, _) = stream.write_all(response).await; + write_result?; } } @@ -946,13 +938,13 @@ fn parse_request_httparse(bytes: &[u8]) -> Option> { Err(_) => continue, // Skip non-UTF-8 headers }; - // Connection handling + // Connection handling — allocation-free byte comparison if name.eq_ignore_ascii_case("connection") { - let lower = value.to_ascii_lowercase(); - if lower.contains("close") { + let vb = value.as_bytes(); + if contains_ascii_case_insensitive(vb, b"close") { keep_alive = false; } - if lower.contains("keep-alive") { + if contains_ascii_case_insensitive(vb, b"keep-alive") { keep_alive = true; } } @@ -1786,10 +1778,8 @@ async fn write_exact_static_response( static_route.close_response.clone() }; - let timeout_result = timeout(TIMEOUT_WRITE, stream.write_all(response)).await; - if let Ok((write_result, _)) = timeout_result { - write_result?; - } + let (write_result, _) = stream.write_all(response).await; + write_result?; Ok(()) } @@ -1875,10 +1865,11 @@ fn extract_ncache_trailer(dispatch_bytes: &[u8]) -> Option<(u64, usize)> { /// Compute an ncache key from handler_id + full request URL (including query string). /// Different URLs naturally produce different cache keys, so /data?page=1 and /// /data?page=2 are cached separately. +/// Uses FxHasher (~5x faster than SipHash/DefaultHasher for short keys). fn compute_ncache_key(handler_id: u32, url_bytes: &[u8]) -> u64 { use std::hash::{Hash, Hasher}; - use std::collections::hash_map::DefaultHasher; - let mut hasher = DefaultHasher::new(); + use rustc_hash::FxHasher; + let mut hasher = FxHasher::default(); handler_id.hash(&mut hasher); url_bytes.hash(&mut hasher); hasher.finish() @@ -2011,19 +2002,16 @@ async fn write_dynamic_dispatch_response( Ok(mut http_response) => { if let Some((handler_id, cache_key, max_entries, ttl_secs)) = cache_insertion { // Route-level cache insertion (takes precedence over ncache) + // Derive the alternate connection variant by patching the header bytes let response_bytes_close: bytes::Bytes = if !keep_alive { http_response.clone().into() } else { - build_http_response_from_dispatch(response.as_ref(), false) - .unwrap_or_default() - .into() + patch_connection_header(&http_response, false).into() }; let response_ka: bytes::Bytes = if keep_alive { http_response.clone().into() } else { - build_http_response_from_dispatch(response.as_ref(), true) - .unwrap_or_default() - .into() + patch_connection_header(&http_response, true).into() }; crate::router::insert_cached_response(handler_id, cache_key, crate::router::CacheEntry { @@ -2040,16 +2028,12 @@ async fn write_dynamic_dispatch_response( let response_bytes_close: bytes::Bytes = if !keep_alive { http_response.clone().into() } else { - build_http_response_from_dispatch(response.as_ref(), false) - .unwrap_or_default() - .into() + patch_connection_header(&http_response, false).into() }; let response_ka: bytes::Bytes = if keep_alive { http_response.clone().into() } else { - build_http_response_from_dispatch(response.as_ref(), true) - .unwrap_or_default() - .into() + patch_connection_header(&http_response, true).into() }; crate::router::insert_cached_response(handler_id, ncache_key, crate::router::CacheEntry { @@ -2118,10 +2102,8 @@ async fn write_dynamic_dispatch_response( } } - let timeout_result = timeout(TIMEOUT_WRITE, stream.write_all(http_response)).await; - if let Ok((write_result, _)) = timeout_result { - write_result?; - } + let (write_result, _) = stream.write_all(http_response).await; + write_result?; } Err(_) => { // Security: sanitized error — no internal details @@ -2130,10 +2112,8 @@ async fn write_dynamic_dispatch_response( b"{\"error\":\"Internal Server Error\"}", keep_alive, ); - let timeout_result = timeout(TIMEOUT_WRITE, stream.write_all(response)).await; - if let Ok((write_result, _)) = timeout_result { - write_result?; - } + let (write_result, _) = stream.write_all(response).await; + write_result?; } } } @@ -2144,10 +2124,8 @@ async fn write_dynamic_dispatch_response( b"{\"error\":\"Bad Gateway\"}", keep_alive, ); - let timeout_result = timeout(TIMEOUT_WRITE, stream.write_all(response)).await; - if let Ok((write_result, _)) = timeout_result { - write_result?; - } + let (write_result, _) = stream.write_all(response).await; + write_result?; } } Ok(()) @@ -2227,6 +2205,29 @@ fn build_http_response_from_dispatch(dispatch_bytes: &[u8], keep_alive: bool) -> Ok(output) } +/// Patch the `connection:` header value in an already-built HTTP response. +/// Searches for `connection: keep-alive` or `connection: close` and swaps to the +/// requested variant. The two values differ in length (10 vs 5 bytes) so the +/// Vec may grow or shrink by a few bytes. +fn patch_connection_header(response: &[u8], keep_alive: bool) -> Vec { + let (find, replace) = if keep_alive { + (&b"connection: close\r\n"[..], &b"connection: keep-alive\r\n"[..]) + } else { + (&b"connection: keep-alive\r\n"[..], &b"connection: close\r\n"[..]) + }; + + if let Some(pos) = memmem::find(response, find) { + let mut out = Vec::with_capacity(response.len() + replace.len() - find.len()); + out.extend_from_slice(&response[..pos]); + out.extend_from_slice(replace); + out.extend_from_slice(&response[pos + find.len()..]); + out + } else { + // Header not found (shouldn't happen) — return unchanged clone + response.to_vec() + } +} + /// Resolve the session ID from the cookie header. Returns (session_id, is_new). /// If no cookie is present or invalid, generates a new session ID. fn resolve_session( @@ -2294,33 +2295,87 @@ fn build_error_response_bytes(status: u16, body: &[u8], keep_alive: bool) -> Vec // ─── Security Utilities ───────────────── /// Check for path traversal attempts (../, ..\, etc.) +/// Fast path: if no `..` or null byte is present in the raw bytes, return false +/// immediately without any allocation. Only falls back to percent-decoding when +/// `..` or `%` patterns are detected. fn contains_path_traversal(path: &str) -> bool { - if path.contains('\0') || path.contains("%00") { + let bytes = path.as_bytes(); + + // Fast scan for null bytes + if memchr::memchr(0, bytes).is_some() { + return true; + } + + // Check for literal %00 (null percent-encoding) + if memmem::find(bytes, b"%00").is_some() { return true; } - let mut decoded = path.to_string(); + // Fast path: if no ".." appears anywhere (even encoded), skip the expensive decode + let has_dotdot = memmem::find(bytes, b"..").is_some(); + let has_percent = memchr::memchr(b'%', bytes).is_some(); + + // If no literal ".." and no percent-encoding that could hide "..", we're safe + if !has_dotdot && !has_percent { + return false; + } + + // If we have literal ".." but no percent-encoding, check directly + if has_dotdot && !has_percent { + return check_traversal_patterns(bytes); + } + + // Percent-encoding present — decode once and check + let decoded = percent_decode_path_bytes(bytes); + check_traversal_patterns(&decoded) +} + +/// Check traversal patterns on raw bytes (after any decoding). +#[inline] +fn check_traversal_patterns(b: &[u8]) -> bool { + memmem::find(b, b"/../").is_some() + || memmem::find(b, b"\\..\\").is_some() + || b.ends_with(b"/..") + || b.ends_with(b"\\..") + || b.starts_with(b"../") + || b.starts_with(b"..\\") + || b == b".." +} + +/// Percent-decode path-relevant characters (%2e, %2f, %5c) in-place into a new +/// Vec. Iteratively decodes up to 3 rounds (to handle double-encoding). +fn percent_decode_path_bytes(input: &[u8]) -> Vec { + let mut current = input.to_vec(); for _ in 0..3 { - let next = decoded - .replace("%2e", ".") - .replace("%2E", ".") - .replace("%2f", "/") - .replace("%2F", "/") - .replace("%5c", "\\") - .replace("%5C", "\\"); - if next == decoded { - break; + let mut changed = false; + let mut decoded = Vec::with_capacity(current.len()); + let mut i = 0; + while i < current.len() { + if current[i] == b'%' && i + 2 < current.len() { + let hi = current[i + 1]; + let lo = current[i + 2]; + let replacement = match (hi, lo) { + (b'2', b'e') | (b'2', b'E') => Some(b'.'), + (b'2', b'f') | (b'2', b'F') => Some(b'/'), + (b'5', b'c') | (b'5', b'C') => Some(b'\\'), + _ => None, + }; + if let Some(ch) = replacement { + decoded.push(ch); + i += 3; + changed = true; + continue; + } + } + decoded.push(current[i]); + i += 1; } - decoded = next; + if !changed { + return current; + } + current = decoded; } - - decoded.contains("/../") - || decoded.contains("\\..\\") - || decoded.ends_with("/..") - || decoded.ends_with("\\..") - || decoded.starts_with("../") - || decoded.starts_with("..\\") - || decoded == ".." + current } /// RFC 8259 compliant JSON string escaping — handles ALL control characters @@ -2438,11 +2493,25 @@ fn trim_ascii_spaces(bytes: &[u8]) -> &[u8] { } fn normalize_runtime_path(path: &str) -> Cow<'_, str> { + // Fast path: "/" or no trailing slash — zero allocation if path == "/" || !path.ends_with('/') { return Cow::Borrowed(path); } - Cow::Owned(crate::analyzer::normalize_path(path)) + // Strip trailing slashes; ensure leading slash. Avoids the + // analyzer::normalize_path call which does `.to_string()` + `.trim_end_matches`. + let trimmed = path.trim_end_matches('/'); + if trimmed.is_empty() { + return Cow::Borrowed("/"); + } + if trimmed.starts_with('/') { + Cow::Owned(trimmed.to_string()) + } else { + let mut s = String::with_capacity(trimmed.len() + 1); + s.push('/'); + s.push_str(trimmed); + Cow::Owned(s) + } } fn config_string( diff --git a/src/bridge.js b/src/bridge.js index 0d63356..d45ba4b 100644 --- a/src/bridge.js +++ b/src/bridge.js @@ -592,6 +592,15 @@ function getCachedHeaderNameBytes(name) { return bytes; } +// Pre-encoded content-type header name + common values to skip textEncoder.encode() on hot path +const CT_NAME_BYTES = textEncoder.encode("content-type"); +const PREENCODED_CT_VALUES = { + "application/json; charset=utf-8": textEncoder.encode("application/json; charset=utf-8"), + "text/plain; charset=utf-8": textEncoder.encode("text/plain; charset=utf-8"), + "text/html; charset=utf-8": textEncoder.encode("text/html; charset=utf-8"), + "application/octet-stream": textEncoder.encode("application/octet-stream"), +}; + /** * Encode a JS response snapshot into the binary envelope that the Rust * layer can decode directly into HTTP/1.1 response bytes. @@ -608,7 +617,6 @@ export function encodeResponseEnvelope(snapshot) { ? Buffer.from(snapshot.body) : Buffer.alloc(0); - // Encode headers inline — avoids Object.entries().map() intermediate arrays const ncache = snapshot.ncache || null; const headerKeys = rawHeaders ? Object.keys(rawHeaders) : EMPTY_ARRAY; const headerCount = headerKeys.length; @@ -618,8 +626,19 @@ export function encodeResponseEnvelope(snapshot) { for (let i = 0; i < headerCount; i++) { const name = headerKeys[i]; - const nameBytes = getCachedHeaderNameBytes(name); - const valueBytes = textEncoder.encode(String(rawHeaders[name])); + const rawValue = String(rawHeaders[name]); + + // Fast path: use pre-encoded bytes for content-type + let nameBytes, valueBytes; + if (name === "content-type") { + nameBytes = CT_NAME_BYTES; + const preValue = PREENCODED_CT_VALUES[rawValue]; + valueBytes = preValue || textEncoder.encode(rawValue); + } else { + nameBytes = getCachedHeaderNameBytes(name); + valueBytes = textEncoder.encode(rawValue); + } + if (nameBytes.length > 0xff) { throw new Error(`Response header name too long: ${nameBytes.length}`); } diff --git a/src/index.js b/src/index.js index 32e0895..63e29db 100644 --- a/src/index.js +++ b/src/index.js @@ -551,9 +551,12 @@ function createDispatcher( const dispatchStartMs = trackDispatchTiming ? performance.now() : 0; try { - const middlewareResult = route.runMiddlewares(req, res); - if (!res.finished && isPromiseLike(middlewareResult)) { - await middlewareResult; + // Fast path: skip middleware runner entirely when no middlewares are attached + if (route._hasMiddlewares) { + const middlewareResult = route.runMiddlewares(req, res); + if (!res.finished && isPromiseLike(middlewareResult)) { + await middlewareResult; + } } if (!res.finished) { const handlerResult = route.compiledHandler(req, res); @@ -823,6 +826,7 @@ function compileRouteDispatch( requestFactory, runMiddlewares, compiledHandler, + _hasMiddlewares: applicableMiddlewares.length > 0, dispatchKind: requestPlan.dispatchKind, jsonFastPath, jsonSerializer: createJsonSerializer(jsonFastPath),