Skip to content

Commit

Permalink
integrate readablestream with fetch and blob
Browse files Browse the repository at this point in the history
  • Loading branch information
gterzian committed Jun 4, 2020
1 parent 0281ace commit bd5796c
Show file tree
Hide file tree
Showing 74 changed files with 2,219 additions and 899 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

97 changes: 85 additions & 12 deletions components/net/http_loader.rs
Expand Up @@ -32,13 +32,17 @@ use http::header::{
use http::{HeaderMap, Request as HyperRequest};
use hyper::{Body, Client, Method, Response as HyperResponse, StatusCode};
use hyper_serde::Serde;
use ipc_channel::ipc::{self, IpcSender};
use ipc_channel::router::ROUTER;
use msg::constellation_msg::{HistoryStateId, PipelineId};
use net_traits::pub_domains::reg_suffix;
use net_traits::quality::{quality_to_value, Quality, QualityItem};
use net_traits::request::Origin::Origin as SpecificOrigin;
use net_traits::request::{is_cors_safelisted_method, is_cors_safelisted_request_header};
use net_traits::request::{
BodyChunkRequest, RedirectMode, Referrer, Request, RequestBuilder, RequestMode,
};
use net_traits::request::{CacheMode, CredentialsMode, Destination, Origin};
use net_traits::request::{RedirectMode, Referrer, Request, RequestBuilder, RequestMode};
use net_traits::request::{ResponseTainting, ServiceWorkersMode};
use net_traits::response::{HttpsState, Response, ResponseBody, ResponseType};
use net_traits::{CookieSource, FetchMetadata, NetworkError, ReferrerPolicy};
Expand All @@ -52,11 +56,12 @@ use std::iter::FromIterator;
use std::mem;
use std::ops::Deref;
use std::str::FromStr;
use std::sync::{Condvar, Mutex, RwLock};
use std::sync::{Arc as StdArc, Condvar, Mutex, RwLock};
use std::time::{Duration, SystemTime};
use time::{self, Tm};
use tokio::prelude::{future, Future, Stream};
use tokio::prelude::{future, Future, Sink, Stream};
use tokio::runtime::Runtime;
use tokio::sync::mpsc::channel;

lazy_static! {
pub static ref HANDLE: Mutex<Option<Runtime>> = Mutex::new(Some(Runtime::new().unwrap()));
Expand Down Expand Up @@ -400,8 +405,10 @@ fn obtain_response(
client: &Client<Connector, Body>,
url: &ServoUrl,
method: &Method,
headers: &HeaderMap,
data: &Option<Vec<u8>>,
request_headers: &HeaderMap,
body: Option<IpcSender<BodyChunkRequest>>,
request_len: Option<usize>,
load_data_method: &Method,
pipeline_id: &Option<PipelineId>,
request_id: Option<&str>,
is_xhr: bool,
Expand All @@ -412,7 +419,71 @@ fn obtain_response(
Error = NetworkError,
>,
> {
let request_body = data.as_ref().cloned().unwrap_or(vec![]);
let mut headers = request_headers.clone();

let devtools_bytes = StdArc::new(Mutex::new(vec![]));

let request_body = match body {
Some(chunk_requester) => {
// TODO: If body is a stream, append `Transfer-Encoding`/`chunked`,
// see step 4.2 of https://fetch.spec.whatwg.org/#concept-http-network-fetch

// Step 5.6 of https://fetch.spec.whatwg.org/#concept-http-network-or-cache-fetch
// If source is non-null,
// set contentLengthValue to httpRequest’s body’s total bytes
if let Some(request_len) = request_len {
headers.typed_insert(ContentLength(request_len as u64));
}
let (body_chan, body_port) = ipc::channel().unwrap();

let (sender, receiver) = channel(1);

let _ = chunk_requester.send(BodyChunkRequest::Connect(body_chan));

// https://fetch.spec.whatwg.org/#concept-request-transmit-body
// Request the first chunk, corresponding to Step 3 and 4.
let _ = chunk_requester.send(BodyChunkRequest::Chunk);

let devtools_bytes = devtools_bytes.clone();

ROUTER.add_route(
body_port.to_opaque(),
Box::new(move |message| {
let bytes: Vec<u8> = message.to().unwrap();
let chunk_requester = chunk_requester.clone();
let sender = sender.clone();

devtools_bytes.lock().unwrap().append(&mut bytes.clone());

HANDLE.lock().unwrap().as_mut().unwrap().spawn(
// Step 5.1.2.2
// Transmit a chunk over the network(and blocking until this is done).
sender
.send(bytes)
.map(move |_| {
// Step 5.1.2.3
// Request the next chunk.
let _ = chunk_requester.send(BodyChunkRequest::Chunk);
()
})
.map_err(|_| ()),
);
}),
);

receiver
},
_ => {
if *load_data_method != Method::GET && *load_data_method != Method::HEAD {
headers.typed_insert(ContentLength(0))
}
let (_sender, mut receiver) = channel(1);

receiver.close();

receiver
},
};

context
.timing
Expand Down Expand Up @@ -440,7 +511,7 @@ fn obtain_response(
.replace("{", "%7B")
.replace("}", "%7D"),
)
.body(request_body.clone().into());
.body(Body::wrap_stream(request_body));

// TODO: We currently don't know when the handhhake before the connection is done
// so our best bet would be to set `secure_connection_start` here when we are currently
Expand Down Expand Up @@ -488,7 +559,7 @@ fn obtain_response(
closure_url,
method.clone(),
headers,
Some(request_body.clone()),
Some(devtools_bytes.lock().unwrap().clone()),
pipeline_id,
time::now(),
connect_end - connect_start,
Expand Down Expand Up @@ -804,7 +875,7 @@ pub fn http_redirect_fetch(
.status
.as_ref()
.map_or(true, |s| s.0 != StatusCode::SEE_OTHER) &&
request.body.as_ref().map_or(false, |b| b.is_empty())
request.body.as_ref().map_or(false, |b| b.source_is_null())
{
return Response::network_error(NetworkError::Internal("Request body is not done".into()));
}
Expand Down Expand Up @@ -943,7 +1014,7 @@ fn http_network_or_cache_fetch(
_ => None,
},
// Step 5.6
Some(ref http_request_body) => Some(http_request_body.len() as u64),
Some(ref http_request_body) => http_request_body.len().map(|size| size as u64),
};

// Step 5.7
Expand Down Expand Up @@ -1460,7 +1531,7 @@ impl Drop for ResponseEndTimer {

/// [HTTP network fetch](https://fetch.spec.whatwg.org/#http-network-fetch)
fn http_network_fetch(
request: &Request,
request: &mut Request,
credentials_flag: bool,
done_chan: &mut DoneChannel,
context: &FetchContext,
Expand Down Expand Up @@ -1503,7 +1574,9 @@ fn http_network_fetch(
&url,
&request.method,
&request.headers,
&request.body,
request.body.as_mut().and_then(|body| body.take_stream()),
request.body.as_ref().and_then(|body| body.len()),
&request.method,
&request.pipeline_id,
request_id.as_ref().map(Deref::deref),
is_xhr,
Expand Down
39 changes: 35 additions & 4 deletions components/net/tests/http_loader.rs
Expand Up @@ -24,13 +24,18 @@ use http::uri::Authority;
use http::{Method, StatusCode};
use hyper::body::Body;
use hyper::{Request as HyperRequest, Response as HyperResponse};
use ipc_channel::ipc;
use ipc_channel::router::ROUTER;
use msg::constellation_msg::TEST_PIPELINE_ID;
use net::cookie::Cookie;
use net::cookie_storage::CookieStorage;
use net::http_loader::determine_request_referrer;
use net::resource_thread::AuthCacheEntry;
use net::test::replace_host_table;
use net_traits::request::{CredentialsMode, Destination, RequestBuilder, RequestMode};
use net_traits::request::{
BodyChunkRequest, BodySource, CredentialsMode, Destination, RequestBody, RequestBuilder,
RequestMode,
};
use net_traits::response::{HttpsState, ResponseBody};
use net_traits::{CookieSource, NetworkError, ReferrerPolicy};
use servo_url::{ImmutableOrigin, ServoUrl};
Expand Down Expand Up @@ -94,6 +99,27 @@ pub fn expect_devtools_http_response(
}
}

fn create_request_body_with_content(content: Vec<u8>) -> RequestBody {
let content_len = content.len();

let (chunk_request_sender, chunk_request_receiver) = ipc::channel().unwrap();
ROUTER.add_route(
chunk_request_receiver.to_opaque(),
Box::new(move |message| {
let request = message.to().unwrap();
if let BodyChunkRequest::Connect(sender) = request {
let _ = sender.send(content.clone());
}
}),
);

RequestBody {
stream: Some(chunk_request_sender),
source: BodySource::USVString,
total_bytes: Some(content_len),
}
}

#[test]
fn test_check_default_headers_loaded_in_every_request() {
let expected_headers = Arc::new(Mutex::new(None));
Expand Down Expand Up @@ -276,7 +302,7 @@ fn test_request_and_response_data_with_network_messages() {
url: url,
method: Method::GET,
headers: headers,
body: Some(b"".to_vec()),
body: Some(vec![]),
pipeline_id: TEST_PIPELINE_ID,
startedDateTime: devhttprequest.startedDateTime,
timeStamp: devhttprequest.timeStamp,
Expand Down Expand Up @@ -526,8 +552,11 @@ fn test_load_doesnt_send_request_body_on_any_redirect() {
};
let (pre_server, pre_url) = make_server(pre_handler);

let content = b"Body on POST!";
let request_body = create_request_body_with_content(content.to_vec());

let mut request = RequestBuilder::new(pre_url.clone())
.body(Some(b"Body on POST!".to_vec()))
.body(Some(request_body))
.method(Method::POST)
.destination(Destination::Document)
.origin(mock_origin())
Expand Down Expand Up @@ -819,9 +848,11 @@ fn test_load_sets_content_length_to_length_of_request_body() {
};
let (server, url) = make_server(handler);

let request_body = create_request_body_with_content(content.to_vec());

let mut request = RequestBuilder::new(url.clone())
.method(Method::POST)
.body(Some(content.to_vec()))
.body(Some(request_body))
.destination(Destination::Document)
.origin(mock_origin())
.pipeline_id(Some(TEST_PIPELINE_ID))
Expand Down
58 changes: 55 additions & 3 deletions components/net_traits/request.rs
Expand Up @@ -8,6 +8,7 @@ use crate::ResourceTimingType;
use content_security_policy::{self as csp, CspList};
use http::HeaderMap;
use hyper::Method;
use ipc_channel::ipc::IpcSender;
use mime::Mime;
use msg::constellation_msg::PipelineId;
use servo_url::{ImmutableOrigin, ServoUrl};
Expand Down Expand Up @@ -115,6 +116,57 @@ pub enum ParserMetadata {
NotParserInserted,
}

/// <https://fetch.spec.whatwg.org/#concept-body-source>
#[derive(Clone, Debug, Deserialize, MallocSizeOf, Serialize)]
pub enum BodySource {
Null,
Blob,
BufferSource,
FormData,
URLSearchParams,
USVString,
}

/// Messages used to implement <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum BodyChunkRequest {
/// Connect a fetch in `net`, with a stream of bytes from `script`.
Connect(IpcSender<Vec<u8>>),
/// Ask for another chunk.
Chunk,
/// Signal the stream is done.
Done,
}

/// The net component's view into <https://fetch.spec.whatwg.org/#bodies>
#[derive(Clone, Debug, Deserialize, MallocSizeOf, Serialize)]
pub struct RequestBody {
/// Net's view into a <https://fetch.spec.whatwg.org/#concept-body-stream>
#[ignore_malloc_size_of = "Channels are hard"]
pub stream: Option<IpcSender<BodyChunkRequest>>,
/// <https://fetch.spec.whatwg.org/#concept-body-source>
pub source: BodySource,
/// <https://fetch.spec.whatwg.org/#concept-body-total-bytes>
pub total_bytes: Option<usize>,
}

impl RequestBody {
pub fn take_stream(&mut self) -> Option<IpcSender<BodyChunkRequest>> {
self.stream.take()
}

pub fn source_is_null(&self) -> bool {
if let BodySource::Null = self.source {
return true;
}
false
}

pub fn len(&self) -> Option<usize> {
self.total_bytes.clone()
}
}

#[derive(Clone, Debug, Deserialize, MallocSizeOf, Serialize)]
pub struct RequestBuilder {
#[serde(
Expand All @@ -131,7 +183,7 @@ pub struct RequestBuilder {
#[ignore_malloc_size_of = "Defined in hyper"]
pub headers: HeaderMap,
pub unsafe_request: bool,
pub body: Option<Vec<u8>>,
pub body: Option<RequestBody>,
pub service_workers_mode: ServiceWorkersMode,
// TODO: client object
pub destination: Destination,
Expand Down Expand Up @@ -210,7 +262,7 @@ impl RequestBuilder {
self
}

pub fn body(mut self, body: Option<Vec<u8>>) -> RequestBuilder {
pub fn body(mut self, body: Option<RequestBody>) -> RequestBuilder {
self.body = body;
self
}
Expand Down Expand Up @@ -338,7 +390,7 @@ pub struct Request {
/// <https://fetch.spec.whatwg.org/#unsafe-request-flag>
pub unsafe_request: bool,
/// <https://fetch.spec.whatwg.org/#concept-request-body>
pub body: Option<Vec<u8>>,
pub body: Option<RequestBody>,
// TODO: client object
pub window: Window,
// TODO: target browsing context
Expand Down

0 comments on commit bd5796c

Please sign in to comment.