Navigation Menu

Skip to content

Commit

Permalink
partially integrate streaming request bodies with http re-direct
Browse files Browse the repository at this point in the history
  • Loading branch information
gterzian committed Jun 4, 2020
1 parent ad4dea7 commit c1b7653
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 44 deletions.
3 changes: 1 addition & 2 deletions components/net/http_loader.rs
Expand Up @@ -973,8 +973,7 @@ fn http_network_or_cache_fetch(
let http_request = if request_has_no_window && request.redirect_mode == RedirectMode::Error {
request
} else {
// Step 5.2
// TODO Implement body source
// Step 5.2.1, .2.2 and .2.3 and 2.4
http_request = request.clone();
&mut http_request
};
Expand Down
6 changes: 1 addition & 5 deletions components/net/tests/http_loader.rs
Expand Up @@ -113,11 +113,7 @@ fn create_request_body_with_content(content: Vec<u8>) -> RequestBody {
}),
);

RequestBody {
stream: Some(chunk_request_sender),
source: BodySource::USVString,
total_bytes: Some(content_len),
}
RequestBody::new(chunk_request_sender, BodySource::Object, Some(content_len))
}

#[test]
Expand Down
51 changes: 39 additions & 12 deletions components/net_traits/request.rs
Expand Up @@ -8,7 +8,7 @@ use crate::ResourceTimingType;
use content_security_policy::{self as csp, CspList};
use http::HeaderMap;
use hyper::Method;
use ipc_channel::ipc::IpcSender;
use ipc_channel::ipc::{self, IpcReceiver, IpcSender};
use mime::Mime;
use msg::constellation_msg::PipelineId;
use servo_url::{ImmutableOrigin, ServoUrl};
Expand Down Expand Up @@ -120,18 +120,16 @@ pub enum ParserMetadata {
#[derive(Clone, Debug, Deserialize, MallocSizeOf, Serialize)]
pub enum BodySource {
Null,
Blob,
BufferSource,
FormData,
URLSearchParams,
USVString,
Object,
}

/// Messages used to implement <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize)]
pub enum BodyChunkRequest {
/// Connect a fetch in `net`, with a stream of bytes from `script`.
Connect(IpcSender<Vec<u8>>),
/// Re-extract a new stream from the source, following a redirect.
Extract(IpcReceiver<BodyChunkRequest>),
/// Ask for another chunk.
Chunk,
/// Signal the stream is done.
Expand All @@ -141,18 +139,47 @@ pub enum BodyChunkRequest {
/// The net component's view into <https://fetch.spec.whatwg.org/#bodies>
#[derive(Clone, Debug, Deserialize, MallocSizeOf, Serialize)]
pub struct RequestBody {
/// Net's view into a <https://fetch.spec.whatwg.org/#concept-body-stream>
/// Net's channel to communicate with script re this body.
#[ignore_malloc_size_of = "Channels are hard"]
pub stream: Option<IpcSender<BodyChunkRequest>>,
chan: IpcSender<BodyChunkRequest>,
/// Has the stream been read from already?
read_from: bool,
/// <https://fetch.spec.whatwg.org/#concept-body-source>
pub source: BodySource,
source: BodySource,
/// <https://fetch.spec.whatwg.org/#concept-body-total-bytes>
pub total_bytes: Option<usize>,
total_bytes: Option<usize>,
}

impl RequestBody {
pub fn new(
chan: IpcSender<BodyChunkRequest>,
source: BodySource,
total_bytes: Option<usize>,
) -> Self {
RequestBody {
chan,
source,
total_bytes,
read_from: false,
}
}

pub fn take_stream(&mut self) -> Option<IpcSender<BodyChunkRequest>> {
self.stream.take()
if self.read_from {
match self.source {
BodySource::Null => panic!(
"Null sources should never be read more than once(no re-direct allowed)."
),
BodySource::Object => {
let (chan, port) = ipc::channel().unwrap();
let _ = self.chan.send(BodyChunkRequest::Extract(port));
self.chan = chan.clone();
return Some(chan);
},
}
}
self.read_from = true;
Some(self.chan.clone())
}

pub fn source_is_null(&self) -> bool {
Expand Down
103 changes: 85 additions & 18 deletions components/script/body.rs
Expand Up @@ -27,7 +27,7 @@ use crate::task_source::networking::NetworkingTaskSource;
use crate::task_source::TaskSource;
use crate::task_source::TaskSourceName;
use encoding_rs::UTF_8;
use ipc_channel::ipc::{self, IpcSender};
use ipc_channel::ipc::{self, IpcReceiver, IpcSender};
use ipc_channel::router::ROUTER;
use js::jsapi::Heap;
use js::jsapi::JSObject;
Expand All @@ -40,25 +40,39 @@ use js::rust::wrappers::JS_ParseJSON;
use js::rust::HandleValue;
use js::typedarray::{ArrayBuffer, CreateWith};
use mime::{self, Mime};
use net_traits::request::{BodyChunkRequest, BodySource, RequestBody};
use net_traits::request::{BodyChunkRequest, BodySource as NetBodySource, RequestBody};
use script_traits::serializable::BlobImpl;
use std::ptr;
use std::rc::Rc;
use std::str;
use url::form_urlencoded;

/// The Dom object, or ReadableStream, that is the source of a body.
/// <https://fetch.spec.whatwg.org/#concept-body-source>
#[derive(Clone)]
pub enum BodySource {
/// A ReadableStream comes with a null-source.
Null,
/// Another Dom object as source,
/// TODO: store the actual object
/// and re-exctact a stream on re-direct.
Object,
}

/// The IPC route handler
/// for <https://fetch.spec.whatwg.org/#concept-request-transmit-body>.
/// This route runs in the script process,
/// and will queue tasks to perform operations
/// on the stream and transmit body chunks over IPC.
#[derive(Clone)]
struct TransmitBodyConnectHandler {
stream: Trusted<ReadableStream>,
task_source: NetworkingTaskSource,
canceller: TaskCanceller,
bytes_sender: Option<IpcSender<Vec<u8>>>,
control_sender: IpcSender<BodyChunkRequest>,
in_memory: Option<Vec<u8>>,
source: BodySource,
}

impl TransmitBodyConnectHandler {
Expand All @@ -68,6 +82,7 @@ impl TransmitBodyConnectHandler {
canceller: TaskCanceller,
control_sender: IpcSender<BodyChunkRequest>,
in_memory: Option<Vec<u8>>,
source: BodySource,
) -> TransmitBodyConnectHandler {
TransmitBodyConnectHandler {
stream: stream,
Expand All @@ -76,23 +91,70 @@ impl TransmitBodyConnectHandler {
bytes_sender: None,
control_sender,
in_memory,
source,
}
}

/// Re-extract the source to support streaming it again for a re-direct.
/// TODO: actually re-extract the source, instead of just cloning data, to support Blob.
fn re_extract(&self, chunk_request_receiver: IpcReceiver<BodyChunkRequest>) {
let mut body_handler = self.clone();

ROUTER.add_route(
chunk_request_receiver.to_opaque(),
Box::new(move |message| {
let request = message.to().unwrap();
match request {
BodyChunkRequest::Connect(sender) => {
body_handler.start_reading(sender);
},
BodyChunkRequest::Extract(receiver) => {
body_handler.re_extract(receiver);
},
BodyChunkRequest::Chunk => body_handler.transmit_source(),
// Note: this is actually sent from this process
// by the TransmitBodyPromiseHandler when reading stops.
BodyChunkRequest::Done => body_handler.stop_reading(),
}
}),
);
}

/// In case of re-direct, and of a source available in memory,
/// send it all in one chunk.
///
/// TODO: this method should be deprecated
/// in favor of making `re_extract` actually re-extract a stream from the source.
/// See #26686
fn transmit_source(&self) {
if let BodySource::Null = self.source {
panic!("ReadableStream(Null) sources should not re-direct.");
}
if let Some(bytes) = self.in_memory.clone() {
let _ = self
.bytes_sender
.as_ref()
.expect("No bytes sender to transmit source.")
.send(bytes.clone());
return;
}
warn!("Re-directs for file-based Blobs not supported yet.");
}

/// Take the IPC sender sent by `net`, so we can send body chunks with it.
pub fn start_reading(&mut self, sender: IpcSender<Vec<u8>>) {
fn start_reading(&mut self, sender: IpcSender<Vec<u8>>) {
self.bytes_sender = Some(sender);
}

/// Drop the IPC sender sent by `net`
pub fn stop_reading(&mut self) {
fn stop_reading(&mut self) {
// Note: this should close the corresponding receiver,
// and terminate the request stream in `net`.
self.bytes_sender = None;
}

/// The entry point to <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
pub fn transmit_body_chunk(&mut self) {
fn transmit_body_chunk(&mut self) {
let stream = self.stream.clone();
let control_sender = self.control_sender.clone();
let bytes_sender = self
Expand All @@ -101,7 +163,7 @@ impl TransmitBodyConnectHandler {
.expect("No bytes sender to transmit chunk.");

// In case of the data being in-memory, send everything in one chunk, by-passing SpiderMonkey.
if let Some(bytes) = self.in_memory.take() {
if let Some(bytes) = self.in_memory.clone() {
let _ = bytes_sender.send(bytes);
return;
}
Expand Down Expand Up @@ -259,12 +321,18 @@ impl ExtractedBody {
// In case of the data being in-memory, send everything in one chunk, by-passing SM.
let in_memory = stream.get_in_memory_bytes();

let net_source = match source {
BodySource::Null => NetBodySource::Null,
_ => NetBodySource::Object,
};

let mut body_handler = TransmitBodyConnectHandler::new(
trusted_stream,
task_source,
canceller,
chunk_request_sender.clone(),
in_memory,
source,
);

ROUTER.add_route(
Expand All @@ -275,6 +343,9 @@ impl ExtractedBody {
BodyChunkRequest::Connect(sender) => {
body_handler.start_reading(sender);
},
BodyChunkRequest::Extract(receiver) => {
body_handler.re_extract(receiver);
},
BodyChunkRequest::Chunk => body_handler.transmit_body_chunk(),
// Note: this is actually sent from this process
// by the TransmitBodyPromiseHandler when reading stops.
Expand All @@ -285,11 +356,7 @@ impl ExtractedBody {

// Return `components::net` view into this request body,
// which can be used by `net` to transmit it over the network.
let request_body = RequestBody {
stream: Some(chunk_request_sender),
source,
total_bytes,
};
let request_body = RequestBody::new(chunk_request_sender, net_source, total_bytes);

// Also return the stream for this body, which can be used by script to consume it.
(request_body, stream)
Expand Down Expand Up @@ -322,7 +389,7 @@ impl Extractable for BodyInit {
stream,
total_bytes: Some(total_bytes),
content_type: None,
source: BodySource::BufferSource,
source: BodySource::Object,
})
},
BodyInit::ArrayBufferView(ref typedarray) => {
Expand All @@ -333,7 +400,7 @@ impl Extractable for BodyInit {
stream,
total_bytes: Some(total_bytes),
content_type: None,
source: BodySource::BufferSource,
source: BodySource::Object,
})
},
BodyInit::ReadableStream(stream) => {
Expand Down Expand Up @@ -367,7 +434,7 @@ impl Extractable for Vec<u8> {
total_bytes: Some(total_bytes),
content_type: None,
// A vec is used only in `submit_entity_body`.
source: BodySource::FormData,
source: BodySource::Object,
})
}
}
Expand All @@ -385,7 +452,7 @@ impl Extractable for Blob {
stream: self.get_stream(),
total_bytes: Some(total_bytes),
content_type,
source: BodySource::Blob,
source: BodySource::Object,
})
}
}
Expand All @@ -400,7 +467,7 @@ impl Extractable for DOMString {
stream,
total_bytes: Some(total_bytes),
content_type,
source: BodySource::USVString,
source: BodySource::Object,
})
}
}
Expand All @@ -419,7 +486,7 @@ impl Extractable for FormData {
stream,
total_bytes: Some(total_bytes),
content_type,
source: BodySource::FormData,
source: BodySource::Object,
})
}
}
Expand All @@ -436,7 +503,7 @@ impl Extractable for URLSearchParams {
stream,
total_bytes: Some(total_bytes),
content_type,
source: BodySource::URLSearchParams,
source: BodySource::Object,
})
}
}
Expand Down

0 comments on commit c1b7653

Please sign in to comment.