diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 051e45c71e670..26babf71b153d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -400,6 +400,12 @@ class MicroBatchExecution( // The V2 API does not have the same edge case requiring getBatch to be called // here, so we do nothing here. } + // Last batch was not committed successfully. We need to re-execute the batch. + // `currentBatchId` should be the latest committed batch id, because we will use + // `currentBatchId` to construct `IncrementalExecution` while running the batch. + // If we use `latestBatchId` from the offset log, state providers will try to load + // the state map of `latestBatchId`, but that version of the state is not committed. + currentBatchId = latestCommittedBatchId } else if (latestCommittedBatchId < latestBatchId - 1) { logWarning(s"Batch completion log latest batch id is " + s"${latestCommittedBatchId}, which is not trailing " +