Skip to content

Commit

Permalink
implement broadcastchannel
Browse files Browse the repository at this point in the history
  • Loading branch information
gterzian committed Feb 25, 2020
1 parent 145c89a commit eb21d5f
Show file tree
Hide file tree
Showing 32 changed files with 763 additions and 216 deletions.
213 changes: 208 additions & 5 deletions components/constellation/constellation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,12 @@ use log::{Level, LevelFilter, Log, Metadata, Record};
use media::{GLPlayerThreads, WindowGLContext};
use msg::constellation_msg::{BackgroundHangMonitorRegister, HangMonitorAlert, SamplerControlMsg};
use msg::constellation_msg::{
BrowsingContextGroupId, BrowsingContextId, HistoryStateId, PipelineId,
TopLevelBrowsingContextId,
BroadcastChannelRouterId, MessagePortId, MessagePortRouterId, PipelineNamespace,
PipelineNamespaceId, PipelineNamespaceRequest, TraversalDirection,
};
use msg::constellation_msg::{
MessagePortId, MessagePortRouterId, PipelineNamespace, PipelineNamespaceId,
PipelineNamespaceRequest, TraversalDirection,
BrowsingContextGroupId, BrowsingContextId, HistoryStateId, PipelineId,
TopLevelBrowsingContextId,
};
use net_traits::pub_domains::reg_host;
use net_traits::request::RequestBuilder;
Expand All @@ -142,7 +142,8 @@ use profile_traits::time;
use script_traits::CompositorEvent::{MouseButtonEvent, MouseMoveEvent};
use script_traits::{webdriver_msg, LogEntry, ScriptToConstellationChan, ServiceWorkerMsg};
use script_traits::{
AnimationState, AnimationTickType, AuxiliaryBrowsingContextLoadInfo, CompositorEvent,
AnimationState, AnimationTickType, AuxiliaryBrowsingContextLoadInfo, BroadcastMsg,
CompositorEvent,
};
use script_traits::{ConstellationControlMsg, DiscardBrowsingContext};
use script_traits::{DocumentActivity, DocumentState, LayoutControlMsg, LoadData, LoadOrigin};
Expand Down Expand Up @@ -399,6 +400,12 @@ pub struct Constellation<Message, LTF, STF> {
/// A map of router-id to ipc-sender, to route messages to ports.
message_port_routers: HashMap<MessagePortRouterId, IpcSender<MessagePortMsg>>,

/// A map of broadcast routers to their IPC sender.
broadcast_routers: HashMap<BroadcastChannelRouterId, IpcSender<BroadcastMsg>>,

/// A map of origin to a map of channel-name to a list of relevant routers.
broadcast_channels: HashMap<ImmutableOrigin, HashMap<String, Vec<BroadcastChannelRouterId>>>,

/// The set of all the pipelines in the browser. (See the `pipeline` module
/// for more details.)
pipelines: HashMap<PipelineId, Pipeline>,
Expand Down Expand Up @@ -961,6 +968,8 @@ where
browsing_context_group_next_id: Default::default(),
message_ports: HashMap::new(),
message_port_routers: HashMap::new(),
broadcast_routers: HashMap::new(),
broadcast_channels: HashMap::new(),
pipelines: HashMap::new(),
browsing_contexts: HashMap::new(),
pending_changes: vec![],
Expand Down Expand Up @@ -1760,6 +1769,36 @@ where
FromScriptMsg::EntanglePorts(port1, port2) => {
self.handle_entangle_messageports(port1, port2);
},
FromScriptMsg::NewBroadcastChannelRouter(router_id, ipc_sender, origin) => {
self.handle_new_broadcast_channel_router(
source_pipeline_id,
router_id,
ipc_sender,
origin,
);
},
FromScriptMsg::NewBroadcastChannelNameInRouter(router_id, channel_name, origin) => {
self.handle_new_broadcast_channel_name_in_router(
source_pipeline_id,
router_id,
channel_name,
origin,
);
},
FromScriptMsg::RemoveBroadcastChannelNameInRouter(router_id, channel_name, origin) => {
self.handle_remove_broadcast_channel_name_in_router(
source_pipeline_id,
router_id,
channel_name,
origin,
);
},
FromScriptMsg::RemoveBroadcastChannelRouter(router_id, origin) => {
self.handle_remove_broadcast_channel_router(source_pipeline_id, router_id, origin);
},
FromScriptMsg::ScheduleBroadcast(router_id, message) => {
self.handle_schedule_broadcast(source_pipeline_id, router_id, message);
},
FromScriptMsg::ForwardToEmbedder(embedder_msg) => {
self.embedder_proxy
.send((Some(source_top_ctx_id), embedder_msg));
Expand Down Expand Up @@ -1976,6 +2015,170 @@ where
}
}

