Skip to content

Commit

Permalink
ref(reduce): Refactor for timeout=0 (#363)
Browse files Browse the repository at this point in the history
* ref(reduce): Refactor for timeout=0

Slightly simplify the logic so that timeout=0 is honored.

Needed for getsentry/team-sns#4

* appease our lord clippy

* what
  • Loading branch information
untitaker committed May 15, 2024
1 parent bc487fe commit 22f5cb0
Showing 1 changed file with 17 additions and 12 deletions.
29 changes: 17 additions & 12 deletions rust-arroyo/src/processing/strategies/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,25 @@ impl<T: Send + Sync, TResult: Send + Sync> ProcessingStrategy<T> for Reduce<T, T

fn join(&mut self, timeout: Option<Duration>) -> Result<Option<CommitRequest>, StrategyError> {
let deadline = timeout.map(Deadline::new);
if self.message_carried_over.is_some() {
while self.message_carried_over.is_some() {
let next_commit = self.next_step.poll()?;
self.commit_request_carried_over =
merge_commit_request(self.commit_request_carried_over.take(), next_commit);
self.flush(true)?;

if deadline.map_or(false, |d| d.has_elapsed()) {
tracing::warn!("Timeout reached while waiting for tasks to finish");
break;
}

loop {
if deadline.map_or(false, |d| d.has_elapsed()) {
tracing::warn!(
"Timeout {:?} reached while waiting for tasks to finish",
timeout
);
break;
}
} else {

let next_commit = self.next_step.poll()?;
self.commit_request_carried_over =
merge_commit_request(self.commit_request_carried_over.take(), next_commit);

self.flush(true)?;

if self.message_carried_over.is_none() {
break;
}
}

let next_commit = self.next_step.join(deadline.map(|d| d.remaining()))?;
Expand Down

0 comments on commit 22f5cb0

Please sign in to comment.