Skip to content

Commit

Permalink
fix(workers): importScripts concurrently and use a new `reqwest::Cl…
Browse files Browse the repository at this point in the history
…ient` per importScripts (#23699)

1. We were polling each future in sequence, so this meant it was
fetching scripts in sequence.
2. It's not safe to share `reqwest::Client` across tokio runtimes
(seanmonstar/reqwest#1148 (comment))
  • Loading branch information
dsherret committed May 5, 2024
1 parent 4b0f22e commit d527b63
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 107 deletions.
38 changes: 22 additions & 16 deletions ext/fetch/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,27 +187,33 @@ pub fn get_or_create_client_from_state(
Ok(client.clone())
} else {
let options = state.borrow::<Options>();
let client = create_http_client(
&options.user_agent,
CreateHttpClientOptions {
root_cert_store: options.root_cert_store()?,
ca_certs: vec![],
proxy: options.proxy.clone(),
unsafely_ignore_certificate_errors: options
.unsafely_ignore_certificate_errors
.clone(),
client_cert_chain_and_key: options.client_cert_chain_and_key.clone(),
pool_max_idle_per_host: None,
pool_idle_timeout: None,
http1: true,
http2: true,
},
)?;
let client = create_client_from_options(options)?;
state.put::<reqwest::Client>(client.clone());
Ok(client)
}
}

pub fn create_client_from_options(
options: &Options,
) -> Result<reqwest::Client, AnyError> {
create_http_client(
&options.user_agent,
CreateHttpClientOptions {
root_cert_store: options.root_cert_store()?,
ca_certs: vec![],
proxy: options.proxy.clone(),
unsafely_ignore_certificate_errors: options
.unsafely_ignore_certificate_errors
.clone(),
client_cert_chain_and_key: options.client_cert_chain_and_key.clone(),
pool_max_idle_per_host: None,
pool_idle_timeout: None,
http1: true,
http2: true,
},
)
}

#[allow(clippy::type_complexity)]
pub struct ResourceToBodyAdapter(
Rc<dyn Resource>,
Expand Down
187 changes: 96 additions & 91 deletions runtime/ops/web_worker/sync_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::web_worker::WebWorkerInternalHandle;
use crate::web_worker::WebWorkerType;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures::StreamExt;
use deno_core::op2;
use deno_core::url::Url;
use deno_core::OpState;
Expand All @@ -15,7 +16,6 @@ use deno_websocket::DomExceptionNetworkError;
use hyper::body::Bytes;
use serde::Deserialize;
use serde::Serialize;
use tokio::task::JoinHandle;

// TODO(andreubotella) Properly parse the MIME type
fn mime_type_essence(mime_type: &str) -> String {
Expand All @@ -38,12 +38,15 @@ pub struct SyncFetchScript {
pub fn op_worker_sync_fetch(
state: &mut OpState,
#[serde] scripts: Vec<String>,
mut loose_mime_checks: bool,
loose_mime_checks: bool,
) -> Result<Vec<SyncFetchScript>, AnyError> {
let handle = state.borrow::<WebWorkerInternalHandle>().clone();
assert_eq!(handle.worker_type, WebWorkerType::Classic);

let client = deno_fetch::get_or_create_client_from_state(state)?;
// it's not safe to share a client across tokio runtimes, so create a fresh one
// https://github.com/seanmonstar/reqwest/issues/1148#issuecomment-910868788
let options = state.borrow::<deno_fetch::Options>().clone();
let client = deno_fetch::create_client_from_options(&options)?;

// TODO(andreubotella) It's not good to throw an exception related to blob
// URLs when none of the script URLs use the blob scheme.
Expand All @@ -62,107 +65,109 @@ pub fn op_worker_sync_fetch(
.enable_time()
.build()?;

let handles: Vec<_> = scripts
.into_iter()
.map(|script| -> JoinHandle<Result<SyncFetchScript, AnyError>> {
let client = client.clone();
let blob_store = blob_store.clone();
runtime.spawn(async move {
let script_url = Url::parse(&script)
.map_err(|_| type_error("Invalid script URL"))?;

let (body, mime_type, res_url) = match script_url.scheme() {
"http" | "https" => {
let resp =
client.get(script_url).send().await?.error_for_status()?;

let res_url = resp.url().to_string();

// TODO(andreubotella) Properly run fetch's "extract a MIME type".
let mime_type = resp
.headers()
.get("Content-Type")
.and_then(|v| v.to_str().ok())
.map(mime_type_essence);

// Always check the MIME type with HTTP(S).
loose_mime_checks = false;

let body = resp.bytes().await?;

(body, mime_type, res_url)
}
"data" => {
let data_url = DataUrl::process(&script)
.map_err(|e| type_error(format!("{e:?}")))?;
runtime.block_on(async move {
let mut futures = scripts
.into_iter()
.map(|script| {
let client = client.clone();
let blob_store = blob_store.clone();
deno_core::unsync::spawn(async move {
let script_url = Url::parse(&script)
.map_err(|_| type_error("Invalid script URL"))?;
let mut loose_mime_checks = loose_mime_checks;

let (body, mime_type, res_url) = match script_url.scheme() {
"http" | "https" => {
let resp =
client.get(script_url).send().await?.error_for_status()?;

let res_url = resp.url().to_string();

// TODO(andreubotella) Properly run fetch's "extract a MIME type".
let mime_type = resp
.headers()
.get("Content-Type")
.and_then(|v| v.to_str().ok())
.map(mime_type_essence);

// Always check the MIME type with HTTP(S).
loose_mime_checks = false;

let body = resp.bytes().await?;

(body, mime_type, res_url)
}
"data" => {
let data_url = DataUrl::process(&script)
.map_err(|e| type_error(format!("{e:?}")))?;

let mime_type = {
let mime = data_url.mime_type();
format!("{}/{}", mime.type_, mime.subtype)
};
let mime_type = {
let mime = data_url.mime_type();
format!("{}/{}", mime.type_, mime.subtype)
};

let (body, _) = data_url
.decode_to_vec()
.map_err(|e| type_error(format!("{e:?}")))?;
let (body, _) = data_url
.decode_to_vec()
.map_err(|e| type_error(format!("{e:?}")))?;

(Bytes::from(body), Some(mime_type), script)
}
"blob" => {
let blob =
blob_store.get_object_url(script_url).ok_or_else(|| {
type_error("Blob for the given URL not found.")
})?;
(Bytes::from(body), Some(mime_type), script)
}
"blob" => {
let blob =
blob_store.get_object_url(script_url).ok_or_else(|| {
type_error("Blob for the given URL not found.")
})?;

let mime_type = mime_type_essence(&blob.media_type);
let mime_type = mime_type_essence(&blob.media_type);

let body = blob.read_all().await?;
let body = blob.read_all().await?;

(Bytes::from(body), Some(mime_type), script)
}
_ => {
return Err(type_error(format!(
"Classic scripts with scheme {}: are not supported in workers.",
script_url.scheme()
)))
}
};

if !loose_mime_checks {
// TODO(andreubotella) Check properly for a Javascript MIME type.
match mime_type.as_deref() {
Some("application/javascript" | "text/javascript") => {}
Some(mime_type) => {
return Err(
DomExceptionNetworkError {
msg: format!("Invalid MIME type {mime_type:?}."),
}
.into(),
)
(Bytes::from(body), Some(mime_type), script)
}
_ => {
return Err(type_error(format!(
"Classic scripts with scheme {}: are not supported in workers.",
script_url.scheme()
)))
}
None => {
return Err(
DomExceptionNetworkError::new("Missing MIME type.").into(),
)
};

if !loose_mime_checks {
// TODO(andreubotella) Check properly for a Javascript MIME type.
match mime_type.as_deref() {
Some("application/javascript" | "text/javascript") => {}
Some(mime_type) => {
return Err(
DomExceptionNetworkError {
msg: format!("Invalid MIME type {mime_type:?}."),
}
.into(),
)
}
None => {
return Err(
DomExceptionNetworkError::new("Missing MIME type.").into(),
)
}
}
}
}

let (text, _) = encoding_rs::UTF_8.decode_with_bom_removal(&body);
let (text, _) = encoding_rs::UTF_8.decode_with_bom_removal(&body);

Ok(SyncFetchScript {
url: res_url,
script: text.into_owned(),
Ok(SyncFetchScript {
url: res_url,
script: text.into_owned(),
})
})
})
})
.collect();

let mut ret = Vec::with_capacity(handles.len());
for handle in handles {
let script = runtime.block_on(handle)??;
ret.push(script);
}
Ok(ret)
.collect::<deno_core::futures::stream::FuturesUnordered<_>>();
let mut ret = Vec::with_capacity(futures.len());
while let Some(result) = futures.next().await {
let script = result??;
ret.push(script);
}
Ok(ret)
})
});

thread.join().unwrap()
Expand Down

0 comments on commit d527b63

Please sign in to comment.