Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions build/deps/gen/deps.MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,10 @@ bazel_dep(name = "tcmalloc", version = "0.0.0-20250927-12f2552")
# workerd-cxx
http.archive(
name = "workerd-cxx",
sha256 = "979f8ffc98f8a8577264167f4c7d07ec08ec790dd085110dadbebc476a1d1d7f",
strip_prefix = "cloudflare-workerd-cxx-5b4e067",
sha256 = "fbba1b102b2c4fe879b2f610d7e94ceda6beceac3d57a27196482ce3e9536b50",
strip_prefix = "cloudflare-workerd-cxx-c677ef5",
type = "tgz",
url = "https://github.com/cloudflare/workerd-cxx/tarball/5b4e067b5180b6a791443ef16b0068b57bbc88bd",
url = "https://github.com/cloudflare/workerd-cxx/tarball/c677ef53092a8425ce9f059074441fdb1b7c1ed3",
)
use_repo(http, "workerd-cxx")

Expand Down
1 change: 1 addition & 0 deletions compile_flags.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
-isystembazel-bin/src/rust/worker/_virtual_includes/error.rs@cxx
-isystembazel-bin/src/rust/worker/_virtual_includes/ffi.rs@cxx
-isystembazel-bin/src/rust/worker/_virtual_includes/kill_switch.rs@cxx
-isystembazel-bin/src/rust/worker/_virtual_includes/ok.rs@cxx
-D_FORTIFY_SOURCE=1
-D_LIBCPP_REMOVE_TRANSITIVE_INCLUDES
-D_LIBCPP_NO_ABI_TAG
Expand Down
3 changes: 3 additions & 0 deletions src/rust/kj/ffi.c++
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ static_assert(alignof(kj::rust::HttpConnectSettings) == alignof(uint64_t),
"HttpConnectSettings alignment mismatch");

