From 57c200b601a4fd4de5434fa2e29986e301bd72ae Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 18 Nov 2022 11:42:05 -0800 Subject: [PATCH 1/2] Use latestCommittedBatchId as currentBatchId when resuming late batch --- .../spark/sql/execution/streaming/MicroBatchExecution.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 051e45c71e670..26babf71b153d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -400,6 +400,12 @@ class MicroBatchExecution( // The V2 API does not have the same edge case requiring getBatch to be called // here, so we do nothing here. } + // Last batch was not committed successfully. We need to re-execute the batch. + // `currentBatchId` should be the latest committed batch id, because we will use + // `currentBatchId` to construct `IncrementalExecution` while running the batch. + // If we use `latestBatchId` from offset log, state providers will try to load + // state map of `latestBatchId`, but the version of states is not committed. + currentBatchId = latestCommittedBatchId } else if (latestCommittedBatchId < latestBatchId - 1) { logWarning(s"Batch completion log latest batch id is " + s"${latestCommittedBatchId}, which is not trailing " + From 3faf2f6de10c4d330f9e57574ec0d164e0e80709 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 18 Nov 2022 16:07:10 -0800 Subject: [PATCH 2/2] Trigger Build