Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement strata, enabling per-tick non-monotonic aggregations #94

Merged
merged 6 commits into from
Mar 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion hydroflow/src/builder/hydroflow_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,31 @@ impl HydroflowBuilder {
name: Name,
pivot: PivotSurface<Pull, Push>,
) -> SubgraphId
where
Name: Into<Cow<'static, str>>,
Pull: 'static + PullSurface,
Push: 'static + PushSurfaceReversed<ItemIn = Pull::ItemOut>,
{
self.add_subgraph_stratified(name, 0, pivot)
}

/// Adds a `pivot` created via the Surface API, with the given stratum number.
pub fn add_subgraph_stratified<Name, Pull, Push>(
&mut self,
name: Name,
stratum: usize,
pivot: PivotSurface<Pull, Push>,
) -> SubgraphId
where
Name: Into<Cow<'static, str>>,
Pull: 'static + PullSurface,
Push: 'static + PushSurfaceReversed<ItemIn = Pull::ItemOut>,
{
let ((recv_ports, send_ports), (mut pull_build, mut push_build)) = pivot.into_parts();

self.hydroflow.add_subgraph(
self.hydroflow.add_subgraph_stratified(
name,
stratum,
recv_ports,
send_ports,
move |context, recv_ctx, send_ctx| {
Expand Down
12 changes: 12 additions & 0 deletions hydroflow/src/scheduled/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,20 @@ pub struct Context<'a> {
pub(crate) handoffs: &'a mut [HandoffData],
pub(crate) states: &'a mut [StateData],
pub(crate) event_queue_send: &'a UnboundedSender<SubgraphId>,
pub(crate) current_epoch: usize,
pub(crate) current_stratum: usize,
}
impl<'a> Context<'a> {
// Gets the current epoch (local time) count.
pub fn current_epoch(&self) -> usize {
self.current_epoch
}

// Gets the current stratum nubmer.
pub fn current_stratum(&self) -> usize {
self.current_stratum
}

pub fn waker(&self) -> std::task::Waker {
use futures::task::ArcWake;
use std::sync::Arc;
Expand Down
142 changes: 125 additions & 17 deletions hydroflow/src/scheduled/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,27 @@ pub struct Hydroflow {
states: Vec<StateData>,

// TODO(mingwei): separate scheduler into its own struct/trait?
ready_queue: VecDeque<SubgraphId>,
// Index is stratum, value is FIFO queue for that stratum.
stratum_queues: Vec<VecDeque<SubgraphId>>,
current_stratum: usize,
current_epoch: usize,

event_queue_send: UnboundedSender<SubgraphId>, // TODO(mingwei) remove this, to prevent hanging.
event_queue_recv: UnboundedReceiver<SubgraphId>,
}
impl Default for Hydroflow {
fn default() -> Self {
let (subgraphs, handoffs, states, ready_queue) = Default::default();
let (subgraphs, handoffs, states, stratum_queues) = Default::default();
let (event_queue_send, event_queue_recv) = mpsc::unbounded_channel();
Self {
subgraphs,
handoffs,
states,
ready_queue,

stratum_queues,
current_stratum: 0,
current_epoch: 0,

event_queue_send,
event_queue_recv,
}
Expand All @@ -54,12 +62,29 @@ impl Hydroflow {
Reactor::new(self.event_queue_send.clone())
}

/// Runs the dataflow until no more work is currently available.
// Gets the current epoch (local time) count.
pub fn current_epoch(&self) -> usize {
self.current_epoch
}

// Gets the current stratum nubmer.
pub fn current_stratum(&self) -> usize {
self.current_stratum
}

/// Runs the dataflow until no more work is immediately available.
pub fn tick(&mut self) {
while self.next_stratum() {
self.tick_stratum();
}
}

/// Runs the current stratum of the dataflow until no more work is immediately available.
pub fn tick_stratum(&mut self) {
// Add any external jobs to ready queue.
self.try_recv_events();

while let Some(sg_id) = self.ready_queue.pop_front() {
while let Some(sg_id) = self.stratum_queues[self.current_stratum].pop_front() {
{
let sg_data = &mut self.subgraphs[sg_id];
// This must be true for the subgraph to be enqueued.
Expand All @@ -70,6 +95,8 @@ impl Hydroflow {
handoffs: &mut self.handoffs,
states: &mut self.states,
event_queue_send: &self.event_queue_send,
current_epoch: self.current_epoch,
current_stratum: self.current_stratum,
};
sg_data.subgraph.run(context);
}
Expand All @@ -84,7 +111,7 @@ impl Hydroflow {
continue;
}
succ_sg_data.is_scheduled.set(true);
self.ready_queue.push_back(succ_id);
self.stratum_queues[succ_sg_data.stratum].push_back(succ_id);
}
}
}
Expand All @@ -93,6 +120,32 @@ impl Hydroflow {
}
}

/// Go to the next stratum which has work available, possibly the current stratum.
/// Return true if more work is available, otherwise false if no work is immediately available on any strata.
pub fn next_stratum(&mut self) -> bool {
self.try_recv_events();

let old_stratum = self.current_stratum;
loop {
// If current stratum has work, return true.
if !self.stratum_queues[self.current_stratum].is_empty() {
return true;
}
// Increment stratum counter.
self.current_stratum += 1;
if self.current_stratum >= self.stratum_queues.len() {
self.current_stratum = 0;
self.current_epoch += 1;
}
// After incrementing, exit if we made a full loop around the strata.
if old_stratum == self.current_stratum {
// Note: if current stratum had work, the very first loop iteration would've
// returned true. Therefore we can return false without checking.
return false;
}
}
}

/// Run the dataflow graph to completion.
///
/// TODO(mingwei): Currently blockes forever, no notion of "completion."
Expand Down Expand Up @@ -120,8 +173,9 @@ impl Hydroflow {
pub fn try_recv_events(&mut self) -> usize {
let mut enqueued_count = 0;
while let Ok(sg_id) = self.event_queue_recv.try_recv() {
if !self.subgraphs[sg_id].is_scheduled.replace(true) {
self.ready_queue.push_back(sg_id);
let sg_data = &self.subgraphs[sg_id];
if !sg_data.is_scheduled.replace(true) {
self.stratum_queues[sg_data.stratum].push_back(sg_id);
enqueued_count += 1;
}
}
Expand All @@ -133,8 +187,9 @@ impl Hydroflow {
pub fn recv_events(&mut self) -> Option<NonZeroUsize> {
loop {
let sg_id = self.event_queue_recv.blocking_recv()?;
if !self.subgraphs[sg_id].is_scheduled.replace(true) {
self.ready_queue.push_back(sg_id);
let sg_data = &self.subgraphs[sg_id];
if !sg_data.is_scheduled.replace(true) {
self.stratum_queues[sg_data.stratum].push_back(sg_id);

// Enqueue any other immediate events.
return Some(NonZeroUsize::new(self.try_recv_events() + 1).unwrap());
Expand All @@ -147,21 +202,39 @@ impl Hydroflow {
pub async fn recv_events_async(&mut self) -> Option<NonZeroUsize> {
loop {
let sg_id = self.event_queue_recv.recv().await?;
if !self.subgraphs[sg_id].is_scheduled.replace(true) {
self.ready_queue.push_back(sg_id);
let sg_data = &self.subgraphs[sg_id];
if !sg_data.is_scheduled.replace(true) {
self.stratum_queues[sg_data.stratum].push_back(sg_id);

// Enqueue any other immediate events.
return Some(NonZeroUsize::new(self.try_recv_events() + 1).unwrap());
}
}
}

pub fn add_subgraph<Name, R, W, F>(
MingweiSamuel marked this conversation as resolved.
Show resolved Hide resolved
&mut self,
name: Name,
recv_ports: R,
send_ports: W,
subgraph: F,
) -> SubgraphId
where
Name: Into<Cow<'static, str>>,
R: 'static + PortList<RECV>,
W: 'static + PortList<SEND>,
F: 'static + FnMut(&Context<'_>, R::Ctx<'_>, W::Ctx<'_>),
{
self.add_subgraph_stratified(name, 0, recv_ports, send_ports, subgraph)
}

/// Adds a new compiled subgraph with the specified inputs and outputs.
///
/// TODO(mingwei): add example in doc.
pub fn add_subgraph<Name, R, W, F>(
pub fn add_subgraph_stratified<Name, R, W, F>(
&mut self,
name: Name,
stratum: usize,
recv_ports: R,
send_ports: W,
mut subgraph: F,
Expand All @@ -185,22 +258,42 @@ impl Hydroflow {
};
self.subgraphs.push(SubgraphData::new(
name.into(),
stratum,
subgraph,
subgraph_preds,
subgraph_succs,
true,
));
self.ready_queue.push_back(sg_id);
self.init_stratum(stratum);
self.stratum_queues[stratum].push_back(sg_id);

sg_id
}

/// Adds a new compiled subraph with a variable number of inputs and outputs of the same respective handoff types.
/// Adds a new compiled subgraph with a variable number of inputs and outputs of the same respective handoff types.
pub fn add_subgraph_n_m<Name, R, W, F>(
&mut self,
name: Name,
recv_ports: Vec<RecvPort<R>>,
send_ports: Vec<SendPort<W>>,
subgraph: F,
) -> SubgraphId
where
Name: Into<Cow<'static, str>>,
R: 'static + Handoff,
W: 'static + Handoff,
F: 'static + FnMut(&Context<'_>, &[&RecvCtx<R>], &[&SendCtx<W>]),
{
self.add_subgraph_stratified_n_m(name, 0, recv_ports, send_ports, subgraph)
}

/// Adds a new compiled subgraph with a variable number of inputs and outputs of the same respective handoff types.
pub fn add_subgraph_stratified_n_m<Name, R, W, F>(
&mut self,
name: Name,
stratum: usize,
recv_ports: Vec<RecvPort<R>>,
send_ports: Vec<SendPort<W>>,
mut subgraph: F,
) -> SubgraphId
where
Expand Down Expand Up @@ -254,16 +347,26 @@ impl Hydroflow {
};
self.subgraphs.push(SubgraphData::new(
name.into(),
stratum,
subgraph,
subgraph_preds,
subgraph_succs,
true,
));
self.ready_queue.push_back(sg_id);
self.init_stratum(stratum);
self.stratum_queues[stratum].push_back(sg_id);

sg_id
}

/// Makes sure stratum STRATUM is initialized.
MingweiSamuel marked this conversation as resolved.
Show resolved Hide resolved
fn init_stratum(&mut self, stratum: usize) {
if self.stratum_queues.len() <= stratum {
self.stratum_queues
.resize_with(stratum + 1, Default::default);
}
}

/// Creates a handoff edge and returns the corresponding send and receive ports.
pub fn make_edge<Name, H>(&mut self, name: Name) -> (SendPort<H>, RecvPort<H>)
where
Expand Down Expand Up @@ -348,11 +451,14 @@ struct SubgraphData {
/// A friendly name for diagnostics.
#[allow(dead_code)] // TODO(mingwei): remove attr once used.
name: Cow<'static, str>,
/// This subgraph's stratum number.
stratum: usize,
/// The actual execution code of the subgraph.
subgraph: Box<dyn Subgraph>,
#[allow(dead_code)]
preds: Vec<HandoffId>,
succs: Vec<HandoffId>,
/// If this subgraph is scheduled in [`Hydroflow::ready_queue`].
/// If this subgraph is scheduled in [`Hydroflow::stratum_queues`].
/// [`Cell`] allows modifying this field when iterating `Self::preds` or
/// `Self::succs`, as all `SubgraphData` are owned by the same vec
/// `Hydroflow::subgraphs`.
Expand All @@ -361,13 +467,15 @@ struct SubgraphData {
impl SubgraphData {
pub fn new(
name: Cow<'static, str>,
stratum: usize,
subgraph: impl 'static + Subgraph,
preds: Vec<HandoffId>,
succs: Vec<HandoffId>,
is_scheduled: bool,
) -> Self {
Self {
name,
stratum,
subgraph: Box::new(subgraph),
preds,
succs,
Expand Down
Loading