diff --git a/components/constellation/constellation.rs b/components/constellation/constellation.rs index 356f530d9ede..b243b95c9a73 100644 --- a/components/constellation/constellation.rs +++ b/components/constellation/constellation.rs @@ -186,6 +186,18 @@ enum TransferState { /// The port is currently in-transfer, /// and incoming tasks should be buffered until it becomes managed again. TransferInProgress(VecDeque), + /// A global has requested the transfer to be completed, + /// it's pending a confirmation of either failure or success to complete the transfer. + CompletionInProgress(MessagePortRouterId), + /// While a completion of a transfer was in progress, the port was shipped, + /// hence the transfer failed to complete. + /// We start buffering incoming messages, + /// while awaiting the return of the previous buffer from the global + /// that failed to complete the transfer. + CompletionFailed(VecDeque), + /// While a completion failed, another global requested to complete the transfer. + /// We are still buffering messages, and awaiting the return of the buffer from the global who failed. + CompletionRequested(MessagePortRouterId, VecDeque), /// The entangled port has been removed while the port was in-transfer, /// the current port should be removed as well once it is managed again. EntangledRemoved, @@ -1553,6 +1565,13 @@ where }; match content { + FromScriptMsg::CompleteMessagePortTransfer(router_id, ports) => { + self.handle_complete_message_port_transfer(router_id, ports); + }, + FromScriptMsg::MessagePortTransferResult(router_id, succeeded, failed) => { + self.handle_message_port_transfer_completed(router_id, succeeded); + self.handle_message_port_transfer_failed(failed); + }, FromScriptMsg::RerouteMessagePort(port_id, task) => { self.handle_reroute_messageport(port_id, task); }, @@ -1779,6 +1798,224 @@ where } } + fn handle_message_port_transfer_completed( + &mut self, + router_id: Option, + ports: Vec, + ) { + let router_id = match router_id { + Some(router_id) => router_id, + None => { + if !ports.is_empty() { + warn!("Constellation unable to process port transfer successes, since no router id was received"); + } + return; + }, + }; + for port_id in ports.into_iter() { + let mut entry = match self.message_ports.entry(port_id) { + Entry::Vacant(_) => { + warn!( + "Constellation received a port transfer completed msg for unknown messageport {:?}", + port_id + ); + continue; + }, + Entry::Occupied(entry) => entry, + }; + match entry.get().state { + TransferState::EntangledRemoved => { + // If the entangled port has been removed while this one was in-transfer, + // remove it now. + if let Some(sender) = self.message_port_routers.get(&router_id) { + let _ = sender.send(MessagePortMsg::RemoveMessagePort(port_id)); + } else { + warn!("No message-port sender for {:?}", router_id); + } + entry.remove_entry(); + continue; + }, + TransferState::CompletionInProgress(expected_router_id) => { + // Here, the transfer was normally completed. + + if expected_router_id != router_id { + return warn!( + "Transfer completed by an unexpected router: {:?}", + router_id + ); + } + // Update the state to managed. + let new_info = MessagePortInfo { + state: TransferState::Managed(router_id), + entangled_with: entry.get().entangled_with, + }; + entry.insert(new_info); + }, + _ => warn!("Constellation received unexpected port transfer completed message"), + } + } + } + + fn handle_message_port_transfer_failed( + &mut self, + ports: HashMap>, + ) { + for (port_id, mut previous_buffer) in ports.into_iter() { + let entry = match self.message_ports.remove(&port_id) { + None => { + warn!( + "Constellation received a port transfer completed msg for unknown messageport {:?}", + port_id + ); + continue; + }, + Some(entry) => entry, + }; + let new_info = match entry.state { + TransferState::EntangledRemoved => { + // If the entangled port has been removed while this one was in-transfer, + // just drop it. + continue; + }, + TransferState::CompletionFailed(mut current_buffer) => { + // The transfer failed, + // and now the global has returned us the buffer we previously sent. + // So the next update is back to a "normal" transfer in progress. + + // Tasks in the previous buffer are older, + // hence need to be added to the front of the current one. + while let Some(task) = previous_buffer.pop_back() { + current_buffer.push_front(task); + } + // Update the state to transfer-in-progress. + MessagePortInfo { + state: TransferState::TransferInProgress(current_buffer), + entangled_with: entry.entangled_with, + } + }, + TransferState::CompletionRequested(target_router_id, mut current_buffer) => { + // Here, before the global who failed the last transfer could return us the buffer, + // another global already sent us a request to complete a new transfer. + // So we use the returned buffer to update + // the current-buffer(of new incoming messages), + // and we send everything to the global + // who is waiting for completion of the current transfer. + + // Tasks in the previous buffer are older, + // hence need to be added to the front of the current one. + while let Some(task) = previous_buffer.pop_back() { + current_buffer.push_front(task); + } + // Forward the buffered message-queue to complete the current transfer. + if let Some(sender) = self.message_port_routers.get(&target_router_id) { + if sender + .send(MessagePortMsg::CompletePendingTransfer( + port_id, + current_buffer, + )) + .is_err() + { + warn!("Constellation failed to send complete port transfer response."); + } + } else { + warn!("No message-port sender for {:?}", target_router_id); + } + + // Update the state to completion-in-progress. + MessagePortInfo { + state: TransferState::CompletionInProgress(target_router_id), + entangled_with: entry.entangled_with, + } + }, + _ => { + warn!("Unexpected port transfer failed message received"); + continue; + }, + }; + self.message_ports.insert(port_id, new_info); + } + } + + fn handle_complete_message_port_transfer( + &mut self, + router_id: MessagePortRouterId, + ports: Vec, + ) { + let mut response = HashMap::new(); + for port_id in ports.into_iter() { + let entry = match self.message_ports.remove(&port_id) { + None => { + warn!( + "Constellation asked to complete transfer for unknown messageport {:?}", + port_id + ); + continue; + }, + Some(entry) => entry, + }; + let new_info = match entry.state { + TransferState::EntangledRemoved => { + // If the entangled port has been removed while this one was in-transfer, + // remove it now. + if let Some(sender) = self.message_port_routers.get(&router_id) { + let _ = sender.send(MessagePortMsg::RemoveMessagePort(port_id)); + } else { + warn!("No message-port sender for {:?}", router_id); + } + continue; + }, + TransferState::TransferInProgress(buffer) => { + response.insert(port_id, buffer); + + // If the port was in transfer, and a global is requesting completion, + // we note the start of the completion. + MessagePortInfo { + state: TransferState::CompletionInProgress(router_id), + entangled_with: entry.entangled_with, + } + }, + TransferState::CompletionFailed(buffer) | + TransferState::CompletionRequested(_, buffer) => { + // If the completion had already failed, + // this is a request coming from a global to complete a new transfer, + // but we're still awaiting the return of the buffer + // from the first global who failed. + // + // So we note the request from the new global, + // and continue to buffer incoming messages + // and wait for the buffer used in the previous transfer to be returned. + // + // If another global requests completion in the CompletionRequested state, + // we simply swap the target router-id for the new one, + // keeping the buffer. + MessagePortInfo { + state: TransferState::CompletionRequested(router_id, buffer), + entangled_with: entry.entangled_with, + } + }, + _ => { + warn!("Unexpected complete port transfer message received"); + continue; + }, + }; + self.message_ports.insert(port_id, new_info); + } + + if !response.is_empty() { + // Forward the buffered message-queue. + if let Some(sender) = self.message_port_routers.get(&router_id) { + if sender + .send(MessagePortMsg::CompleteTransfer(response)) + .is_err() + { + warn!("Constellation failed to send complete port transfer response."); + } + } else { + warn!("No message-port sender for {:?}", router_id); + } + } + } + fn handle_reroute_messageport(&mut self, port_id: MessagePortId, task: PortMessageTask) { let info = match self.message_ports.get_mut(&port_id) { Some(info) => info, @@ -1790,7 +2027,10 @@ where }, }; match &mut info.state { - TransferState::Managed(router_id) => { + TransferState::Managed(router_id) | TransferState::CompletionInProgress(router_id) => { + // In both the managed and completion of a transfer case, we forward the message. + // Note that in both cases, if the port is transferred before the message is handled, + // it will be sent back here and buffered while the transfer is ongoing. if let Some(sender) = self.message_port_routers.get(&router_id) { let _ = sender.send(MessagePortMsg::NewTask(port_id, task)); } else { @@ -1798,6 +2038,8 @@ where } }, TransferState::TransferInProgress(queue) => queue.push_back(task), + TransferState::CompletionFailed(queue) => queue.push_back(task), + TransferState::CompletionRequested(_, queue) => queue.push_back(task), TransferState::EntangledRemoved => warn!( "Messageport received a message, but entangled has alread been removed {:?}", port_id @@ -1807,8 +2049,19 @@ where fn handle_messageport_shipped(&mut self, port_id: MessagePortId) { if let Some(info) = self.message_ports.get_mut(&port_id) { - if let TransferState::Managed(_) = info.state { - info.state = TransferState::TransferInProgress(VecDeque::new()); + match info.state { + TransferState::Managed(_) => { + // If shipped while managed, note the start of a transfer. + info.state = TransferState::TransferInProgress(VecDeque::new()); + }, + TransferState::CompletionInProgress(_) => { + // If shipped while completion of a transfer was in progress, + // the completion failed. + // This will be followed by a MessagePortTransferFailed message, + // containing the buffer we previously sent. + info.state = TransferState::CompletionFailed(VecDeque::new()); + }, + _ => warn!("Unexpected messageport shipped received"), } } else { warn!( @@ -1832,37 +2085,11 @@ where fn handle_new_messageport(&mut self, router_id: MessagePortRouterId, port_id: MessagePortId) { match self.message_ports.entry(port_id) { - // If we know about this port, it means it was transferred. - Entry::Occupied(mut entry) => { - if let TransferState::EntangledRemoved = entry.get().state { - // If the entangled port has been removed while this one was in-transfer, - // remove it now. - if let Some(sender) = self.message_port_routers.get(&router_id) { - let _ = sender.send(MessagePortMsg::RemoveMessagePort(port_id)); - } else { - warn!("No message-port sender for {:?}", router_id); - } - entry.remove_entry(); - return; - } - let new_info = MessagePortInfo { - state: TransferState::Managed(router_id), - entangled_with: entry.get().entangled_with.clone(), - }; - let old_info = entry.insert(new_info); - let buffer = match old_info.state { - TransferState::TransferInProgress(buffer) => buffer, - _ => { - return warn!("Completing transfer of a port that did not have a transfer in progress."); - }, - }; - // Forward the buffered message-queue. - if let Some(sender) = self.message_port_routers.get(&router_id) { - let _ = sender.send(MessagePortMsg::CompleteTransfer(port_id.clone(), buffer)); - } else { - warn!("No message-port sender for {:?}", router_id); - } - }, + // If it's a new port, we should not know about it. + Entry::Occupied(_) => warn!( + "Constellation asked to start tracking an existing messageport {:?}", + port_id + ), Entry::Vacant(entry) => { let info = MessagePortInfo { state: TransferState::Managed(router_id), @@ -1901,7 +2128,10 @@ where "Constellation asked to remove entangled messageport by a port that was already removed {:?}", port_id ), - TransferState::TransferInProgress(_) => { + TransferState::TransferInProgress(_) | + TransferState::CompletionInProgress(_) | + TransferState::CompletionFailed(_) | + TransferState::CompletionRequested(_, _) => { // Note: since the port is in-transer, we don't have a router to send it a message // to let it know that its entangled port has been removed. // Hence we mark it so that it will be messaged and removed once the transfer completes. diff --git a/components/script/dom/globalscope.rs b/components/script/dom/globalscope.rs index 7d07e67f9ec5..1933d1e926e2 100644 --- a/components/script/dom/globalscope.rs +++ b/components/script/dom/globalscope.rs @@ -218,12 +218,48 @@ impl MessageListener { /// and we can only access the root from the event-loop. fn notify(&self, msg: MessagePortMsg) { match msg { - MessagePortMsg::CompleteTransfer(port_id, tasks) => { + MessagePortMsg::CompleteTransfer(ports) => { let context = self.context.clone(); let _ = self.task_source.queue_with_canceller( task!(process_complete_transfer: move || { let global = context.root(); - global.complete_port_transfer(port_id, tasks); + + let router_id = match global.port_router_id() { + Some(router_id) => router_id, + None => { + // If not managing any ports, no transfer can succeed, + // so just send back everything. + let _ = global.script_to_constellation_chan().send( + ScriptMsg::MessagePortTransferResult(None, vec![], ports), + ); + return; + } + }; + + let mut succeeded = vec![]; + let mut failed = HashMap::new(); + + for (id, buffer) in ports.into_iter() { + if global.is_managing_port(&id) { + succeeded.push(id.clone()); + global.complete_port_transfer(id, buffer); + } else { + failed.insert(id, buffer); + } + } + let _ = global.script_to_constellation_chan().send( + ScriptMsg::MessagePortTransferResult(Some(router_id), succeeded, failed), + ); + }), + &self.canceller, + ); + }, + MessagePortMsg::CompletePendingTransfer(port_id, buffer) => { + let context = self.context.clone(); + let _ = self.task_source.queue_with_canceller( + task!(complete_pending: move || { + let global = context.root(); + global.complete_port_transfer(port_id, buffer); }), &self.canceller, ); @@ -294,6 +330,25 @@ impl GlobalScope { } } + /// The message-port router Id of the global, if any + fn port_router_id(&self) -> Option { + if let MessagePortState::Managed(id, _message_ports) = &*self.message_port_state.borrow() { + Some(id.clone()) + } else { + None + } + } + + /// Is this global managing a given port? + fn is_managing_port(&self, port_id: &MessagePortId) -> bool { + if let MessagePortState::Managed(_router_id, message_ports) = + &*self.message_port_state.borrow() + { + return message_ports.contains_key(port_id); + } + false + } + /// Complete the transfer of a message-port. fn complete_port_transfer(&self, port_id: MessagePortId, tasks: VecDeque) { let should_start = if let MessagePortState::Managed(_id, message_ports) = @@ -301,7 +356,7 @@ impl GlobalScope { { match message_ports.get_mut(&port_id) { None => { - panic!("CompleteTransfer msg received in a global not managing the port."); + panic!("complete_port_transfer called for an unknown port."); }, Some(ManagedMessagePort::Pending(_, _)) => { panic!("CompleteTransfer msg received for a pending port."); @@ -312,7 +367,7 @@ impl GlobalScope { }, } } else { - return warn!("CompleteTransfer msg received in a global not managing any ports."); + panic!("complete_port_transfer called for an unknown port."); }; if should_start { self.start_message_port(&port_id); @@ -554,22 +609,25 @@ impl GlobalScope { _ => None, }) .collect(); - for id in to_be_added { + for id in to_be_added.iter() { let (id, port_info) = message_ports .remove_entry(&id) .expect("Collected port-id to match an entry"); - if let ManagedMessagePort::Pending(port_impl, dom_port) = port_info { - let _ = self - .script_to_constellation_chan() - .send(ScriptMsg::NewMessagePort( - router_id.clone(), - port_impl.message_port_id().clone(), - )); - let new_port_info = ManagedMessagePort::Added(port_impl, dom_port); - let present = message_ports.insert(id, new_port_info); - assert!(present.is_none()); + match port_info { + ManagedMessagePort::Pending(port_impl, dom_port) => { + let new_port_info = ManagedMessagePort::Added(port_impl, dom_port); + let present = message_ports.insert(id, new_port_info); + assert!(present.is_none()); + }, + _ => panic!("Only pending ports should be found in to_be_added"), } } + let _ = + self.script_to_constellation_chan() + .send(ScriptMsg::CompleteMessagePortTransfer( + router_id.clone(), + to_be_added, + )); } else { warn!("maybe_add_pending_ports called on a global not managing any ports."); } diff --git a/components/script_traits/lib.rs b/components/script_traits/lib.rs index 9dd0ba04134d..9987713474de 100644 --- a/components/script_traits/lib.rs +++ b/components/script_traits/lib.rs @@ -1036,8 +1036,12 @@ pub struct PortMessageTask { /// Messages for communication between the constellation and a global managing ports. #[derive(Debug, Deserialize, Serialize)] pub enum MessagePortMsg { - /// Enables a port to catch-up on messages that were sent while the transfer was ongoing. - CompleteTransfer(MessagePortId, VecDeque), + /// Complete the transfer for a batch of ports. + CompleteTransfer(HashMap>), + /// Complete the transfer of a single port, + /// whose transfer was pending because it had been requested + /// while a previous failed transfer was being rolled-back. + CompletePendingTransfer(MessagePortId, VecDeque), /// Remove a port, the entangled one doesn't exists anymore. RemoveMessagePort(MessagePortId), /// Handle a new port-message-task. diff --git a/components/script_traits/script_msg.rs b/components/script_traits/script_msg.rs index 3a4d16186849..6d22101edf68 100644 --- a/components/script_traits/script_msg.rs +++ b/components/script_traits/script_msg.rs @@ -30,6 +30,7 @@ use net_traits::storage_thread::StorageType; use net_traits::CoreResourceMsg; use servo_url::ImmutableOrigin; use servo_url::ServoUrl; +use std::collections::{HashMap, VecDeque}; use std::fmt; use style_traits::viewport::ViewportConstraints; use style_traits::CSSPixel; @@ -114,6 +115,17 @@ pub enum HistoryEntryReplacement { /// Messages from the script to the constellation. #[derive(Deserialize, Serialize)] pub enum ScriptMsg { + /// Request to complete the transfer of a set of ports to a router. + CompleteMessagePortTransfer(MessagePortRouterId, Vec), + /// The results of attempting to complete the transfer of a batch of ports. + MessagePortTransferResult( + /* The router whose transfer of ports succeeded, if any */ + Option, + /* The ids of ports transferred successfully */ + Vec, + /* The ids, and buffers, of ports whose transfer failed */ + HashMap>, + ), /// A new message-port was created or transferred, with corresponding control-sender. NewMessagePort(MessagePortRouterId, MessagePortId), /// A global has started managing message-ports @@ -248,6 +260,8 @@ impl fmt::Debug for ScriptMsg { fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result { use self::ScriptMsg::*; let variant = match *self { + CompleteMessagePortTransfer(..) => "CompleteMessagePortTransfer", + MessagePortTransferResult(..) => "MessagePortTransferResult", NewMessagePortRouter(..) => "NewMessagePortRouter", RemoveMessagePortRouter(..) => "RemoveMessagePortRouter", NewMessagePort(..) => "NewMessagePort", diff --git a/tests/wpt/mozilla/meta/MANIFEST.json b/tests/wpt/mozilla/meta/MANIFEST.json index d8d05a7b9d80..1ee9b21fdec6 100644 --- a/tests/wpt/mozilla/meta/MANIFEST.json +++ b/tests/wpt/mozilla/meta/MANIFEST.json @@ -11281,6 +11281,32 @@ {} ] ], + "mozilla/Channel_postMessage_with_second_transfer_in_timeout.window.js": [ + [ + "mozilla/Channel_postMessage_with_second_transfer_in_timeout.window.html", + { + "script_metadata": [ + [ + "script", + "/common/get-host-info.sub.js" + ] + ] + } + ] + ], + "mozilla/Channel_postMessage_with_second_transfer_in_timeout_with_delay.window.js": [ + [ + "mozilla/Channel_postMessage_with_second_transfer_in_timeout_with_delay.window.html", + { + "script_metadata": [ + [ + "script", + "/common/get-host-info.sub.js" + ] + ] + } + ] + ], "mozilla/DOMParser.html": [ [ "mozilla/DOMParser.html", @@ -18409,6 +18435,14 @@ "276791c4348ada7e1da71041f2ccd383305e209c", "support" ], + "mozilla/Channel_postMessage_with_second_transfer_in_timeout.window.js": [ + "4ee3f64beb095963f06fc53c1d53dad2244109f9", + "testharness" + ], + "mozilla/Channel_postMessage_with_second_transfer_in_timeout_with_delay.window.js": [ + "939995678895c07047709f6e265d0f6b7b705eb5", + "testharness" + ], "mozilla/DOMParser.html": [ "f386a3e0191af2c70dcb05790ce7db15dd5ccbf1", "testharness" diff --git a/tests/wpt/mozilla/tests/mozilla/Channel_postMessage_with_second_transfer_in_timeout.window.js b/tests/wpt/mozilla/tests/mozilla/Channel_postMessage_with_second_transfer_in_timeout.window.js new file mode 100644 index 000000000000..4ee3f64beb09 --- /dev/null +++ b/tests/wpt/mozilla/tests/mozilla/Channel_postMessage_with_second_transfer_in_timeout.window.js @@ -0,0 +1,33 @@ +// META: script=/common/get-host-info.sub.js + +async_test(function(t) { + var channel1 = new MessageChannel(); + var channel2 = new MessageChannel(); + var host = get_host_info(); + let iframe = document.createElement('iframe'); + iframe.src = host.HTTP_NOTSAMESITE_ORIGIN + "/webmessaging/support/ChildWindowPostMessage.htm"; + document.body.appendChild(iframe); + var TARGET = document.querySelector("iframe").contentWindow; + iframe.onload = t.step_func(function() { + + // Send a message, expecting it to be received in the iframe. + channel1.port2.postMessage(1) + + // First, transfer the port into the same realm. + channel2.port2.postMessage(0, [channel1.port1]); + + channel2.port1.onmessage = t.step_func(function (evt) { + assert_equals(Number(evt.data), 0); + + t.step_timeout(function () { + // Transfer the port to the iframe. + TARGET.postMessage("ports", "*", evt.ports); + }, 0); + }); + + channel1.port2.onmessage = t.step_func(function (evt) { + assert_equals(Number(evt.data), 1); + t.done(); + }); + }); +}, `A port transferred outside of a onmessage handler does not lose messages along the way.`); diff --git a/tests/wpt/mozilla/tests/mozilla/Channel_postMessage_with_second_transfer_in_timeout_with_delay.window.js b/tests/wpt/mozilla/tests/mozilla/Channel_postMessage_with_second_transfer_in_timeout_with_delay.window.js new file mode 100644 index 000000000000..939995678895 --- /dev/null +++ b/tests/wpt/mozilla/tests/mozilla/Channel_postMessage_with_second_transfer_in_timeout_with_delay.window.js @@ -0,0 +1,43 @@ +// META: script=/common/get-host-info.sub.js + +async_test(function(t) { + var channel1 = new MessageChannel(); + var channel2 = new MessageChannel(); + var host = get_host_info(); + let iframe = document.createElement('iframe'); + iframe.src = host.HTTP_NOTSAMESITE_ORIGIN + "/webmessaging/support/ChildWindowPostMessage.htm"; + document.body.appendChild(iframe); + var TARGET = document.querySelector("iframe").contentWindow; + iframe.onload = t.step_func(function() { + + // Send a message, expecting it to be received in the iframe. + channel1.port2.postMessage(1) + + // First, transfer the port into the same realm. + channel2.port2.postMessage(0, [channel1.port1]); + + channel2.port1.onmessage = t.step_func(function (evt) { + assert_equals(Number(evt.data), 0); + + t.step_timeout(function () { + // Transfer the port to the iframe. + TARGET.postMessage("ports", "*", evt.ports); + + // Keep the event-loop busy for one second, + // which will result in the iframe + // starting the "complete port transfer" flow, + // before the window global could finish it's own. + var request = new XMLHttpRequest(); + request.open('GET', 'blank.html?pipe=trickle(d1)', false); + request.send(null); + }, 0); + }); + + channel1.port2.onmessage = t.step_func(function (evt) { + assert_equals(Number(evt.data), 1); + t.done(); + }); + }); +}, `A port transferred outside of a onmessage handler, + followed by a delay in returning the buffer caused by blocking the event-loop, + does not lose messages along the way.`);