From e696ce54a6ece4fc286b161217d0384087886d9b Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Sun, 27 Mar 2022 19:01:02 -0700 Subject: [PATCH 1/2] [SPARK-38670] Add offset commit time to streaming query listener --- .../sql/execution/streaming/MicroBatchExecution.scala | 8 +++++--- .../apache/spark/sql/streaming/StreamingQuerySuite.scala | 2 ++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 3b409fa2f6a72..be7a27fb788af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -672,9 +672,11 @@ class MicroBatchExecution( withProgressLocked { sinkCommitProgress = batchSinkProgress watermarkTracker.updateWatermark(lastExecution.executedPlan) - assert(commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark)), - "Concurrent update to the commit log. Multiple streaming jobs detected for " + - s"$currentBatchId") + reportTimeTaken("commitOffsets") { + assert(commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark)), + "Concurrent update to the commit log. Multiple streaming jobs detected for " + + s"$currentBatchId") + } committedOffsets ++= availableOffsets } logDebug(s"Completed batch ${currentBatchId}") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 84060733e865c..33a877d8f42db 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -316,6 +316,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi assert(query.recentProgress.last.eq(query.lastProgress)) val progress = query.lastProgress + logInfo("progress: " + progress.prettyJson) + assert(progress.id === query.id) assert(progress.name === query.name) assert(progress.batchId === 0) From 1d4b17eccfcb8fc3137f6454a93f0b70f330bddd Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Sun, 27 Mar 2022 19:04:19 -0700 Subject: [PATCH 2/2] fix --- .../org/apache/spark/sql/streaming/StreamingQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 33a877d8f42db..a4b993274c3f8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -316,7 +316,6 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi assert(query.recentProgress.last.eq(query.lastProgress)) val progress = query.lastProgress - logInfo("progress: " + progress.prettyJson) assert(progress.id === query.id) assert(progress.name === query.name) @@ -328,6 +327,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi assert(progress.durationMs.get("latestOffset") === 50) assert(progress.durationMs.get("queryPlanning") === 100) assert(progress.durationMs.get("walCommit") === 0) + assert(progress.durationMs.get("commitOffsets") === 0) assert(progress.durationMs.get("addBatch") === 350) assert(progress.durationMs.get("triggerExecution") === 500)