Skip to content

Commit

Permalink
Limit the number of retries on retriable transaction errors
Browse files Browse the repository at this point in the history
The design philosophy for sinks is that, on persistent errors, the sink
should eventually halt rather than retry forever. This is especially
important now that we track the status of the sink: halting is what lets
errors be reported promptly.

With this change, each of the two operations (`retry_on_txn_error` and
`abort_active_txn`) has a small retry limit of 3 tries. I've also
converted these back to a more functional style, since otherwise it'd be
tedious to track the previous error across loop iterations.
  • Loading branch information
bkirwi committed Nov 29, 2022
1 parent 6fb632f commit 02ef484
Showing 1 changed file with 30 additions and 41 deletions.
71 changes: 30 additions & 41 deletions src/storage/src/sink/kafka.rs
Expand Up @@ -330,57 +330,46 @@ impl KafkaTxProducer {
.map_err(|(e, record)| (e, Box::new(record)))
}

async fn retry_on_txn_error<'a, F, Fut, T>(&self, f: F) -> Result<T, String>
async fn retry_on_txn_error<'a, F, Fut, T>(&self, f: F) -> anyhow::Result<T>
where
F: Fn(KafkaTxProducer) -> Fut,
Fut: Future<Output = KafkaResult<T>>,
{
let stream = Retry::default()
// Results are nested so we can exit early on a non-retriable (inner) error.
Retry::default()
.clamp_backoff(BACKOFF_CLAMP)
.into_retry_stream();
tokio::pin!(stream);
loop {
stream.next().await;
match f(self.clone()).await {
Ok(result) => return Ok(result),
Err(KafkaError::Transaction(e)) if e.txn_requires_abort() => {
info!("error requiring txn abort in kafka sink: {:?}", e);
self.abort_active_txn().await?;
return Err(format!(
"shutting down due to error requiring txn abort in kafka sink: {e:?}"
));
}
Err(KafkaError::Transaction(e)) if e.is_retriable() => {
info!("retriable error in kafka sink: {e:?}; will retry");
continue;
}
Err(e) => {
return Err(format!("shutting down due to non-retriable error: {e:?}"));
.max_tries(3)
.retry_async(|_| async {
match f(self.clone()).await {
Ok(value) => Ok(Ok(value)),
Err(KafkaError::Transaction(e)) if e.txn_requires_abort() => {
self.abort_active_txn().await?;
Ok(Err(e).context("transaction error required abort"))
}
Err(KafkaError::Transaction(e)) if e.is_retriable() => {
Err(e).context("retriable transaction error")
}
Err(e) => Ok(Err(e).context("non-retriable transaction error")),
}
}
}
})
.await?
}

async fn abort_active_txn(&self) -> Result<(), String> {
let stream = Retry::default()
async fn abort_active_txn(&self) -> anyhow::Result<()> {
// Results are nested so we can exit early on a non-retriable (inner) error.
Retry::default()
.clamp_backoff(BACKOFF_CLAMP)
.into_retry_stream();
tokio::pin!(stream);
loop {
stream.next().await;
info!("Attempting to abort kafka transaction");
match self.abort_transaction().await {
Ok(()) => return Ok(()),
Err(KafkaError::Transaction(e)) if e.is_retriable() => {
continue;
}
Err(e) => {
return Err(format!(
"non-retriable error while aborting kafka transaction: {e:?}"
));
.max_tries(3)
.retry_async(|_| async {
match self.abort_transaction().await {
Ok(()) => Ok(Ok(())),
Err(KafkaError::Transaction(e)) if e.is_retriable() => {
Err(e).context("retriable transaction error while aborting transaction")
}
Err(e) => Ok(Err(e).context("non-retriable error while aborting transaction")),
}
}
}
})
.await?
}
}

Expand Down

0 comments on commit 02ef484

Please sign in to comment.