Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hydroflow): Add deadline Instant args to hydroflow run methods #957

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 35 additions & 7 deletions hydroflow/src/scheduled/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,20 @@ impl Hydroflow {

/// Runs the dataflow until the next tick begins.
/// Returns true if any work was done.
#[tracing::instrument(level = "trace", skip(self), ret)]
pub fn run_tick(&mut self) -> bool {
self.run_tick_deadline(None)
}

/// Runs the dataflow until the next tick begins, or the deadline is reached.
/// Returns true if any work was done.
#[tracing::instrument(level = "trace", skip(self), ret)]
pub fn run_tick_deadline(&mut self, deadline: Option<Instant>) -> bool {
let mut work_done = false;
// While work is immediately available *on the current tick*.
while self.next_stratum(true) {
work_done = true;
// Do any work.
self.run_stratum();
self.run_stratum(deadline);
}
work_done
}
Expand All @@ -154,31 +160,49 @@ impl Hydroflow {
/// Runs at least one tick of dataflow, even if no external events have been received.
/// If the dataflow contains loops this method may run forever.
/// Returns true if any work was done.
#[tracing::instrument(level = "trace", skip(self), ret)]
pub fn run_available(&mut self) -> bool {
self.run_available_deadline(None)
}

/// Runs the dataflow until no more (externally-triggered) work is immediately available or the
/// deadline is reached.
/// Runs at least one tick of dataflow, even if no external events have been received.
/// If the dataflow contains loops this method may run forever.
/// Returns true if any work was done.
#[tracing::instrument(level = "trace", skip(self), ret)]
pub fn run_available_deadline(&mut self, deadline: Option<Instant>) -> bool {
let mut work_done = false;
// While work is immediately available.
while self.next_stratum(false) {
work_done = true;
// Do any work.
self.run_stratum();
self.run_stratum(deadline);
}
work_done
}

/// Runs the dataflow until no more (externally-triggered) work is immediately available.
/// Runs at least one tick of dataflow, even if no external events have been received.
/// If the dataflow contains loops this method may run forever.
/// Returns true if any work was done.
/// Yields repeatedly to allow external events to happen.
pub fn run_available_async(&mut self) -> impl '_ + Future<Output = bool> {
self.run_available_deadline_async(None)
}

/// Runs the dataflow until no more (externally-triggered) work is immediately available.
/// Runs at least one tick of dataflow, even if no external events have been received.
/// If the dataflow contains loops this method may run forever.
/// Returns true if any work was done.
/// Yields repeatedly to allow external events to happen.
#[tracing::instrument(level = "trace", skip(self), ret)]
pub async fn run_available_async(&mut self) -> bool {
pub async fn run_available_deadline_async(&mut self, deadline: Option<Instant>) -> bool {
let mut work_done = false;
// While work is immediately available.
while self.next_stratum(false) {
work_done = true;
// Do any work.
self.run_stratum();
self.run_stratum(deadline);

// Yield between each stratum to receive more events.
// TODO(mingwei): really only need to yield at start of ticks though.
Expand All @@ -190,7 +214,7 @@ impl Hydroflow {
/// Runs the current stratum of the dataflow until no more local work is available (does not receive events).
/// Returns true if any work was done.
#[tracing::instrument(level = "trace", skip(self), fields(tick = self.context.current_tick, stratum = self.context.current_stratum), ret)]
pub fn run_stratum(&mut self) -> bool {
pub fn run_stratum(&mut self, deadline: Option<Instant>) -> bool {
let current_tick = self.context.current_tick;

let mut work_done = false;
Expand Down Expand Up @@ -227,6 +251,10 @@ impl Hydroflow {
}
}
}

if deadline.map_or(false, |dl| dl <= Instant::now()) {
break;
}
}
work_done
}
Expand Down
Loading