/// Check the origin of a message against that of the pipeline it came from.
/// Note: this is still limited as a security check,
/// see https://github.com/servo/servo/issues/11722
fn check_origin_against_pipeline(
&self,
pipeline_id: &PipelineId,
origin: &ImmutableOrigin,
) -> Result<(), ()> {
let pipeline_origin = match self.pipelines.get(&pipeline_id) {
Some(pipeline) => pipeline.load_data.url.origin(),
None => {
warn!("Received message from closed or unknown pipeline.");
return Err(());
},
};
if &pipeline_origin == origin {
return Ok(());
}
Err(())
}

/// Broadcast a message via routers in various event-loops.
fn handle_schedule_broadcast(
&self,
pipeline_id: PipelineId,
router_id: BroadcastChannelRouterId,
message: BroadcastMsg,
) {
if self
.check_origin_against_pipeline(&pipeline_id, &message.origin)
.is_err()
{
return warn!(
"Attempt to schedule broadcast from an origin not matching the origin of the msg."
);
}
if let Some(channels) = self.broadcast_channels.get(&message.origin) {
let routers = match channels.get(&message.channel_name) {
Some(routers) => routers,
None => return warn!("Broadcast to channel name without active routers."),
};
for router in routers {
// Exclude the sender of the broadcast.
// Broadcasting locally is done at the point of sending.
if router == &router_id {
continue;
}

if let Some(sender) = self.broadcast_routers.get(&router) {
if sender.send(message.clone()).is_err() {
warn!("Failed to broadcast message to router: {:?}", router);
}
} else {
warn!("No sender for broadcast router: {:?}", router);
}
}
} else {
warn!(
"Attempt to schedule a broadcast for an origin without routers {:?}",
message.origin
);
}
}

/// Remove a channel-name for a given broadcast router.
fn handle_remove_broadcast_channel_name_in_router(
&mut self,
pipeline_id: PipelineId,
router_id: BroadcastChannelRouterId,
channel_name: String,
origin: ImmutableOrigin,
) {
if self
.check_origin_against_pipeline(&pipeline_id, &origin)
.is_err()
{
return warn!("Attempt to remove channel name from an unexpected origin.");
}
if let Some(channels) = self.broadcast_channels.get_mut(&origin) {
let is_empty = if let Some(routers) = channels.get_mut(&channel_name) {
routers.retain(|router| router != &router_id);
routers.is_empty()
} else {
return warn!(
"Multiple attemps to remove name for broadcast-channel {:?} at {:?}",
channel_name, origin
);
};
if is_empty {
channels.remove(&channel_name);
}
} else {
warn!(
"Attempt to remove a channel-name for an origin without channels {:?}",
origin
);
}
}

/// Note a new channel-name relevant to a given broadcast router.
fn handle_new_broadcast_channel_name_in_router(
&mut self,
pipeline_id: PipelineId,
router_id: BroadcastChannelRouterId,
channel_name: String,
origin: ImmutableOrigin,
) {
if self
.check_origin_against_pipeline(&pipeline_id, &origin)
.is_err()
{
return warn!("Attempt to add channel name from an unexpected origin.");
}
let channels = self
.broadcast_channels
.entry(origin)
.or_insert_with(HashMap::new);

let routers = channels.entry(channel_name).or_insert_with(Vec::new);

routers.push(router_id);
}

