Skip to content

Commit

Permalink
Split media fetch context and fetch listener to prevent deadlocks
Browse files Browse the repository at this point in the history
  • Loading branch information
ferjm committed Jan 11, 2019
1 parent da32c84 commit 9a18074
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 99 deletions.
2 changes: 1 addition & 1 deletion components/script/dom/bindings/trace.rs
Expand Up @@ -488,7 +488,7 @@ unsafe_no_jsmanaged_fields!(Mutex<MediaFrameRenderer>);
unsafe_no_jsmanaged_fields!(RenderApiSender);
unsafe_no_jsmanaged_fields!(ResourceFetchTiming);
unsafe_no_jsmanaged_fields!(Timespec);
unsafe_no_jsmanaged_fields!(Mutex<HTMLMediaElementFetchContext>);
unsafe_no_jsmanaged_fields!(HTMLMediaElementFetchContext);

unsafe impl<'a> JSTraceable for &'a str {
#[inline]
Expand Down
214 changes: 116 additions & 98 deletions components/script/dom/htmlmediaelement.rs
Expand Up @@ -204,8 +204,7 @@ pub struct HTMLMediaElement {
#[ignore_malloc_size_of = "Defined in time"]
next_timeupdate_event: Cell<Timespec>,
/// Latest fetch request context.
#[ignore_malloc_size_of = "Arc"]
current_fetch_request: DomRefCell<Option<Arc<Mutex<HTMLMediaElementFetchContext>>>>,
current_fetch_context: DomRefCell<Option<HTMLMediaElementFetchContext>>,
}

/// <https://html.spec.whatwg.org/multipage/#dom-media-networkstate>
Expand Down Expand Up @@ -262,7 +261,7 @@ impl HTMLMediaElement {
played: Rc::new(DomRefCell::new(TimeRangesContainer::new())),
text_tracks_list: Default::default(),
next_timeupdate_event: Cell::new(time::get_time() + Duration::milliseconds(250)),
current_fetch_request: DomRefCell::new(None),
current_fetch_context: DomRefCell::new(None),
}
}

Expand Down Expand Up @@ -665,34 +664,31 @@ impl HTMLMediaElement {
..RequestInit::default()
};

