Skip to content

Commit

Permalink
Fix, expose, test epoch and stratum counters
Browse files Browse the repository at this point in the history
  • Loading branch information
MingweiSamuel committed Mar 4, 2022
1 parent fb02f26 commit 03efd52
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 6 deletions.
7 changes: 7 additions & 0 deletions hydroflow/src/scheduled/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,19 @@ pub struct Context<'a> {
pub(crate) states: &'a mut [StateData],
pub(crate) event_queue_send: &'a UnboundedSender<SubgraphId>,
pub(crate) current_epoch: usize,
pub(crate) current_stratum: usize,
}
impl<'a> Context<'a> {
// Gets the current epoch (local time) count.
pub fn current_epoch(&self) -> usize {
self.current_epoch
}

// Gets the current stratum nubmer.
pub fn current_stratum(&self) -> usize {
self.current_stratum
}

pub fn waker(&self) -> std::task::Waker {
use futures::task::ArcWake;
use std::sync::Arc;
Expand Down
25 changes: 19 additions & 6 deletions hydroflow/src/scheduled/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@ impl Hydroflow {
Reactor::new(self.event_queue_send.clone())
}

// Gets the current epoch (local time) count.
pub fn current_epoch(&self) -> usize {
self.current_epoch
}

// Gets the current stratum nubmer.
pub fn current_stratum(&self) -> usize {
self.current_stratum
}

/// Runs the dataflow until no more work is immediately available.
pub fn tick(&mut self) {
while {
Expand All @@ -87,6 +97,7 @@ impl Hydroflow {
states: &mut self.states,
event_queue_send: &self.event_queue_send,
current_epoch: self.current_epoch,
current_stratum: self.current_stratum,
};
sg_data.subgraph.run(context);
}
Expand Down Expand Up @@ -114,14 +125,16 @@ impl Hydroflow {
pub fn next_stratum(&mut self) -> bool {
self.try_recv_events();

let mut next_stratum = self.current_stratum;
let old_stratum = self.current_stratum;
while {
next_stratum += 1;
next_stratum %= self.ready_queue.len();
next_stratum != self.current_stratum
self.current_stratum += 1;
if self.current_stratum >= self.ready_queue.len() {
self.current_stratum = 0;
self.current_epoch += 1;
}
old_stratum != self.current_stratum
} {
if !self.ready_queue[next_stratum].is_empty() {
self.current_stratum = next_stratum;
if !self.ready_queue[self.current_stratum].is_empty() {
return true;
}
}
Expand Down
10 changes: 10 additions & 0 deletions hydroflow/tests/groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,19 +175,29 @@ fn groupby_nonmon_surface() {
input.give(Iter(BATCH_A.iter().cloned()));
input.flush();
hf.tick_stratum();
assert_eq!((0, 0), (hf.current_epoch(), hf.current_stratum()));

assert_eq!(None, output.get());

hf.tick();
assert_eq!((1, 1), (hf.current_epoch(), hf.current_stratum()));

assert_eq!(Some((BATCH_A.len(), "justin")), output.get());

// Give BATCH_B but only run this stratum.
input.give(Iter(BATCH_B.iter().cloned()));
input.flush();

hf.tick_stratum();
assert_eq!((1, 1), (hf.current_epoch(), hf.current_stratum()));

// Give BATCH_C and run all to completion.
input.give(Iter(BATCH_C.iter().cloned()));
input.flush();

hf.tick();
assert_eq!((3, 1), (hf.current_epoch(), hf.current_stratum()));

// Second batch has 7+3 = 10 items.
assert_eq!(Some((BATCH_B.len() + BATCH_C.len(), "mae")), output.get());
assert_eq!(false, hf.next_stratum());
Expand Down

0 comments on commit 03efd52

Please sign in to comment.