/// Remove a broadcast router.
fn handle_remove_broadcast_channel_router(
&mut self,
pipeline_id: PipelineId,
router_id: BroadcastChannelRouterId,
origin: ImmutableOrigin,
) {
if self
.check_origin_against_pipeline(&pipeline_id, &origin)
.is_err()
{
return warn!("Attempt to remove broadcast router from an unexpected origin.");
}
if self.broadcast_routers.remove(&router_id).is_none() {
warn!("Attempt to remove unknown broadcast-channel router.");
}
}

/// Add a new broadcast router.
fn handle_new_broadcast_channel_router(
&mut self,
pipeline_id: PipelineId,
router_id: BroadcastChannelRouterId,
ipc_sender: IpcSender<BroadcastMsg>,
origin: ImmutableOrigin,
) {
if self
.check_origin_against_pipeline(&pipeline_id, &origin)
.is_err()
{
return warn!("Attempt to add broadcast router from an unexpected origin.");
}
if self
.broadcast_routers
.insert(router_id, ipc_sender)
.is_some()
{
warn!("Multple attempt to add broadcast-channel router.");
}
}

fn handle_request_wgpu_adapter(
&mut self,
source_pipeline_id: PipelineId,
Expand Down
43 changes: 43 additions & 0 deletions components/msg/constellation_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,13 @@ impl PipelineNamespace {
}
}

fn next_broadcast_channel_router_id(&mut self) -> BroadcastChannelRouterId {
BroadcastChannelRouterId {
namespace_id: self.id,
index: BroadcastChannelRouterIndex(self.next_index()),
}
}

fn next_blob_id(&mut self) -> BlobId {
BlobId {
namespace_id: self.id,
Expand Down Expand Up @@ -380,6 +387,42 @@ impl fmt::Display for MessagePortRouterId {
}
}

#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct BroadcastChannelRouterIndex(pub NonZeroU32);
malloc_size_of_is_0!(BroadcastChannelRouterIndex);

#[derive(
Clone, Copy, Debug, Deserialize, Eq, Hash, MallocSizeOf, Ord, PartialEq, PartialOrd, Serialize,
)]
pub struct BroadcastChannelRouterId {
pub namespace_id: PipelineNamespaceId,
pub index: BroadcastChannelRouterIndex,
}

impl BroadcastChannelRouterId {
pub fn new() -> BroadcastChannelRouterId {
PIPELINE_NAMESPACE.with(|tls| {
let mut namespace = tls.get().expect("No namespace set for this thread!");
let next_broadcast_channel_router_id = namespace.next_broadcast_channel_router_id();
tls.set(Some(namespace));
next_broadcast_channel_router_id
})
}
}

impl fmt::Display for BroadcastChannelRouterId {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let PipelineNamespaceId(namespace_id) = self.namespace_id;
let BroadcastChannelRouterIndex(index) = self.index;
write!(
fmt,
"(BroadcastChannelRouterId{},{})",
namespace_id,
index.get()
)
}
}

#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub struct BlobIndex(pub NonZeroU32);
malloc_size_of_is_0!(BlobIndex);
Expand Down
6 changes: 4 additions & 2 deletions components/script/dom/bindings/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ use media::WindowGLContext;
use metrics::{InteractiveMetrics, InteractiveWindow};
use mime::Mime;
use msg::constellation_msg::{
BlobId, BrowsingContextId, HistoryStateId, MessagePortId, MessagePortRouterId, PipelineId,
TopLevelBrowsingContextId,
BlobId, BroadcastChannelRouterId, BrowsingContextId, HistoryStateId, MessagePortId,
MessagePortRouterId, PipelineId, TopLevelBrowsingContextId,
};
use net_traits::filemanager_thread::RelativePos;
use net_traits::image::base::{Image, ImageMetadata};
Expand Down Expand Up @@ -175,6 +175,8 @@ unsafe_no_jsmanaged_fields!(MessagePortId);
unsafe_no_jsmanaged_fields!(RefCell<Option<MessagePortId>>);
unsafe_no_jsmanaged_fields!(MessagePortRouterId);

unsafe_no_jsmanaged_fields!(BroadcastChannelRouterId);

unsafe_no_jsmanaged_fields!(BlobId);
unsafe_no_jsmanaged_fields!(BlobImpl);

Expand Down
Loading

0 comments on commit eb21d5f

Please sign in to comment.