Skip to content

Commit

Permalink
Rename scheduler ready_queue to stratum_queues
Browse files Browse the repository at this point in the history
  • Loading branch information
MingweiSamuel committed Mar 4, 2022
1 parent 0b759d7 commit ee43144
Showing 1 changed file with 16 additions and 15 deletions.
31 changes: 16 additions & 15 deletions hydroflow/src/scheduled/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct Hydroflow {

// TODO(mingwei): separate scheduler into its own struct/trait?
// Index is stratum, value is FIFO queue for that stratum.
ready_queue: Vec<VecDeque<SubgraphId>>,
stratum_queues: Vec<VecDeque<SubgraphId>>,
current_stratum: usize,
current_epoch: usize,

Expand All @@ -35,14 +35,14 @@ pub struct Hydroflow {
}
impl Default for Hydroflow {
fn default() -> Self {
let (subgraphs, handoffs, states, ready_queue) = Default::default();
let (subgraphs, handoffs, states, stratum_queues) = Default::default();
let (event_queue_send, event_queue_recv) = mpsc::unbounded_channel();
Self {
subgraphs,
handoffs,
states,

ready_queue,
stratum_queues,
current_stratum: 0,
current_epoch: 0,

Expand Down Expand Up @@ -84,7 +84,7 @@ impl Hydroflow {
// Add any external jobs to ready queue.
self.try_recv_events();

while let Some(sg_id) = self.ready_queue[self.current_stratum].pop_front() {
while let Some(sg_id) = self.stratum_queues[self.current_stratum].pop_front() {
{
let sg_data = &mut self.subgraphs[sg_id];
// This must be true for the subgraph to be enqueued.
Expand All @@ -111,7 +111,7 @@ impl Hydroflow {
continue;
}
succ_sg_data.is_scheduled.set(true);
self.ready_queue[succ_sg_data.stratum].push_back(succ_id);
self.stratum_queues[succ_sg_data.stratum].push_back(succ_id);
}
}
}
Expand All @@ -128,12 +128,12 @@ impl Hydroflow {
let old_stratum = self.current_stratum;
loop {
// If current stratum has work, return true.
if !self.ready_queue[self.current_stratum].is_empty() {
if !self.stratum_queues[self.current_stratum].is_empty() {
return true;
}
// Increment stratum counter.
self.current_stratum += 1;
if self.current_stratum >= self.ready_queue.len() {
if self.current_stratum >= self.stratum_queues.len() {
self.current_stratum = 0;
self.current_epoch += 1;
}
Expand Down Expand Up @@ -175,7 +175,7 @@ impl Hydroflow {
while let Ok(sg_id) = self.event_queue_recv.try_recv() {
let sg_data = &self.subgraphs[sg_id];
if !sg_data.is_scheduled.replace(true) {
self.ready_queue[sg_data.stratum].push_back(sg_id);
self.stratum_queues[sg_data.stratum].push_back(sg_id);
enqueued_count += 1;
}
}
Expand All @@ -189,7 +189,7 @@ impl Hydroflow {
let sg_id = self.event_queue_recv.blocking_recv()?;
let sg_data = &self.subgraphs[sg_id];
if !sg_data.is_scheduled.replace(true) {
self.ready_queue[sg_data.stratum].push_back(sg_id);
self.stratum_queues[sg_data.stratum].push_back(sg_id);

// Enqueue any other immediate events.
return Some(NonZeroUsize::new(self.try_recv_events() + 1).unwrap());
Expand All @@ -204,7 +204,7 @@ impl Hydroflow {
let sg_id = self.event_queue_recv.recv().await?;
let sg_data = &self.subgraphs[sg_id];
if !sg_data.is_scheduled.replace(true) {
self.ready_queue[sg_data.stratum].push_back(sg_id);
self.stratum_queues[sg_data.stratum].push_back(sg_id);

// Enqueue any other immediate events.
return Some(NonZeroUsize::new(self.try_recv_events() + 1).unwrap());
Expand Down Expand Up @@ -265,7 +265,7 @@ impl Hydroflow {
true,
));
self.init_stratum(stratum);
self.ready_queue[stratum].push_back(sg_id);
self.stratum_queues[stratum].push_back(sg_id);

sg_id
}
Expand Down Expand Up @@ -354,15 +354,16 @@ impl Hydroflow {
true,
));
self.init_stratum(stratum);
self.ready_queue[stratum].push_back(sg_id);
self.stratum_queues[stratum].push_back(sg_id);

sg_id
}

/// Makes sure stratum STRATUM is initialized.
fn init_stratum(&mut self, stratum: usize) {
if self.ready_queue.len() <= stratum {
self.ready_queue.resize_with(stratum + 1, Default::default);
if self.stratum_queues.len() <= stratum {
self.stratum_queues
.resize_with(stratum + 1, Default::default);
}
}

Expand Down Expand Up @@ -457,7 +458,7 @@ struct SubgraphData {
#[allow(dead_code)]
preds: Vec<HandoffId>,
succs: Vec<HandoffId>,
/// If this subgraph is scheduled in [`Hydroflow::ready_queue`].
/// If this subgraph is scheduled in [`Hydroflow::stratum_queues`].
/// [`Cell`] allows modifying this field when iterating `Self::preds` or
/// `Self::succs`, as all `SubgraphData` are owned by the same vec
/// `Hydroflow::subgraphs`.
Expand Down

0 comments on commit ee43144

Please sign in to comment.