namespace kj::rust {

// This stays out-of-line because HttpConnectSettings is defined in the generated cxx bridge
// header, and ffi.h cannot include that header without creating an include cycle.
kj::Promise<void> connect(HttpService& service,
::rust::Slice<const kj::byte> host,
const HttpHeaders& headers,
Expand Down
38 changes: 38 additions & 0 deletions src/rust/kj/ffi.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,27 @@ using AsyncInputStream = kj::AsyncInputStream;
using AsyncOutputStream = kj::AsyncOutputStream;
using AsyncIoStream = kj::AsyncIoStream;

inline kj::Promise<void> async_output_stream_write(
AsyncOutputStream& stream, ::rust::Slice<const kj::byte> buffer) {
return stream.write(kj::from<kj_rs::Rust>(buffer));
}

inline kj::Promise<void> async_output_stream_when_write_disconnected(AsyncOutputStream& stream) {
return stream.whenWriteDisconnected();
}

// --- kj::HttpHeaders ffi

using BuiltinIndicesEnum = kj::HttpHeaders::BuiltinIndicesEnum;
using HttpHeaderTable = kj::HttpHeaderTable;
using HttpHeaders = kj::HttpHeaders;
using HttpHeaderId = kj::HttpHeaderId;

inline kj::Own<kj::HttpHeaders> new_http_headers(const HttpHeaderTable& table) {
// There is no C++ stack frame to hold the new instance, so we heap allocate it for Rust.
return kj::heap<kj::HttpHeaders>(table);
}

inline kj::Own<kj::HttpHeaders> clone_shallow(const HttpHeaders& headers) {
// there is no c++ stack frame to hold the new instance,
// so sadly we have to heap allocate it.
Expand Down Expand Up @@ -96,6 +111,29 @@ using HttpService = kj::HttpService;
using HttpServiceResponse = kj::HttpService::Response;
using TlsStarterCallback = kj::TlsStarterCallback;

inline kj::Own<AsyncOutputStream> response_send(HttpServiceResponse& response,
uint32_t statusCode,
::rust::Str statusText,
const HttpHeaders& headers,
kj::Maybe<uint64_t> expectedBodySize) {
return response.send(statusCode, kj::str(statusText), headers, expectedBodySize);
}

inline void connect_response_accept(ConnectResponse& response,
uint32_t statusCode,
::rust::Str statusText,
const HttpHeaders& headers) {
response.accept(statusCode, kj::str(statusText), headers);
}

inline kj::Own<AsyncOutputStream> connect_response_reject(ConnectResponse& response,
uint32_t statusCode,
::rust::Str statusText,
const HttpHeaders& headers,
kj::Maybe<uint64_t> expectedBodySize) {
return response.reject(statusCode, kj::str(statusText), headers, expectedBodySize);
}

inline kj::Promise<void> request(HttpService& service,
HttpMethod method,
::rust::Slice<const kj::byte> url,
Expand Down
184 changes: 158 additions & 26 deletions src/rust/kj/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use kj_rs::KjOwn;
use static_assertions::assert_eq_align;
use static_assertions::assert_eq_size;

use crate::OwnOrRef;
use crate::OwnOrMut;
use crate::Result;
use crate::io::AsyncInputStream;
use crate::io::AsyncIoStream;
Expand Down Expand Up @@ -95,7 +95,9 @@ pub mod ffi {

unsafe extern "C++" {
type BuiltinIndicesEnum;
type HttpHeaderTable;
type HttpHeaders;
fn new_http_headers(table: &HttpHeaderTable) -> KjOwn<HttpHeaders>;
fn clone_shallow(this_: &HttpHeaders) -> KjOwn<HttpHeaders>;
fn set_header(this_: Pin<&mut HttpHeaders>, id: BuiltinIndicesEnum, value: &str);
unsafe fn get_header<'a>(
Expand All @@ -121,12 +123,36 @@ pub mod ffi {
}

unsafe extern "C++" {
type AsyncInputStream = crate::io::AsyncInputStream;
type AsyncIoStream = crate::io::AsyncIoStream;
type AsyncInputStream = crate::io::ffi::AsyncInputStream;
type AsyncIoStream = crate::io::ffi::AsyncIoStream;
type AsyncOutputStream = crate::io::ffi::AsyncOutputStream;
type ConnectResponse;
type HttpServiceResponse;
type HttpService;

fn response_send(
this_: Pin<&mut HttpServiceResponse>,
status_code: u32,
status_text: &str,
headers: &HttpHeaders,
expected_body_size: KjMaybe<u64>,
) -> Result<KjOwn<AsyncOutputStream>>;

fn connect_response_accept(
this_: Pin<&mut ConnectResponse>,
status_code: u32,
status_text: &str,
headers: &HttpHeaders,
) -> Result<()>;

fn connect_response_reject(
this_: Pin<&mut ConnectResponse>,
status_code: u32,
status_text: &str,
headers: &HttpHeaders,
expected_body_size: KjMaybe<u64>,
) -> Result<KjOwn<AsyncOutputStream>>;

/// Corresponds to `kj::HttpService::request`.
async fn request(
this_: Pin<&mut HttpService>,
Expand Down Expand Up @@ -178,6 +204,7 @@ assert_eq_size!(ffi::HttpConnectSettings, [u8; 16]);
assert_eq_align!(ffi::HttpConnectSettings, u64);

pub type HeaderId = ffi::BuiltinIndicesEnum;
pub type HttpHeaderTable = ffi::HttpHeaderTable;
pub type CustomHttpHeader = ffi::HttpHeaderId;
// TODO(tewaro) soon: replace by enum HeaderId

Expand Down Expand Up @@ -219,6 +246,7 @@ impl<'a> CustomHttpHeaderId<'a> {
}

/// Non-owning constant reference to `kj::HttpHeaders`
#[derive(Clone, Copy)]
pub struct HttpHeadersRef<'a>(&'a ffi::HttpHeaders);

impl HttpHeadersRef<'_> {
Expand Down Expand Up @@ -259,6 +287,14 @@ pub struct HttpHeaders<'a> {
}

impl<'a> HttpHeaders<'a> {
#[must_use]
pub fn new(table: &'a HttpHeaderTable) -> Self {
Self {
own: ffi::new_http_headers(table),
_marker: PhantomData,
}
}

pub fn set(&mut self, id: HeaderId, value: &str) {
ffi::set_header(self.own.as_mut(), id, value);
}
Expand All @@ -268,11 +304,96 @@ impl<'a> HttpHeaders<'a> {
}
}

impl<'a, 'b> From<&'b HttpHeaders<'a>> for HttpHeadersRef<'b> {
fn from(value: &'b HttpHeaders<'a>) -> Self {
value.as_ref()
}
}

pub type HttpMethod = ffi::HttpMethod;
pub type HttpServiceResponse = ffi::HttpServiceResponse;
pub type ConnectResponse = ffi::ConnectResponse;
pub type HttpConnectSettings<'a> = ffi::HttpConnectSettings<'a>;

/// Non-owning mutable reference to `kj::HttpService::Response`.
pub struct HttpServiceResponse<'a>(Pin<&'a mut ffi::HttpServiceResponse>);

impl<'a> HttpServiceResponse<'a> {
/// Send response metadata and obtain the writable response body stream.
pub fn send<'h>(
self,
status_code: u32,
status_text: &str,
headers: impl Into<HttpHeadersRef<'h>>,
expected_body_size: Option<u64>,
) -> Result<crate::io::AsyncOutputStream<'a>> {
Ok(ffi::response_send(
self.0,
status_code,
status_text,
headers.into().0,
expected_body_size.into(),
)?
.into())
}

pub(crate) fn into_ffi(self) -> Pin<&'a mut ffi::HttpServiceResponse> {
self.0
}
}

impl<'a> From<Pin<&'a mut ffi::HttpServiceResponse>> for HttpServiceResponse<'a> {
fn from(value: Pin<&'a mut ffi::HttpServiceResponse>) -> Self {
Self(value)
}
}

/// Non-owning mutable reference to `kj::HttpService::ConnectResponse`.
pub struct ConnectResponse<'a>(Pin<&'a mut ffi::ConnectResponse>);

