Skip to content

Commit

Permalink
ref(rust): Remove strategy.close (#361)
Browse files Browse the repository at this point in the history
I long felt that nobody ever does anything useful in this method in Rust
or in Python. And it makes it a bit harder to think about what "state"
the strategy is in,  since strategy.join() sometimes calls close() and
then can no longer be called multiple times.
  • Loading branch information
untitaker committed May 15, 2024
1 parent 78de503 commit bc487fe
Show file tree
Hide file tree
Showing 10 changed files with 1 addition and 59 deletions.
1 change: 0 additions & 1 deletion rust-arroyo/examples/transform_and_produce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ impl ProcessingStrategy<KafkaPayload> for Noop {
fn submit(&mut self, _message: Message<KafkaPayload>) -> Result<(), SubmitError<KafkaPayload>> {
Ok(())
}
fn close(&mut self) {}
fn terminate(&mut self) {}
fn join(&mut self, _timeout: Option<Duration>) -> Result<Option<CommitRequest>, StrategyError> {
Ok(None)
Expand Down
11 changes: 1 addition & 10 deletions rust-arroyo/src/processing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,7 @@ impl<TPayload: Send + Sync + 'static> AssignmentCallbacks for Callbacks<TPayload
let mut state = self.0.locked_state();
if let Some(s) = state.strategy.as_mut() {
let result = panic::catch_unwind(AssertUnwindSafe(|| {
tracing::info!("Closing and joining strategy");
s.close();
tracing::info!("Joining strategy");
s.join(None)
}));

Expand Down Expand Up @@ -467,8 +466,6 @@ mod tests {
Ok(())
}

fn close(&mut self) {}

fn terminate(&mut self) {}

fn join(&mut self, _: Option<Duration>) -> Result<Option<CommitRequest>, StrategyError> {
Expand Down Expand Up @@ -567,12 +564,6 @@ mod tests {
Ok(())
}

fn close(&mut self) {
if self.panic_on == "close" {
panic!("panic in close");
}
}

fn terminate(&mut self) {}

fn join(
Expand Down
2 changes: 0 additions & 2 deletions rust-arroyo/src/processing/strategies/commit_offsets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ impl<T> ProcessingStrategy<T> for CommitOffsets {
Ok(())
}

fn close(&mut self) {}

fn terminate(&mut self) {}

fn join(
Expand Down
4 changes: 0 additions & 4 deletions rust-arroyo/src/processing/strategies/healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ where
self.next_step.submit(message)
}

fn close(&mut self) {
self.next_step.close()
}

fn terminate(&mut self) {
self.next_step.terminate()
}
Expand Down
14 changes: 0 additions & 14 deletions rust-arroyo/src/processing/strategies/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,6 @@ pub trait ProcessingStrategy<TPayload>: Send + Sync {
/// ``MessageRejected`` exception.
fn submit(&mut self, message: Message<TPayload>) -> Result<(), SubmitError<TPayload>>;

/// Close this instance. No more messages should be accepted by the
/// instance after this method has been called.
///
/// This method should not block. Once this strategy instance has
/// finished processing (or discarded) all messages that were submitted
/// prior to this method being called, the strategy should commit its
/// partition offsets and release any resources that will no longer be
/// used (threads, processes, sockets, files, etc.)
fn close(&mut self);

/// Close the processing strategy immediately, abandoning any work in
/// progress. No more messages should be accepted by the instance after
/// this method has been called.
Expand All @@ -139,10 +129,6 @@ impl<TPayload, S: ProcessingStrategy<TPayload> + ?Sized> ProcessingStrategy<TPay
(**self).submit(message)
}

fn close(&mut self) {
(**self).close()
}

fn terminate(&mut self) {
(**self).terminate()
}
Expand Down
7 changes: 0 additions & 7 deletions rust-arroyo/src/processing/strategies/produce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,6 @@ impl ProcessingStrategy<KafkaPayload> for Produce {
self.inner.submit(message)
}

fn close(&mut self) {
self.inner.close();
}

fn terminate(&mut self) {
self.inner.terminate();
}
Expand Down Expand Up @@ -134,7 +130,6 @@ mod tests {
self.0.lock().submit += 1;
Ok(())
}
fn close(&mut self) {}
fn terminate(&mut self) {}
fn join(
&mut self,
Expand Down Expand Up @@ -168,7 +163,6 @@ mod tests {
) -> Result<(), SubmitError<KafkaPayload>> {
Ok(())
}
fn close(&mut self) {}
fn terminate(&mut self) {}
fn join(
&mut self,
Expand Down Expand Up @@ -198,7 +192,6 @@ mod tests {
};

strategy.submit(message).unwrap();
strategy.close();
let _ = strategy.join(None);
}

Expand Down
10 changes: 0 additions & 10 deletions rust-arroyo/src/processing/strategies/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,6 @@ impl<T: Send + Sync, TResult: Send + Sync> ProcessingStrategy<T> for Reduce<T, T
Ok(())
}

fn close(&mut self) {
self.next_step.close();
}

fn terminate(&mut self) {
self.next_step.terminate();
}
Expand Down Expand Up @@ -236,8 +232,6 @@ mod tests {
Ok(())
}

fn close(&mut self) {}

fn terminate(&mut self) {}

fn join(&mut self, _: Option<Duration>) -> Result<Option<CommitRequest>, StrategyError> {
Expand Down Expand Up @@ -292,7 +286,6 @@ mod tests {
// and 1 message is left before next size limit.
assert_eq!(strategy.batch_state.message_count, 1);

strategy.close();
let _ = strategy.join(None);

// 2 batches were created
Expand Down Expand Up @@ -349,7 +342,6 @@ mod tests {
// means 1 batch was cleared and 5 items are in the current batch.
assert_eq!(strategy.batch_state.message_count, 5);

strategy.close();
let _ = strategy.join(None);

// 2 batches were created
Expand Down Expand Up @@ -406,7 +398,6 @@ mod tests {
// until timeout (which will not happen as part of this test)
assert_eq!(strategy.batch_state.message_count, 0);

strategy.close();
let _ = strategy.join(None);

// no batches were created
Expand Down Expand Up @@ -461,7 +452,6 @@ mod tests {
// until timeout (which will not happen as part of this test)
assert_eq!(strategy.batch_state.message_count, 0);

strategy.close();
let _ = strategy.join(None);

// "empty" batch was created -- flushed even though the batch size callback claims it is of
Expand Down
5 changes: 0 additions & 5 deletions rust-arroyo/src/processing/strategies/run_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,6 @@ impl<TPayload, TTransformed: Send + Sync> ProcessingStrategy<TPayload>
Ok(())
}

fn close(&mut self) {
self.next_step.close()
}

fn terminate(&mut self) {
self.next_step.terminate()
}
Expand Down Expand Up @@ -117,7 +113,6 @@ mod tests {
fn submit(&mut self, _message: Message<String>) -> Result<(), SubmitError<String>> {
Ok(())
}
fn close(&mut self) {}
fn terminate(&mut self) {}
fn join(
&mut self,
Expand Down
5 changes: 0 additions & 5 deletions rust-arroyo/src/processing/strategies/run_task_in_threads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,6 @@ where
Ok(())
}

fn close(&mut self) {
self.next_step.close();
}

fn terminate(&mut self) {
for handle in &self.handles {
handle.abort();
Expand Down Expand Up @@ -299,7 +295,6 @@ mod tests {
self.0.lock().unwrap().submit += 1;
Ok(())
}
fn close(&mut self) {}
fn terminate(&mut self) {}
fn join(
&mut self,
Expand Down
1 change: 0 additions & 1 deletion rust-arroyo/src/testutils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ impl<T: Send> ProcessingStrategy<T> for TestStrategy<T> {
Ok(())
}

fn close(&mut self) {}
fn terminate(&mut self) {}
fn join(&mut self, _timeout: Option<Duration>) -> Result<Option<CommitRequest>, StrategyError> {
Ok(None)
Expand Down

0 comments on commit bc487fe

Please sign in to comment.