From 96e2ef4e202a99a1c909d6d2a149fea6b3f5ad29 Mon Sep 17 00:00:00 2001 From: fewensa Date: Sat, 14 Mar 2026 02:56:28 +0000 Subject: [PATCH 1/2] Implement async TLS via futures-rustls for async_send_https - Replace async-rustls (rustls 0.21) with futures-rustls 0.26 (rustls 0.23 compatible) - Implement async_send_https_rustls using TlsConnector from futures-rustls - TLS path wraps TcpStream in AllowStdIo for futures-io compatibility - Expose NoCertificateVerification as pub(crate) for reuse across modules - Update tls-rustls feature to depend on futures-rustls instead of async-rustls - Also includes async chunked transfer support in async_read_stream and connection_reader (sync path); all cargo test --features async,tls-rustls pass Co-Authored-By: Paperclip --- rttp_client/Cargo.toml | 4 +- .../src/connection/async_connection.rs | 199 +++++++++++++++++- rttp_client/src/connection/connection.rs | 2 +- .../src/connection/connection_reader.rs | 194 ++++++++++++++++- rttp_client/tests/support/mod.rs | 23 ++ rttp_client/tests/test_http_async.rs | 17 ++ rttp_client/tests/test_http_basic.rs | 17 ++ 7 files changed, 441 insertions(+), 15 deletions(-) diff --git a/rttp_client/Cargo.toml b/rttp_client/Cargo.toml index e34c154..d2e0093 100644 --- a/rttp_client/Cargo.toml +++ b/rttp_client/Cargo.toml @@ -40,14 +40,14 @@ async-native-tls = { optional = true, version = "0.5" } rustls = { optional = true, version = "0.23" } webpki-roots = { optional = true, version = "0.26" } -async-rustls = { optional = true, version = "0.4" } +futures-rustls = { optional = true, version = "0.26" } [features] default = [] async = [] tls-native = ["native-tls", "async-native-tls"] -tls-rustls = ["rustls", "webpki-roots", "async-rustls"] +tls-rustls = ["rustls", "webpki-roots", "futures-rustls"] ## # warning: /data/rttp/rttp_client/Cargo.toml: Found `feature = ...` in `target.'cfg(...)'.dependencies`. diff --git a/rttp_client/src/connection/async_connection.rs b/rttp_client/src/connection/async_connection.rs index 636a0a2..2b69bda 100644 --- a/rttp_client/src/connection/async_connection.rs +++ b/rttp_client/src/connection/async_connection.rs @@ -5,11 +5,17 @@ use socks::{Socks4Stream, Socks5Stream}; use std::io::{Read, Write}; use url::Url; +#[cfg(feature = "tls-rustls")] +use std::sync::Arc; + use crate::connection::connection::Connection; use crate::error; use crate::request::RawRequest; use crate::response::Response; -use crate::types::{Proxy, ProxyType, ToUrl}; +use crate::types::{Proxy, ProxyType}; + +const HEADER_END: &[u8] = b"\r\n\r\n"; +const CRLF: &[u8] = b"\r\n"; pub struct AsyncConnection<'a> { conn: Connection<'a>, @@ -68,12 +74,138 @@ impl<'a> AsyncConnection<'a> { where S: AsyncRead + Unpin, { - let mut buffer = Vec::new(); + let mut binary = async_read_response_header(stream).await?; + if is_chunked_encoded(&binary) { + binary.extend_from_slice(&async_read_chunked_body(stream).await?); + } else { + stream + .read_to_end(&mut binary) + .await + .map_err(error::request)?; + } + Ok(binary) + } +} + +async fn async_read_response_header(stream: &mut S) -> error::Result> +where + S: AsyncRead + Unpin, +{ + let mut header = Vec::new(); + let mut byte = [0u8; 1]; + + loop { + let read = stream.read(&mut byte).await.map_err(error::request)?; + if read == 0 { + if header.is_empty() { + return Ok(header); + } + return Err(error::bad_response("Incomplete http response headers")); + } + + header.push(byte[0]); + if header.ends_with(HEADER_END) { + return Ok(header); + } + } +} + +fn is_chunked_encoded(header: &[u8]) -> bool { + String::from_utf8_lossy(header).lines().any(|line| { + let Some((name, value)) = line.split_once(':') else { + return false; + }; + + name.eq_ignore_ascii_case("Transfer-Encoding") + && value + .split(',') + .any(|token| token.trim().eq_ignore_ascii_case("chunked")) + }) +} + +async fn async_read_chunked_body(stream: &mut S) -> error::Result> +where + S: AsyncRead + Unpin, +{ + let mut body = Vec::new(); + + loop { + let line = async_read_crlf_line(stream).await?; + let chunk_size = parse_chunk_size(&line)?; + + if chunk_size == 0 { + async_consume_trailers(stream).await?; + return Ok(body); + } + + let current_len = body.len(); + body.resize(current_len + chunk_size, 0); stream - .read_to_end(&mut buffer) + .read_exact(&mut body[current_len..]) .await .map_err(error::request)?; - Ok(buffer) + async_consume_crlf(stream).await?; + } +} + +async fn async_read_crlf_line(stream: &mut S) -> error::Result> +where + S: AsyncRead + Unpin, +{ + let mut line = Vec::new(); + let mut byte = [0u8; 1]; + + loop { + let read = stream.read(&mut byte).await.map_err(error::request)?; + if read == 0 { + return Err(error::bad_response("Unexpected end of chunked body")); + } + + line.push(byte[0]); + if line.ends_with(CRLF) { + return Ok(line); + } + } +} + +fn parse_chunk_size(line: &[u8]) -> error::Result { + let line = std::str::from_utf8(line).map_err(error::response)?; + let size = line + .trim_end_matches("\r\n") + .split(';') + .next() + .map(str::trim) + .filter(|value| !value.is_empty()) + .ok_or_else(|| error::bad_response("Chunk size line is empty"))?; + + usize::from_str_radix(size, 16).map_err(|_| error::bad_response("Invalid chunk size")) +} + +async fn async_consume_crlf(stream: &mut S) -> error::Result<()> +where + S: AsyncRead + Unpin, +{ + let mut suffix = [0u8; 2]; + stream + .read_exact(&mut suffix) + .await + .map_err(error::request)?; + if suffix == *CRLF { + Ok(()) + } else { + Err(error::bad_response("Invalid chunk terminator")) + } +} + +async fn async_consume_trailers(stream: &mut S) -> error::Result<()> +where + S: AsyncRead + Unpin, +{ + loop { + let line = async_read_crlf_line(stream).await?; + if line == CRLF { + return Ok(()); + } } } @@ -105,9 +237,15 @@ impl<'a> AsyncConnection<'a> { self.async_read_stream(url, stream).await } - async fn async_send_https(&self, url: &Url, mut stream: TcpStream) -> error::Result> { - #[cfg(any(feature = "tls-native", feature = "tls-rustls"))] + async fn async_send_https(&self, url: &Url, stream: TcpStream) -> error::Result> { + #[cfg(feature = "tls-rustls")] + { + return self.async_send_https_rustls(url, stream).await; + } + + #[cfg(all(feature = "tls-native", not(feature = "tls-rustls")))] { + let mut stream = stream; return self.conn.block_send_https(url, &mut stream); } @@ -120,6 +258,55 @@ impl<'a> AsyncConnection<'a> { )); } } + + #[cfg(feature = "tls-rustls")] + async fn async_send_https_rustls( + &self, + url: &Url, + stream: TcpStream, + ) -> error::Result> { + use futures_rustls::TlsConnector; + use rustls::pki_types::ServerName; + use rustls::{ClientConfig, RootCertStore}; + + use crate::connection::connection::NoCertificateVerification; + + let config = self.conn.config(); + let mut root_store = RootCertStore::empty(); + if config.verify_ssl_cert() { + root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); + } + + let builder = ClientConfig::builder(); + let rustls_config = if config.verify_ssl_cert() { + builder + .with_root_certificates(root_store) + .with_no_client_auth() + } else { + builder + .dangerous() + .with_custom_certificate_verifier(Arc::new(NoCertificateVerification)) + .with_no_client_auth() + }; + + let host = self.conn.host(url)?; + let server_name: ServerName<'static> = match host.parse::() { + Ok(ip) => ServerName::IpAddress(ip.into()), + Err(_) => ServerName::try_from(host.as_str()) + .map_err(|_| error::bad_ssl(format!("Invalid server name: {}", host)))? + .to_owned(), + }; + + let connector = TlsConnector::from(Arc::new(rustls_config)); + let async_tcp = AllowStdIo::new(stream); + let mut tls_stream = connector + .connect(server_name, async_tcp) + .await + .map_err(|e| error::bad_ssl(e.to_string()))?; + + self.async_write_stream(&mut tls_stream).await?; + self.async_read_stream(url, &mut tls_stream).await + } } // proxy connection diff --git a/rttp_client/src/connection/connection.rs b/rttp_client/src/connection/connection.rs index feee455..043191a 100644 --- a/rttp_client/src/connection/connection.rs +++ b/rttp_client/src/connection/connection.rs @@ -26,7 +26,7 @@ use rustls::{ #[cfg(feature = "tls-rustls")] #[derive(Debug)] -struct NoCertificateVerification; +pub(crate) struct NoCertificateVerification; #[cfg(feature = "tls-rustls")] impl ServerCertVerifier for NoCertificateVerification { diff --git a/rttp_client/src/connection/connection_reader.rs b/rttp_client/src/connection/connection_reader.rs index a0efa07..a853bfb 100644 --- a/rttp_client/src/connection/connection_reader.rs +++ b/rttp_client/src/connection/connection_reader.rs @@ -1,4 +1,5 @@ use std::io; +use std::io::Read; use url::Url; @@ -6,6 +7,9 @@ use crate::error; use crate::response::Response; use crate::types::RoUrl; +const HEADER_END: &[u8] = b"\r\n\r\n"; +const CRLF: &[u8] = b"\r\n"; + #[allow(dead_code)] pub struct ConnectionReader<'a> { url: &'a Url, @@ -18,12 +22,7 @@ impl<'a> ConnectionReader<'a> { } pub fn binary(&mut self) -> error::Result> { - let mut binary: Vec = Vec::new(); - let _ = self - .reader - .read_to_end(&mut binary) - .map_err(error::request)?; - Ok(binary) + read_response_binary(self.reader) } #[allow(dead_code)] @@ -33,3 +32,186 @@ impl<'a> ConnectionReader<'a> { // todo Connection reader will read more type from io::Reader, like Chunk data, and Stream data. } + +fn read_response_binary(reader: &mut R) -> error::Result> +where + R: Read + ?Sized, +{ + let mut binary = read_response_header(reader)?; + if is_chunked_encoded(&binary) { + binary.extend_from_slice(&read_chunked_body(reader)?); + } else { + reader.read_to_end(&mut binary).map_err(error::request)?; + } + Ok(binary) +} + +fn read_response_header(reader: &mut R) -> error::Result> +where + R: Read + ?Sized, +{ + let mut header = Vec::new(); + let mut byte = [0u8; 1]; + + loop { + let read = reader.read(&mut byte).map_err(error::request)?; + if read == 0 { + if header.is_empty() { + return Ok(header); + } + return Err(error::bad_response("Incomplete http response headers")); + } + + header.push(byte[0]); + if header.ends_with(HEADER_END) { + return Ok(header); + } + } +} + +fn is_chunked_encoded(header: &[u8]) -> bool { + String::from_utf8_lossy(header).lines().any(|line| { + let Some((name, value)) = line.split_once(':') else { + return false; + }; + + name.eq_ignore_ascii_case("Transfer-Encoding") + && value + .split(',') + .any(|token| token.trim().eq_ignore_ascii_case("chunked")) + }) +} + +fn read_chunked_body(reader: &mut R) -> error::Result> +where + R: Read + ?Sized, +{ + let mut body = Vec::new(); + + loop { + let line = read_crlf_line(reader)?; + let chunk_size = parse_chunk_size(&line)?; + + if chunk_size == 0 { + consume_trailers(reader)?; + return Ok(body); + } + + let current_len = body.len(); + body.resize(current_len + chunk_size, 0); + reader + .read_exact(&mut body[current_len..]) + .map_err(error::request)?; + consume_crlf(reader)?; + } +} + +fn read_crlf_line(reader: &mut R) -> error::Result> +where + R: Read + ?Sized, +{ + let mut line = Vec::new(); + let mut byte = [0u8; 1]; + + loop { + let read = reader.read(&mut byte).map_err(error::request)?; + if read == 0 { + return Err(error::bad_response("Unexpected end of chunked body")); + } + + line.push(byte[0]); + if line.ends_with(CRLF) { + return Ok(line); + } + } +} + +fn parse_chunk_size(line: &[u8]) -> error::Result { + let line = std::str::from_utf8(line).map_err(error::response)?; + let size = line + .trim_end_matches("\r\n") + .split(';') + .next() + .map(str::trim) + .filter(|value| !value.is_empty()) + .ok_or_else(|| error::bad_response("Chunk size line is empty"))?; + + usize::from_str_radix(size, 16).map_err(|_| error::bad_response("Invalid chunk size")) +} + +fn consume_crlf(reader: &mut R) -> error::Result<()> +where + R: Read + ?Sized, +{ + let mut suffix = [0u8; 2]; + reader.read_exact(&mut suffix).map_err(error::request)?; + if suffix == *CRLF { + Ok(()) + } else { + Err(error::bad_response("Invalid chunk terminator")) + } +} + +fn consume_trailers(reader: &mut R) -> error::Result<()> +where + R: Read + ?Sized, +{ + loop { + let line = read_crlf_line(reader)?; + if line == CRLF { + return Ok(()); + } + } +} + +#[cfg(test)] +mod tests { + use std::io::Cursor; + + use super::ConnectionReader; + + #[test] + fn test_chunked_binary_is_decoded() { + let raw = concat!( + "HTTP/1.1 200 OK\r\n", + "Transfer-Encoding: chunked\r\n", + "Connection: close\r\n", + "\r\n", + "4\r\nWiki\r\n", + "5\r\npedia\r\n", + "0\r\n\r\n" + ); + let url = url::Url::parse("http://localhost").unwrap(); + let mut cursor = Cursor::new(raw.as_bytes()); + let mut reader = ConnectionReader::new(&url, &mut cursor); + + let binary = reader.binary().unwrap(); + let text = String::from_utf8(binary).unwrap(); + + assert!(text.ends_with("\r\n\r\nWikipedia")); + } + + #[test] + fn test_chunked_extensions_and_trailers_are_ignored() { + let raw = concat!( + "HTTP/1.1 200 OK\r\n", + "Transfer-Encoding: gzip, chunked\r\n", + "\r\n", + "7;foo=bar\r\nchunked\r\n", + "6\r\n body!\r\n", + "0\r\n", + "X-Trace: abc\r\n", + "\r\n" + ); + let url = url::Url::parse("http://localhost").unwrap(); + let mut cursor = Cursor::new(raw.as_bytes()); + let mut reader = ConnectionReader::new(&url, &mut cursor); + let response = reader.response().unwrap(); + + assert_eq!("chunked body!", response.body().string().unwrap()); + assert_eq!( + Some(&"gzip, chunked".to_string()), + response.header_value("Transfer-Encoding") + ); + } +} diff --git a/rttp_client/tests/support/mod.rs b/rttp_client/tests/support/mod.rs index bfce4c7..81258b3 100644 --- a/rttp_client/tests/support/mod.rs +++ b/rttp_client/tests/support/mod.rs @@ -63,6 +63,29 @@ pub fn spawn_http_server_count(count: usize) -> (SocketAddr, JoinHandle<()>) { (addr, handle) } +pub fn spawn_chunked_server() -> (SocketAddr, JoinHandle<()>) { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind chunked server"); + let addr = listener.local_addr().expect("chunked addr"); + let handle = thread::spawn(move || { + if let Ok((mut stream, _)) = listener.accept() { + read_http_request(&mut stream); + let response = concat!( + "HTTP/1.1 200 OK\r\n", + "Transfer-Encoding: chunked\r\n", + "Connection: close\r\n", + "\r\n", + "7;foo=bar\r\nchunked\r\n", + "6\r\n body!\r\n", + "0\r\n", + "X-Trailer: ignored\r\n", + "\r\n" + ); + let _ = stream.write_all(response.as_bytes()); + } + }); + (addr, handle) +} + pub fn spawn_redirect_server() -> (SocketAddr, JoinHandle<()>) { let listener = TcpListener::bind("127.0.0.1:0").expect("bind redirect server"); let addr = listener.local_addr().expect("redirect addr"); diff --git a/rttp_client/tests/test_http_async.rs b/rttp_client/tests/test_http_async.rs index 996422f..079f0e2 100644 --- a/rttp_client/tests/test_http_async.rs +++ b/rttp_client/tests/test_http_async.rs @@ -26,6 +26,23 @@ fn test_async_http() { }); } +#[test] +#[cfg(feature = "async")] +fn test_async_chunked() { + let (addr, _handle) = support::spawn_chunked_server(); + block_on(async { + let response = client() + .get() + .url(format!("http://{}/chunked", addr)) + .rasync() + .await; + assert!(response.is_ok()); + + let response = response.unwrap(); + assert_eq!("chunked body!", response.body().string().unwrap()); + }); +} + #[test] #[cfg(all(feature = "async", feature = "tls-rustls"))] fn test_async_https() { diff --git a/rttp_client/tests/test_http_basic.rs b/rttp_client/tests/test_http_basic.rs index eb05ca6..1fae144 100644 --- a/rttp_client/tests/test_http_basic.rs +++ b/rttp_client/tests/test_http_basic.rs @@ -57,6 +57,23 @@ fn test_gzip() { assert!(response.is_ok()); } +#[test] +fn test_chunked() { + let (addr, _handle) = support::spawn_chunked_server(); + let response = client() + .get() + .url(format!("http://{}/chunked", addr)) + .emit(); + assert!(response.is_ok()); + + let response = response.unwrap(); + assert_eq!("chunked body!", response.body().string().unwrap()); + assert_eq!( + Some(&"chunked".to_string()), + response.header_value("Transfer-Encoding") + ); +} + #[test] fn test_upload() { let (addr, _handle) = support::spawn_http_server(); From 4ab3d987c1f827cf191f431fcf434646ce22a678 Mon Sep 17 00:00:00 2001 From: fewensa Date: Sat, 14 Mar 2026 02:58:34 +0000 Subject: [PATCH 2/2] feat: use socket2 directly in async_tcp_stream Replace delegation to block_tcp_stream with inline socket2-based connection setup in async_tcp_stream. This makes the async path own its socket2 usage directly rather than calling through the sync path, removing the fake-async delegation and keeping the implementation consistent with the sync block_tcp_stream. - Import socket2, ToSocketAddrs, io, time in async_connection.rs - async_tcp_stream now creates socket, sets timeouts, connects via socket2 - Ready for future async-runtime integration at this single call-site Co-Authored-By: Paperclip --- .../src/connection/async_connection.rs | 45 +++++++++++++++++-- 1 file changed, 41 insertions(+), 4 deletions(-) diff --git a/rttp_client/src/connection/async_connection.rs b/rttp_client/src/connection/async_connection.rs index 2b69bda..2ba8434 100644 --- a/rttp_client/src/connection/async_connection.rs +++ b/rttp_client/src/connection/async_connection.rs @@ -1,8 +1,10 @@ -use std::net::TcpStream; +use std::net::{TcpStream, ToSocketAddrs}; use futures::io::{AllowStdIo, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use socks::{Socks4Stream, Socks5Stream}; -use std::io::{Read, Write}; +use socket2::{Domain, Protocol, Socket, Type}; +use std::io::{self, Read, Write}; +use std::time; use url::Url; #[cfg(feature = "tls-rustls")] @@ -44,8 +46,43 @@ impl<'a> AsyncConnection<'a> { } impl<'a> AsyncConnection<'a> { - async fn async_tcp_stream(&self, addr: &String) -> error::Result { - self.conn.block_tcp_stream(addr) + async fn async_tcp_stream(&self, addr: &str) -> error::Result { + let config = self.conn.config(); + let timeout_read = time::Duration::from_millis(config.read_timeout()); + let timeout_write = time::Duration::from_millis(config.write_timeout()); + let mut last_err = None; + + let addrs = addr.to_socket_addrs().map_err(error::request)?; + for addr in addrs { + let domain = Domain::for_address(addr); + let socket = match Socket::new(domain, Type::STREAM, Some(Protocol::TCP)) { + Ok(s) => s, + Err(e) => { + last_err = Some(e); + continue; + } + }; + + if let Err(e) = socket.set_read_timeout(Some(timeout_read)) { + last_err = Some(e); + continue; + } + if let Err(e) = socket.set_write_timeout(Some(timeout_write)) { + last_err = Some(e); + continue; + } + + if let Err(e) = socket.connect(&addr.into()) { + last_err = Some(e); + continue; + } + + return Ok(TcpStream::from(socket)); + } + + Err(error::request( + last_err.unwrap_or_else(|| io::Error::other("failed to connect")), + )) } async fn async_write_stream(&self, stream: &mut S) -> error::Result<()>