From 85cd677e650bfcaf92b35162516f27a82f8ac45c Mon Sep 17 00:00:00 2001 From: Mingwei Samuel Date: Wed, 1 Nov 2023 14:26:49 -0700 Subject: [PATCH] feat(hydroflow): Add deadline `Instant` args to hydroflow run methods --- hydroflow/src/scheduled/graph.rs | 42 ++++++++++++++++++++++++++------ 1 file changed, 35 insertions(+), 7 deletions(-) diff --git a/hydroflow/src/scheduled/graph.rs b/hydroflow/src/scheduled/graph.rs index af010c3cf439..792ec029e982 100644 --- a/hydroflow/src/scheduled/graph.rs +++ b/hydroflow/src/scheduled/graph.rs @@ -138,14 +138,20 @@ impl Hydroflow { /// Runs the dataflow until the next tick begins. /// Returns true if any work was done. - #[tracing::instrument(level = "trace", skip(self), ret)] pub fn run_tick(&mut self) -> bool { + self.run_tick_deadline(None) + } + + /// Runs the dataflow until the next tick begins, or the deadline is reached. + /// Returns true if any work was done. + #[tracing::instrument(level = "trace", skip(self), ret)] + pub fn run_tick_deadline(&mut self, deadline: Option) -> bool { let mut work_done = false; // While work is immediately available *on the current tick*. while self.next_stratum(true) { work_done = true; // Do any work. - self.run_stratum(); + self.run_stratum(deadline); } work_done } @@ -154,14 +160,23 @@ impl Hydroflow { /// Runs at least one tick of dataflow, even if no external events have been received. /// If the dataflow contains loops this method may run forever. /// Returns true if any work was done. - #[tracing::instrument(level = "trace", skip(self), ret)] pub fn run_available(&mut self) -> bool { + self.run_available_deadline(None) + } + + /// Runs the dataflow until no more (externally-triggered) work is immediately available or the + /// deadline is reached. + /// Runs at least one tick of dataflow, even if no external events have been received. + /// If the dataflow contains loops this method may run forever. + /// Returns true if any work was done. + #[tracing::instrument(level = "trace", skip(self), ret)] + pub fn run_available_deadline(&mut self, deadline: Option) -> bool { let mut work_done = false; // While work is immediately available. while self.next_stratum(false) { work_done = true; // Do any work. - self.run_stratum(); + self.run_stratum(deadline); } work_done } @@ -171,14 +186,23 @@ impl Hydroflow { /// If the dataflow contains loops this method may run forever. /// Returns true if any work was done. /// Yields repeatedly to allow external events to happen. - #[tracing::instrument(level = "trace", skip(self), ret)] pub async fn run_available_async(&mut self) -> bool { + self.run_available_deadline_async(None) + } + + /// Runs the dataflow until no more (externally-triggered) work is immediately available. + /// Runs at least one tick of dataflow, even if no external events have been received. + /// If the dataflow contains loops this method may run forever. + /// Returns true if any work was done. + /// Yields repeatedly to allow external events to happen. + #[tracing::instrument(level = "trace", skip(self), ret)] + pub async fn run_available_deadline_async(&mut self, deadline: Option) -> bool { let mut work_done = false; // While work is immediately available. while self.next_stratum(false) { work_done = true; // Do any work. - self.run_stratum(); + self.run_stratum(deadline); // Yield between each stratum to receive more events. // TODO(mingwei): really only need to yield at start of ticks though. @@ -190,7 +214,7 @@ impl Hydroflow { /// Runs the current stratum of the dataflow until no more local work is available (does not receive events). /// Returns true if any work was done. #[tracing::instrument(level = "trace", skip(self), fields(tick = self.context.current_tick, stratum = self.context.current_stratum), ret)] - pub fn run_stratum(&mut self) -> bool { + pub fn run_stratum(&mut self, deadline: Option) -> bool { let current_tick = self.context.current_tick; let mut work_done = false; @@ -227,6 +251,10 @@ impl Hydroflow { } } } + + if deadline.map_or(false, |dl| dl <= Instant::now()) { + break; + } } work_done }