Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Reqwest instead of custom HTTP handler #510

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 54 additions & 57 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion worker-kv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ serde-wasm-bindgen = "0.5.0"
[dev-dependencies]
fs_extra = "1.2.0"
psutil = { git = "https://github.com/mygnu/rust-psutil", branch = "update-dependencies" }
reqwest = { version = "0.11.8", features = ["json"] }
reqwest = { version = "0.12.0", features = ["json"] }
tokio = { version = "1.5.0", features = [
"rt",
"macros",
Expand Down
1 change: 1 addition & 0 deletions worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ http.workspace = true
http-body = "1"
matchit = "0.7"
pin-project = "1.1.0"
reqwest = { version = "0.12.0" }
serde = { version = "1.0.164", features = ["derive"] }
serde_json = "1.0.96"
tokio = { version = "1.28", default-features = false }
Expand Down
49 changes: 44 additions & 5 deletions worker/src/global.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,75 @@
use std::ops::Deref;
use std::time::Duration;

use reqwest::{Body, Client, Error, Response};
use tokio::time;
use wasm_bindgen::JsCast;
use wasm_bindgen_futures::JsFuture;

use crate::{
request::Request as WorkerRequest, response::Response as WorkerResponse, AbortSignal, Result,
AbortSignal, Result,
};

/// Construct a Fetch call from a URL string or a Request object. Call its `send` method to execute
/// the request.
pub enum Fetch {
Url(url::Url),
Request(WorkerRequest),
Request(Body),
}

impl Fetch {
/// Execute a Fetch call and receive a Response.
pub async fn send(&self) -> Result<WorkerResponse> {
pub async fn send(&self) -> Result<Response> {
match self {
Fetch::Url(url) => fetch_with_str(url.as_ref(), None).await,
Fetch::Request(req) => fetch_with_request(req, None).await,
}
}

/// Execute a Fetch call and receive a Response.
pub async fn send_with_signal(&self, signal: &AbortSignal) -> Result<WorkerResponse> {
pub async fn send_with_signal(&self, signal: &AbortSignal) -> Result<Response> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we want to convert reqwest::Response to http::Response<Body>. I think its not totally straightforward, but possible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be the purpose of that? Would it help with keeping the data format more flexible? I'm curious.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, right now if they wanted to call fetch and then return the value, it would be the wrong type unless it was converted to http::Response.

match self {
Fetch::Url(url) => fetch_with_str(url.as_ref(), Some(signal)).await,
Fetch::Request(req) => fetch_with_request(req, Some(signal)).await,
}
}
}

//#[cfg(feature = "http")]
async fn fetch_with_str(url: &str, signal: Option<&AbortSignal>) -> Result<Response> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@Kakapio Kakapio Mar 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be a bad question, but why does the reqwest repo contain .signal functionality and the Cargo package doesn't? I don't see it in the docs either when it's clearly in the repository.
https://docs.rs/reqwest/latest/reqwest/struct.Client.html

Is there a WIP branch or something of the sort to get at it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is because the Wasm support just hooks up to fetch in the same way we do so, its only possible on Wasm.

https://github.com/seanmonstar/reqwest/blob/master/src/wasm/client.rs#L185

But you see it isn't plumbed through from the client or request object.

Copy link
Contributor Author

@Kakapio Kakapio Mar 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be acceptable to just use Drop and provide a handler? I am not sure if we necessarily need direct access to the signal.

Something like:

// Asynchronous loop to continuously check for cancellation signal
    loop {
        tokio::select! {
            _ = sleep(Duration::from_secs(1)) => {
                println!("Async task running...");
            }
            _ = cancel_receiver.recv() => {
                println!("Cancellation signal received. Dropping task...");
                break; // Break out of the loop when cancellation signal is received
            }
        }
    }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what the tradeoffs are here. Using a JavaScript Abort Signal lets the runtime cancel the request rather which I think might be more efficient, but if there is no way to plumb this through it may be ok to go this route.

let client = Client::new();
let request_future = client.get(url).send();
let timeout_future = time::sleep(Duration::from_secs(10)); // TODO: Remove 10 seconds magic value.
tokio::select! {
result = request_future => {
// The request completed successfully
result.map_err(|err| err.into())
},
_ = timeout_future => {
// The timeout occurred before the request completed
Err(reqwest::Error::new(reqwest::StatusCode::REQUEST_TIMEOUT, "Request timed out"))
}
}
}

//#[cfg(feature = "http")]
async fn fetch_with_request(
request: &Response,
signal: Option<&AbortSignal>,
) -> Result<Response> {
let mut init = web_sys::RequestInit::new();
init.signal(signal.map(|x| x.deref()));

let worker: web_sys::WorkerGlobalScope = js_sys::global().unchecked_into();
let req = request.inner();
let promise = worker.fetch_with_request_and_init(req, &init);
let resp = JsFuture::from(promise).await?;
let edge_response: web_sys::Response = resp.dyn_into()?;
Ok(edge_response.into())
}

/*
#[cfg(not(feature = "http"))]
async fn fetch_with_str(url: &str, signal: Option<&AbortSignal>) -> Result<WorkerResponse> {
let mut init = web_sys::RequestInit::new();
init.signal(signal.map(|x| x.deref()));
Expand All @@ -43,6 +81,7 @@ async fn fetch_with_str(url: &str, signal: Option<&AbortSignal>) -> Result<Worke
Ok(resp.into())
}

#[cfg(not(feature = "http"))]
async fn fetch_with_request(
request: &WorkerRequest,
signal: Option<&AbortSignal>,
Expand All @@ -56,4 +95,4 @@ async fn fetch_with_request(
let resp = JsFuture::from(promise).await?;
let edge_response: web_sys::Response = resp.dyn_into()?;
Ok(edge_response.into())
}
}*/