let mut current_fetch_request = self.current_fetch_request.borrow_mut();
if let Some(ref current_fetch_request) = *current_fetch_request {
current_fetch_request
.lock()
.unwrap()
.cancel(CancelReason::Overridden);
let mut current_fetch_context = self.current_fetch_context.borrow_mut();
if let Some(ref mut current_fetch_context) = *current_fetch_context {
current_fetch_context.cancel(CancelReason::Overridden);
}
let (context, cancel_receiver) = HTMLMediaElementFetchContext::new(
let (fetch_context, cancel_receiver) = HTMLMediaElementFetchContext::new();
*current_fetch_context = Some(fetch_context);
let fetch_listener = Arc::new(Mutex::new(HTMLMediaElementFetchListener::new(
self,
self.resource_url.borrow().as_ref().unwrap().clone(),
offset.unwrap_or(0),
);
let context = Arc::new(Mutex::new(context));
*current_fetch_request = Some(context.clone());
)));
let (action_sender, action_receiver) = ipc::channel().unwrap();
let window = window_from_node(self);
let (task_source, canceller) = window
.task_manager()
.networking_task_source_with_canceller();
let listener = NetworkListener {
context,
let network_listener = NetworkListener {
context: fetch_listener,
task_source,
canceller: Some(canceller),
};
ROUTER.add_route(
action_receiver.to_opaque(),
Box::new(move |message| {
listener.notify_fetch(message.to().unwrap());
network_listener.notify_fetch(message.to().unwrap());
}),
);
let global = self.global();
Expand Down Expand Up @@ -887,11 +883,8 @@ impl HTMLMediaElement {
task_source.queue_simple_event(self.upcast(), atom!("emptied"), &window);

// Step 6.2.
if let Some(ref current_fetch_request) = *self.current_fetch_request.borrow() {
current_fetch_request
.lock()
.unwrap()
.cancel(CancelReason::Error);
if let Some(ref mut current_fetch_context) = *self.current_fetch_context.borrow_mut() {
current_fetch_context.cancel(CancelReason::Error);
}

// Step 6.3.
Expand Down Expand Up @@ -1045,8 +1038,8 @@ impl HTMLMediaElement {
// XXX(ferjm) seekable attribute: we need to get the information about
// what's been decoded and buffered so far from servo-media
// and add the seekable attribute as a TimeRange.
if let Some(ref current_fetch_request) = *self.current_fetch_request.borrow() {
if !current_fetch_request.lock().unwrap().is_seekable() {
if let Some(ref current_fetch_context) = *self.current_fetch_context.borrow() {
if !current_fetch_context.is_seekable() {
self.seeking.set(false);
return;
}
Expand Down Expand Up @@ -1208,9 +1201,8 @@ impl HTMLMediaElement {
// Otherwise, if we have no request and the previous request was
// cancelled because we got an EnoughData event, we restart
// fetching where we left.
if let Some(ref current_fetch_request) = *self.current_fetch_request.borrow() {
let current_fetch_request = current_fetch_request.lock().unwrap();
match current_fetch_request.cancel_reason() {
if let Some(ref current_fetch_context) = *self.current_fetch_context.borrow() {
match current_fetch_context.cancel_reason() {
Some(ref reason) if *reason == CancelReason::Backoff => {
// XXX(ferjm) Ideally we should just create a fetch request from
// where we left. But keeping track of the exact next byte that the
Expand All @@ -1228,10 +1220,11 @@ impl HTMLMediaElement {
// bytes, so we cancel the ongoing fetch request iff we are able
// to restart it from where we left. Otherwise, we continue the
// current fetch request, assuming that some frames will be dropped.
if let Some(ref current_fetch_request) = *self.current_fetch_request.borrow() {
let mut current_fetch_request = current_fetch_request.lock().unwrap();
if current_fetch_request.is_seekable() {
current_fetch_request.cancel(CancelReason::Backoff);
if let Some(ref mut current_fetch_context) =
*self.current_fetch_context.borrow_mut()
{
if current_fetch_context.is_seekable() {
current_fetch_context.cancel(CancelReason::Backoff);
}
}
},
Expand Down Expand Up @@ -1685,7 +1678,7 @@ enum Resource {
}

/// Indicates the reason why a fetch request was cancelled.
#[derive(Debug, PartialEq)]
#[derive(Debug, MallocSizeOf, PartialEq)]
enum CancelReason {
/// We were asked to stop pushing data to the player.
Backoff,
Expand All @@ -1695,7 +1688,53 @@ enum CancelReason {
Overridden,
}

#[derive(MallocSizeOf)]
pub struct HTMLMediaElementFetchContext {
/// Some if the request has been cancelled.
cancel_reason: Option<CancelReason>,
/// Indicates whether the fetched stream is seekable.
is_seekable: bool,
/// Fetch canceller. Allows cancelling the current fetch request by
/// manually calling its .cancel() method or automatically on Drop.
fetch_canceller: FetchCanceller,
}

impl HTMLMediaElementFetchContext {
fn new() -> (HTMLMediaElementFetchContext, ipc::IpcReceiver<()>) {
let mut fetch_canceller = FetchCanceller::new();
let cancel_receiver = fetch_canceller.initialize();
(
HTMLMediaElementFetchContext {
cancel_reason: None,
is_seekable: false,
fetch_canceller,
},
cancel_receiver,
)
}

fn is_seekable(&self) -> bool {
self.is_seekable
}

fn set_seekable(&mut self, seekable: bool) {
self.is_seekable = seekable;
}

fn cancel(&mut self, reason: CancelReason) {
if self.cancel_reason.is_some() {
return;
}
self.cancel_reason = Some(reason);
self.fetch_canceller.cancel();
}

fn cancel_reason(&self) -> &Option<CancelReason> {
&self.cancel_reason
}
}

struct HTMLMediaElementFetchListener {
/// The element that initiated the request.
elem: Trusted<HTMLMediaElement>,
/// The response metadata received to date.
Expand All @@ -1704,8 +1743,6 @@ pub struct HTMLMediaElementFetchContext {
generation_id: u32,
/// Time of last progress notification.
next_progress_event: Timespec,
/// Some if the request has been cancelled.
cancel_reason: Option<CancelReason>,
/// Timing data for this resource.
resource_timing: ResourceFetchTiming,
/// Url for the resource.
Expand All @@ -1718,15 +1755,10 @@ pub struct HTMLMediaElementFetchContext {
/// EnoughData event uses this value to restart the download from
/// the last fetched position.
latest_fetched_content: u64,
/// Indicates whether the stream is seekable.
is_seekable: bool,
/// Fetch canceller. Allows cancelling the current fetch request by
/// manually calling its .cancel() method or automatically on Drop.
fetch_canceller: FetchCanceller,
}

// https://html.spec.whatwg.org/multipage/#media-data-processing-steps-list
impl FetchResponseListener for HTMLMediaElementFetchContext {
impl FetchResponseListener for HTMLMediaElementFetchListener {
fn process_request_body(&mut self) {}

fn process_request_eof(&mut self) {}
Expand Down Expand Up @@ -1780,7 +1812,9 @@ impl FetchResponseListener for HTMLMediaElementFetchContext {

if is_seekable {
// The server supports range requests,
self.is_seekable = true;
if let Some(ref mut current_fetch_context) = *elem.current_fetch_context.borrow_mut() {
current_fetch_context.set_seekable(true);
}
// and we can safely set the type of stream to Seekable.
if let Err(e) = elem.player.set_stream_type(StreamType::Seekable) {
warn!("Could not set stream type to Seekable. {:?}", e);
Expand All @@ -1791,18 +1825,25 @@ impl FetchResponseListener for HTMLMediaElementFetchContext {
if !status_is_ok {
// Ensure that the element doesn't receive any further notifications
// of the aborted fetch.
self.cancel(CancelReason::Error);
if let Some(ref mut current_fetch_context) = *elem.current_fetch_context.borrow_mut() {
current_fetch_context.cancel(CancelReason::Error);
}
elem.queue_dedicated_media_source_failure_steps();
}
}

fn process_response_chunk(&mut self, payload: Vec<u8>) {
let elem = self.elem.root();
if self.cancel_reason.is_some() || elem.generation_id.get() != self.generation_id {
// An error was received previously or we triggered a new fetch request,
// skip processing the payload.
// If an error was received previously or if we triggered a new fetch request,
// we skip processing the payload.
if elem.generation_id.get() != self.generation_id {
return;
}
if let Some(ref current_fetch_context) = *elem.current_fetch_context.borrow() {
if current_fetch_context.cancel_reason().is_some() {
return;
}
}

let payload_len = payload.len() as u64;

Expand All @@ -1813,7 +1854,13 @@ impl FetchResponseListener for HTMLMediaElementFetchContext {
// the current request. Otherwise, we continue the request
// assuming that we may drop some frames.
match e {
PlayerError::EnoughData => self.cancel(CancelReason::Backoff),
PlayerError::EnoughData => {
if let Some(ref mut current_fetch_context) =
*elem.current_fetch_context.borrow_mut()
{
current_fetch_context.cancel(CancelReason::Backoff);
}
},
_ => (),
}
warn!("Could not push input data to player {:?}", e);
Expand All @@ -1837,15 +1884,17 @@ impl FetchResponseListener for HTMLMediaElementFetchContext {
// https://html.spec.whatwg.org/multipage/#media-data-processing-steps-list
fn process_response_eof(&mut self, status: Result<ResourceFetchTiming, NetworkError>) {
let elem = self.elem.root();
// If an error was previously received and no new fetch request was triggered,
// we skip processing the payload and notify the media backend that we are done
// pushing data.
if elem.generation_id.get() == self.generation_id {
if let Some(CancelReason::Error) = self.cancel_reason {
// An error was received previously and no new fetch request was triggered, so
// we skip processing the payload and notify the media backend that we are done
// pushing data.
if let Err(e) = elem.player.end_of_stream() {
warn!("Could not signal EOS to player {:?}", e);
if let Some(ref current_fetch_context) = *elem.current_fetch_context.borrow() {
if let Some(CancelReason::Error) = current_fetch_context.cancel_reason() {
if let Err(e) = elem.player.end_of_stream() {
warn!("Could not signal EOS to player {:?}", e);
}
return;
}
return;
}
}

Expand All @@ -1868,11 +1917,8 @@ impl FetchResponseListener for HTMLMediaElementFetchContext {
// => "If the connection is interrupted after some media data has been received..."
else if elem.ready_state.get() != ReadyState::HaveNothing {
// Step 1
if let Some(ref current_fetch_request) = *elem.current_fetch_request.borrow() {
current_fetch_request
.lock()
.unwrap()
.cancel(CancelReason::Error);
if let Some(ref mut current_fetch_context) = *elem.current_fetch_context.borrow_mut() {
current_fetch_context.cancel(CancelReason::Error);
}

// Step 2
Expand Down Expand Up @@ -1908,7 +1954,7 @@ impl FetchResponseListener for HTMLMediaElementFetchContext {
}
}

impl ResourceTimingListener for HTMLMediaElementFetchContext {
impl ResourceTimingListener for HTMLMediaElementFetchListener {
fn resource_timing_information(&self) -> (InitiatorType, ServoUrl) {
let initiator_type = InitiatorType::LocalName(
self.elem
Expand All @@ -1925,52 +1971,24 @@ impl ResourceTimingListener for HTMLMediaElementFetchContext {
}
}

impl PreInvoke for HTMLMediaElementFetchContext {
impl PreInvoke for HTMLMediaElementFetchListener {
fn should_invoke(&self) -> bool {
//TODO: finish_load needs to run at some point if the generation changes.
self.elem.root().generation_id.get() == self.generation_id
}
}

impl HTMLMediaElementFetchContext {
fn new(
elem: &HTMLMediaElement,
url: ServoUrl,
offset: u64,
) -> (HTMLMediaElementFetchContext, ipc::IpcReceiver<()>) {
let mut fetch_canceller = FetchCanceller::new();
let cancel_receiver = fetch_canceller.initialize();
(
HTMLMediaElementFetchContext {
elem: Trusted::new(elem),
metadata: None,
generation_id: elem.generation_id.get(),
next_progress_event: time::get_time() + Duration::milliseconds(350),
cancel_reason: None,
resource_timing: ResourceFetchTiming::new(ResourceTimingType::Resource),
url,
expected_content_length: None,
latest_fetched_content: offset,
is_seekable: false,
fetch_canceller,
},
cancel_receiver,
)
}

fn is_seekable(&self) -> bool {
self.is_seekable
}

fn cancel(&mut self, reason: CancelReason) {
if self.cancel_reason.is_some() {
return;
impl HTMLMediaElementFetchListener {
fn new(elem: &HTMLMediaElement, url: ServoUrl, offset: u64) -> Self {
Self {
elem: Trusted::new(elem),
metadata: None,
generation_id: elem.generation_id.get(),
next_progress_event: time::get_time() + Duration::milliseconds(350),
resource_timing: ResourceFetchTiming::new(ResourceTimingType::Resource),
url,
expected_content_length: None,
latest_fetched_content: offset,
}
self.cancel_reason = Some(reason);
self.fetch_canceller.cancel();
}

fn cancel_reason(&self) -> &Option<CancelReason> {
&self.cancel_reason
}
}

0 comments on commit 9a18074

Please sign in to comment.