Skip to content

Commit

Permalink
Don't bother retrying the transaction abort
Browse files Browse the repository at this point in the history
And also leave a comment, since it's not obvious why it's worth the
trouble to do at all.
  • Loading branch information
bkirwi committed Dec 1, 2022
1 parent d7fc137 commit 36509db
Showing 1 changed file with 14 additions and 27 deletions.
41 changes: 14 additions & 27 deletions src/storage/src/sink/kafka.rs
Expand Up @@ -342,12 +342,21 @@ impl KafkaTxProducer {
match f(self.clone()).await {
Ok(value) => RetryResult::Ok(value),
Err(KafkaError::Transaction(e)) if e.txn_requires_abort() => {
match self.abort_active_txn().await {
Ok(()) => RetryResult::FatalErr(
anyhow!(e).context("transaction error required abort"),
),
Err(e) => RetryResult::FatalErr(e),
// Make one attempt at aborting the transaction before letting the error
// percolate up and the process exit. Aborting allows the consumers of the
// topic to skip over any messages we've written in the transaction, so it's
// polite to do... but if it fails, the transaction will be aborted either
// when fenced out by a future version of this producer or by the
// broker-side timeout.
if let Err(e) = self.abort_transaction().await {
warn!(
error =? e,
"failed to abort transaction after an error that required it"
);
}
RetryResult::FatalErr(
anyhow!(e).context("transaction error requiring abort"),
)
}
Err(KafkaError::Transaction(e)) if e.is_retriable() => {
RetryResult::RetryableErr(anyhow!(e).context("retriable transaction error"))
Expand All @@ -359,28 +368,6 @@ impl KafkaTxProducer {
})
.await
}

async fn abort_active_txn(&self) -> Result<(), anyhow::Error> {
// Results are nested so we can exit early on a non-retriable (inner) error.
Retry::default()
.clamp_backoff(BACKOFF_CLAMP)
.max_tries(3)
.retry_async(|_| async {
match self.abort_transaction().await {
Ok(()) => RetryResult::Ok(()),
Err(KafkaError::Transaction(e)) if e.is_retriable() => {
RetryResult::RetryableErr(
anyhow!(e)
.context("retryable transaction error while aborting transaction"),
)
}
Err(e) => RetryResult::FatalErr(
anyhow!(e).context("non-retriable error while aborting transaction"),
),
}
})
.await
}
}

struct KafkaSinkState {
Expand Down

0 comments on commit 36509db

Please sign in to comment.