impl<'a> ConnectResponse<'a> {
/// Accept the CONNECT request without a response body.
pub fn accept<'h>(
self,
status_code: u32,
status_text: &str,
headers: impl Into<HttpHeadersRef<'h>>,
) -> Result<()> {
Ok(ffi::connect_response_accept(
self.0,
status_code,
status_text,
headers.into().0,
)?)
}

/// Reject the CONNECT request and obtain the writable rejection body stream.
pub fn reject<'h>(
self,
status_code: u32,
status_text: &str,
headers: impl Into<HttpHeadersRef<'h>>,
expected_body_size: Option<u64>,
) -> Result<crate::io::AsyncOutputStream<'a>> {
Ok(ffi::connect_response_reject(
self.0,
status_code,
status_text,
headers.into().0,
expected_body_size.into(),
)?
.into())
}

pub(crate) fn into_ffi(self) -> Pin<&'a mut ffi::ConnectResponse> {
self.0
}
}

impl<'a> From<Pin<&'a mut ffi::ConnectResponse>> for ConnectResponse<'a> {
fn from(value: Pin<&'a mut ffi::ConnectResponse>) -> Self {
Self(value)
}
}

#[async_trait::async_trait(?Send)]
pub trait HttpService {
/// Make an HTTP request.
Expand All @@ -282,7 +403,7 @@ pub trait HttpService {
url: &'a [u8],
headers: HttpHeadersRef<'a>,
request_body: Pin<&'a mut AsyncInputStream>,
response: Pin<&'a mut HttpServiceResponse>,
response: HttpServiceResponse<'a>,
) -> Result<()>;

/// Make a CONNECT request
Expand All @@ -297,7 +418,7 @@ pub trait HttpService {
host: &'a [u8],
headers: HttpHeadersRef<'a>,
connection: Pin<&'a mut AsyncIoStream>,
response: Pin<&'a mut ConnectResponse>,
response: ConnectResponse<'a>,
settings: HttpConnectSettings<'a>,
) -> Result<()>;

Expand All @@ -311,7 +432,7 @@ pub trait HttpService {
}
}

pub struct CxxHttpService<'a>(OwnOrRef<'a, ffi::HttpService>);
pub struct CxxHttpService<'a>(OwnOrMut<'a, ffi::HttpService>);

#[async_trait::async_trait(?Send)]
impl HttpService for CxxHttpService<'_> {
Expand All @@ -321,11 +442,18 @@ impl HttpService for CxxHttpService<'_> {
url: &'a [u8],
headers: HttpHeadersRef<'a>,
request_body: Pin<&'a mut AsyncInputStream>,
response: Pin<&'a mut HttpServiceResponse>,
response: HttpServiceResponse<'a>,
) -> Result<()> {
// SAFETY: self.0 is a valid owned-or-borrowed HttpService.
let service = unsafe { self.0.as_mut() };
ffi::request(service, method, url, headers.0, request_body, response).await?;
let service = self.0.as_mut();
ffi::request(
service,
method,
url,
headers.0,
request_body,
response.into_ffi(),
)
.await?;
Ok(())
}

Expand All @@ -334,18 +462,24 @@ impl HttpService for CxxHttpService<'_> {
host: &'a [u8],
headers: HttpHeadersRef<'a>,
connection: Pin<&'a mut AsyncIoStream>,
response: Pin<&'a mut ConnectResponse>,
response: ConnectResponse<'a>,
settings: HttpConnectSettings<'a>,
) -> ::core::pin::Pin<Box<dyn ::core::future::Future<Output = Result<()>> + 'b>>
where
'a: 'b,
Self: 'b,
{
// SAFETY: self.0 is a valid owned-or-borrowed HttpService.
let service = unsafe { self.0.as_mut() };
let service = self.0.as_mut();
Box::pin(
ffi::connect(service, host, headers.0, connection, response, settings)
.map_err(Into::into),
ffi::connect(
service,
host,
headers.0,
connection,
response.into_ffi(),
settings,
)
.map_err(Into::into),
)
}
}
Expand All @@ -365,8 +499,9 @@ impl DynHttpService {
url: &'a [u8],
headers: &'a ffi::HttpHeaders,
request_body: Pin<&'a mut AsyncInputStream>,
response: Pin<&'a mut HttpServiceResponse>,
response: Pin<&'a mut ffi::HttpServiceResponse>,
) -> Result<()> {
let response = HttpServiceResponse::from(response);
self.0
.request(method, url, HttpHeadersRef(headers), request_body, response)
.await?;
Expand All @@ -378,15 +513,12 @@ impl DynHttpService {
host: &'a [u8],
headers: &'a ffi::HttpHeaders,
connection: Pin<&'a mut AsyncIoStream>,
response: Pin<&'a mut ConnectResponse>,
response: Pin<&'a mut ffi::ConnectResponse>,
settings: HttpConnectSettings<'a>,
) -> impl Future<Output = Result<()>> {
self.0.connect(
host,
HttpHeadersRef(headers),
connection,
response,
settings,
)
let headers = HttpHeadersRef(headers);
let response = ConnectResponse::from(response);
self.0
.connect(host, headers, connection, response, settings)
}
}
Loading
Loading