From 0de57eaa539e6a51c9f7169d6f77445b91f95238 Mon Sep 17 00:00:00 2001 From: ericm-db Date: Mon, 4 May 2026 12:34:32 -0700 Subject: [PATCH 1/6] [SPARK-56719][SS] Add DataStreamWriter.name() API for sink evolution Co-authored-by: Isaac --- .../resources/error/error-conditions.json | 10 + .../sql/streaming/DataStreamWriter.scala | 37 +++- .../catalyst/streaming/WriteToStream.scala | 1 + .../streaming/WriteToStreamStatement.scala | 2 + .../sql/errors/QueryCompilationErrors.scala | 6 + .../apache/spark/sql/internal/SQLConf.scala | 12 ++ .../spark/sql/connect/DataStreamWriter.scala | 5 + .../spark/sql/classic/DataStreamWriter.scala | 10 + .../sql/classic/StreamingQueryManager.scala | 4 + .../runtime/MicroBatchExecution.scala | 24 ++- .../runtime/ResolveWriteToStream.scala | 1 + .../spark/sql/streaming/StreamTest.scala | 1 + .../test/StreamingSinkEvolutionSuite.scala | 201 ++++++++++++++++++ 13 files changed, 312 insertions(+), 2 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index a3cd9f8536eb5..1fbac3a1ad14d 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -6855,6 +6855,11 @@ "Duplicate streaming source names detected: . Each streaming source must have a unique name." ] }, + "INVALID_SINK_NAME" : { + "message" : [ + "Invalid streaming sink name: ''. Sink names must only contain ASCII letters ('a'-'z', 'A'-'Z'), digits ('0'-'9'), and underscores ('_')." + ] + }, "INVALID_SOURCE_NAME" : { "message" : [ "Invalid streaming source name ''. Source names must only contain ASCII letters (a-z, A-Z), digits (0-9), and underscores (_)." @@ -6865,6 +6870,11 @@ "Streaming source naming is not supported. Source name '' was provided but the feature is disabled. Please enable the feature by setting spark.sql.streaming.queryEvolution.enableSourceEvolution to true." ] }, + "UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT" : { + "message" : [ + "Streaming sink must be named when spark.sql.streaming.queryEvolution.enableSinkEvolution is enabled. Use the name() method on DataStreamWriter to assign a name to the streaming sink." + ] + }, "UNNAMED_STREAMING_SOURCES_WITH_ENFORCEMENT" : { "message" : [ "All streaming sources must be named when spark.sql.streaming.queryEvolution.enableSourceEvolution is enabled. Unnamed sources found: . Use the name() method to assign names to all streaming sources." diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index cb5ecc728c441..3cb601ec8f618 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -18,9 +18,11 @@ package org.apache.spark.sql.streaming import java.util.concurrent.TimeoutException +import scala.util.matching.Regex + import org.apache.spark.annotation.Evolving import org.apache.spark.api.java.function.VoidFunction2 -import org.apache.spark.sql.{Dataset, ForeachWriter, WriteConfigMethods} +import org.apache.spark.sql.{AnalysisException, Dataset, ForeachWriter, WriteConfigMethods} /** * Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems, @@ -90,6 +92,19 @@ abstract class DataStreamWriter[T] extends WriteConfigMethods[DataStreamWriter[T */ def queryName(queryName: String): this.type + /** + * Assigns a name to this streaming sink for sink evolution capability. + * When sinks are named, they can be tracked in checkpoint metadata, + * enabling query evolution. + * + * If not specified, sinks are automatically assigned a default name + * based on their position in the query, which maintains backward compatibility. + * + * @param sinkName the unique name for this sink (alphanumeric and underscore only) + * @since 4.1.0 + */ + private[sql] def name(sinkName: String): this.type + /** * Specifies the underlying output data source. * @@ -217,6 +232,26 @@ abstract class DataStreamWriter[T] extends WriteConfigMethods[DataStreamWriter[T @throws[TimeoutException] def toTable(tableName: String): StreamingQuery + /** + * Validates that a streaming sink name only contains alphanumeric characters and underscores. + * + * @param sinkName + * the sink name to validate + * @throws AnalysisException + * if the sink name contains invalid characters + */ + private[sql] def validateSinkName(sinkName: String): Unit = { + require(sinkName != null, "Sink name cannot be null") + require(sinkName.nonEmpty, "Sink name cannot be empty") + + val validNamePattern: Regex = "^[a-zA-Z0-9_]+$".r + if (!validNamePattern.pattern.matcher(sinkName).matches()) { + throw new AnalysisException( + errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME", + messageParameters = Map("sinkName" -> sinkName)) + } + } + /////////////////////////////////////////////////////////////////////////////////////// // Covariant Overrides /////////////////////////////////////////////////////////////////////////////////////// diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala index 884a4165d077e..6e0583f778350 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.streaming.OutputMode */ case class WriteToStream( name: String, + sinkName: Option[String], resolvedCheckpointLocation: String, sink: Table, outputMode: OutputMode, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala index 7015d0dd3b2cc..61e64a526aeda 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.streaming.{OutputMode, Trigger} * rule [[ResolveStreamWrite]]. * * @param userSpecifiedName Query name optionally specified by the user. + * @param userSpecifiedSinkName Sink name optionally specified by the user for sink evolution. * @param userSpecifiedCheckpointLocation Checkpoint location optionally specified by the user. * @param useTempCheckpointLocation Whether to use a temporary checkpoint location when the user * has not specified one. If false, then error will be thrown. @@ -47,6 +48,7 @@ import org.apache.spark.sql.streaming.{OutputMode, Trigger} */ case class WriteToStreamStatement( userSpecifiedName: Option[String], + userSpecifiedSinkName: Option[String], userSpecifiedCheckpointLocation: Option[String], useTempCheckpointLocation: Boolean, recoverFromCheckpointLocation: Boolean, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 6f3348d01bbba..1716c6eebebf2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -2545,6 +2545,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat messageParameters = Map("sourceName" -> sourceName)) } + def invalidStreamingSinkNameError(sinkName: String): Throwable = { + new AnalysisException( + errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME", + messageParameters = Map("sinkName" -> sinkName)) + } + def duplicateStreamingSourceNamesError(duplicateNames: Seq[String]): Throwable = { new AnalysisException( errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.DUPLICATE_SOURCE_NAMES", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 83f1816c9727f..fbf9a04ddc14d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3143,6 +3143,16 @@ object SQLConf { .booleanConf .createWithDefault(false) + val ENABLE_STREAMING_SINK_EVOLUTION = + buildConf("spark.sql.streaming.queryEvolution.enableSinkEvolution") + .internal() + .doc("When true, streaming sinks can be named using the name() API on DataStreamWriter. " + + "This enables sink evolution capability where sinks can be changed while maintaining " + + "a historical record of all sinks used in the checkpoint.") + .version("4.1.0") + .booleanConf + .createWithDefault(false) + val STREAMING_CHECK_UNFINISHED_REPARTITION_ON_RESTART = buildConf("spark.sql.streaming.checkUnfinishedRepartitionOnRestart") .internal() @@ -7673,6 +7683,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def enableStreamingSourceEvolution: Boolean = getConf(ENABLE_STREAMING_SOURCE_EVOLUTION) + def enableStreamingSinkEvolution: Boolean = getConf(ENABLE_STREAMING_SINK_EVOLUTION) + def streamingCheckUnfinishedRepartitionOnRestart: Boolean = getConf(STREAMING_CHECK_UNFINISHED_REPARTITION_ON_RESTART) diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala index ffa11b5d7ab0d..bac41acc83f03 100644 --- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala +++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala @@ -82,6 +82,11 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T]) this } + /** @inheritdoc */ + private[sql] def name(sinkName: String): this.type = { + throw new UnsupportedOperationException("Sink naming is not supported in Spark Connect") + } + /** @inheritdoc */ def format(source: String): this.type = { sinkBuilder.setFormat(source) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala index 38483395ec8c5..e2b5961411a0c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala @@ -83,6 +83,13 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D this } + /** @inheritdoc */ + private[sql] def name(sinkName: String): this.type = { + validateSinkName(sinkName) + this.sinkName = Some(sinkName) + this + } + /** @inheritdoc */ def format(source: String): this.type = { this.source = source @@ -312,6 +319,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D ds.sparkSession.sessionState.streamingQueryManager.startQuery( newOptions.get("queryName"), + sinkName, newOptions.get("checkpointLocation"), ds, newOptions.originalMap, @@ -444,6 +452,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D private var partitioningColumns: Option[Seq[String]] = None private var clusteringColumns: Option[Seq[String]] = None + + private var sinkName: Option[String] = None } object DataStreamWriter { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala index 72ae3b21d662a..fff8d32a0709b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala @@ -176,6 +176,7 @@ class StreamingQueryManager private[sql] ( // scalastyle:off argcount private def createQuery( userSpecifiedName: Option[String], + userSpecifiedSinkName: Option[String], userSpecifiedCheckpointLocation: Option[String], df: Dataset[_], extraOptions: Map[String, String], @@ -207,6 +208,7 @@ class StreamingQueryManager private[sql] ( val dataStreamWritePlan = WriteToStreamStatement( userSpecifiedName, + userSpecifiedSinkName, userSpecifiedCheckpointLocation, useTempCheckpointLocation, recoverFromCheckpointLocation, @@ -277,6 +279,7 @@ class StreamingQueryManager private[sql] ( @throws[TimeoutException] private[sql] def startQuery( userSpecifiedName: Option[String], + userSpecifiedSinkName: Option[String] = None, userSpecifiedCheckpointLocation: Option[String], df: Dataset[_], extraOptions: Map[String, String], @@ -290,6 +293,7 @@ class StreamingQueryManager private[sql] ( catalogTable: Option[CatalogTable] = None): StreamingQuery = { val query = createQuery( userSpecifiedName, + userSpecifiedSinkName, userSpecifiedCheckpointLocation, df, extraOptions, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala index e6d0666aca259..d4fae034e7974 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala @@ -26,7 +26,7 @@ import scala.util.control.NonFatal import org.apache.hadoop.fs.Path -import org.apache.spark.{SparkIllegalArgumentException, SparkIllegalStateException} +import org.apache.spark.{SparkException, SparkIllegalArgumentException, SparkIllegalStateException} import org.apache.spark.internal.LogKeys import org.apache.spark.internal.LogKeys._ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -112,6 +112,22 @@ class MicroBatchExecution( override protected def sourceToIdMap: Map[SparkDataStream, String] = sourceIdMap.map(_.swap) + // Sink name for commit log support + // If sink evolution is enabled, use user-provided sinkName (or error if not provided) + // Otherwise, always use DEFAULT_SINK_NAME for backward compatibility + private val sinkName: String = { + if (sparkSession.sessionState.conf.enableStreamingSinkEvolution) { + plan.sinkName.getOrElse { + throw new SparkException( + errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT", + messageParameters = Map.empty, + cause = null) + } + } else { + MicroBatchExecution.DEFAULT_SINK_NAME + } + } + @volatile protected[sql] var triggerExecutor: TriggerExecutor = _ protected def getTrigger(): TriggerExecutor = { @@ -1466,6 +1482,12 @@ class MicroBatchExecution( object MicroBatchExecution { val BATCH_ID_KEY = "streaming.sql.batchId" + + /** + * Default sink name used when sink evolution is disabled or no explicit name is provided. + * This maintains backward compatibility with existing streaming queries. + */ + private[sql] val DEFAULT_SINK_NAME = "sink-0" } case class OffsetHolder(start: OffsetV2, end: Option[OffsetV2]) extends LeafNode { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala index ff0d71d0f0759..0be430591dbd8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala @@ -66,6 +66,7 @@ object ResolveWriteToStream extends Rule[LogicalPlan] { WriteToStream( s.userSpecifiedName.orNull, + s.userSpecifiedSinkName, resolvedCheckpointLocation, s.sink, s.outputMode, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 0e33b271522dc..a6067aaf189e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -598,6 +598,7 @@ trait StreamTest extends SharedSparkSession with TimeLimits { sparkSession .streams .startQuery( + None, None, Some(metadataRoot), stream, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala new file mode 100644 index 0000000000000..a242faabaf921 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming.test + +import org.scalatest.{BeforeAndAfterEach, Tag} + +import org.apache.spark.SparkException +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming.runtime.MemoryStream +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.util.Utils + +/** + * Test suite for streaming sink evolution features including: + * - Sink naming via DataStreamWriter.name() + * - Sink name validation + * - Sink evolution enforcement + */ +class StreamingSinkEvolutionSuite extends StreamTest with BeforeAndAfterEach { + import testImplicits._ + + private def newMetadataDir = + Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath + + override def afterEach(): Unit = { + spark.streams.active.foreach(_.stop()) + super.afterEach() + } + + // ========================= + // Sink Name Validation Tests + // ========================= + + testWithSinkEvolution("invalid sink name - contains hyphen") { + val input = MemoryStream[Int] + input.addData(1, 2, 3) + checkError( + exception = intercept[AnalysisException] { + input.toDF().writeStream + .format("noop") + .name("my-sink") + .option("checkpointLocation", newMetadataDir) + .start() + }, + condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME", + parameters = Map("sinkName" -> "my-sink")) + } + + testWithSinkEvolution("invalid sink name - contains space") { + val input = MemoryStream[Int] + input.addData(1, 2, 3) + checkError( + exception = intercept[AnalysisException] { + input.toDF().writeStream + .format("noop") + .name("my sink") + .option("checkpointLocation", newMetadataDir) + .start() + }, + condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME", + parameters = Map("sinkName" -> "my sink")) + } + + testWithSinkEvolution("invalid sink name - contains special characters") { + val input = MemoryStream[Int] + input.addData(1, 2, 3) + checkError( + exception = intercept[AnalysisException] { + input.toDF().writeStream + .format("noop") + .name("my.sink@123!") + .option("checkpointLocation", newMetadataDir) + .start() + }, + condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME", + parameters = Map("sinkName" -> "my.sink@123!")) + } + + testWithSinkEvolution("valid sink names - various patterns") { + Seq("mySink", "my_sink", "MySink123", "_private", "sink_123_test", "123sink") + .foreach { sinkName => + val checkpointDir = newMetadataDir + val input = MemoryStream[Int] + input.addData(1, 2, 3) + val q = input.toDF().writeStream + .format("noop") + .name(sinkName) + .option("checkpointLocation", checkpointDir) + .start() + q.processAllAvailable() + q.stop() + } + } + + // =========================== + // Sink Evolution Enforcement + // =========================== + + testWithSinkEvolution("unnamed sink with sink evolution enabled throws error") { + val input = MemoryStream[Int] + input.addData(1, 2, 3) + val exception = intercept[SparkException] { + val q = input.toDF().writeStream + .format("noop") + // No .name() call - sink is unnamed + .option("checkpointLocation", newMetadataDir) + .start() + q.processAllAvailable() + q.stop() + } + + checkError( + exception = exception, + condition = "STREAMING_QUERY_EVOLUTION_ERROR.UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT", + parameters = Map.empty) + } + + test("unnamed sink without sink evolution enabled uses default name") { + withSQLConf( + SQLConf.ENABLE_STREAMING_SINK_EVOLUTION.key -> "false") { + val input = MemoryStream[Int] + input.addData(1, 2, 3) + // Should succeed - no name required when sink evolution is disabled + val q = input.toDF().writeStream + .format("noop") + .option("checkpointLocation", newMetadataDir) + .start() + q.processAllAvailable() + q.stop() + } + } + + testWithSinkEvolution("named sink succeeds with sink evolution enabled") { + val input = MemoryStream[Int] + input.addData(1, 2, 3) + val q = input.toDF().writeStream + .format("noop") + .name("my_sink") + .option("checkpointLocation", newMetadataDir) + .start() + q.processAllAvailable() + q.stop() + } + + testWithSinkEvolution("continuing with same sink name works") { + val checkpointDir = newMetadataDir + val input = MemoryStream[Int] + + // Start with my_sink + input.addData(1, 2, 3) + val q1 = input.toDF().writeStream + .format("noop") + .name("my_sink") + .option("checkpointLocation", checkpointDir) + .start() + q1.processAllAvailable() + q1.stop() + + // Restart with same sink name - should work + input.addData(4, 5, 6) + val q2 = input.toDF().writeStream + .format("noop") + .name("my_sink") + .option("checkpointLocation", checkpointDir) + .start() + q2.processAllAvailable() + q2.stop() + } + + // ============== + // Helper Methods + // ============== + + /** + * Helper method to run tests with sink evolution enabled. + */ + def testWithSinkEvolution(testName: String, testTags: Tag*)(testBody: => Any): Unit = { + test(testName, testTags: _*) { + withSQLConf( + SQLConf.ENABLE_STREAMING_SINK_EVOLUTION.key -> "true") { + testBody + } + } + } +} From cf30ae8ffa5b09740f3ba6cbeed9fe054ad3559e Mon Sep 17 00:00:00 2001 From: ericm-db Date: Tue, 19 May 2026 17:44:12 +0000 Subject: [PATCH 2/6] [SPARK-56719][SS][FOLLOW-UP] Add MiMa exclusion and binding policy exception for sink evolution Adds the MiMa `ReversedMissingMethodProblem` exclusion for the newly added `DataStreamWriter.name()` API, and registers the new `spark.sql.streaming.queryEvolution.enableSinkEvolution` SQL config in the binding-policy exceptions file (consistent with its `enableSourceEvolution` sibling). Co-authored-by: Isaac --- project/MimaExcludes.scala | 5 ++++- .../configs-without-binding-policy-exceptions | 1 + 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index b5434efee090c..376612cbe2617 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -88,7 +88,10 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.artifact.ArtifactManager.cachedBlockIdList"), // [SPARK-54323][PYTHON] Change the way to access logs to TVF instead of system view - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.TableValuedFunction.python_worker_logs") + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.TableValuedFunction.python_worker_logs"), + + // [SPARK-56719][SS] Add DataStreamWriter.name() API for sink evolution + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.DataStreamWriter.name") ) // Default exclude rules diff --git a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions index 2aa6cb885ca31..b295c08879ef9 100644 --- a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions +++ b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions @@ -1035,6 +1035,7 @@ spark.sql.streaming.numRecentProgressUpdates spark.sql.streaming.offsetLog.formatVersion spark.sql.streaming.optimizeOneRowPlan.enabled spark.sql.streaming.pollingDelay +spark.sql.streaming.queryEvolution.enableSinkEvolution spark.sql.streaming.queryEvolution.enableSourceEvolution spark.sql.streaming.ratioExtraSpaceAllowedInCheckpoint spark.sql.streaming.realTimeMode.allowlistCheck From d47b6d19801f5a5dc5380e23a70a45feff73c86a Mon Sep 17 00:00:00 2001 From: ericm-db Date: Tue, 19 May 2026 15:22:34 -0700 Subject: [PATCH 3/6] ./build/mvn --- .../spark/sql/streaming/DataStreamWriter.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 3cb601ec8f618..a324012f9141f 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -93,14 +93,14 @@ abstract class DataStreamWriter[T] extends WriteConfigMethods[DataStreamWriter[T def queryName(queryName: String): this.type /** - * Assigns a name to this streaming sink for sink evolution capability. - * When sinks are named, they can be tracked in checkpoint metadata, - * enabling query evolution. + * Assigns a name to this streaming sink for sink evolution capability. When sinks are named, + * they can be tracked in checkpoint metadata, enabling query evolution. * - * If not specified, sinks are automatically assigned a default name - * based on their position in the query, which maintains backward compatibility. + * If not specified, sinks are automatically assigned a default name based on their position in + * the query, which maintains backward compatibility. * - * @param sinkName the unique name for this sink (alphanumeric and underscore only) + * @param sinkName + * the unique name for this sink (alphanumeric and underscore only) * @since 4.1.0 */ private[sql] def name(sinkName: String): this.type From a69667a2f3d4a3508dd1d18a3df75fd5fe9f01b7 Mon Sep 17 00:00:00 2001 From: ericm-db Date: Wed, 20 May 2026 19:51:22 +0000 Subject: [PATCH 4/6] [SPARK-56970][SS] Split CommitMetadata into CommitMetadataBase + V1/V2 case classes Refactor `CommitLog` so that the commit log metadata is dispatched through a `CommitMetadataBase` trait with concrete `CommitMetadata` (V1, watermark only) and `CommitMetadataV2` (watermark + `stateUniqueIds`) case classes. The deserializer now reads the wire-format version from the file header and constructs the matching subclass. This is preparation for `CommitMetadataV3` (which adds sink metadata for streaming sink evolution) in a follow-up PR. Notable changes: - Add `CommitMetadataBase` trait and `CommitMetadataV2` case class. - `CommitMetadata` becomes V1 (no `stateUniqueIds` field). - Add `CommitLog.createMetadata` factory that dispatches by version and defaults to the configured `STATE_STORE_CHECKPOINT_FORMAT_VERSION`. - `CommitLog.readCommitMetadata` reads the version line and constructs the matching subclass. - `MicroBatchExecution`, `OfflineStateRepartitionRunner`, and the existing tests are updated to use the new types / factory. The pre-refactor `CommitMetadata` carried both the V1 and V2 wire shape in a single case class, with `stateUniqueIds` optional. That made it awkward to add a V3 wire format with additional fields, and forced `serialize` to take the wire version from `SQLConf` rather than from the metadata itself. No new public API. The wire format for V1 changes slightly: V1 commit log files no longer serialize `stateUniqueIds: null`. Old V1 files continue to be read because the V1 deserializer ignores the (now-unknown) field. This PR also relaxes the version-exact-match check on read so that a commit log opened with the V2 conf can deserialize a V1 file. This incidentally resolves SPARK-50653. - Existing `CommitLogSuite` (V1, V2, and cross-version) passes; the cross-version test now asserts successful V1 deserialization. - `StreamingSinkEvolutionSuite` (from SPARK-56719) still passes. Generated-by: Claude Code (claude-opus-4-7) Co-authored-by: Isaac --- .../checkpointing/AsyncCommitLog.scala | 4 +- .../streaming/checkpointing/CommitLog.scala | 105 ++++++++++++++---- .../runtime/MicroBatchExecution.scala | 6 +- .../state/OfflineStateRepartitionRunner.scala | 5 +- .../StateDataSourceChangeDataReadSuite.scala | 6 +- ...artitionAllColumnFamiliesWriterSuite.scala | 5 +- .../spark/sql/streaming/CommitLogSuite.scala | 29 ++--- 7 files changed, 112 insertions(+), 48 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncCommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncCommitLog.scala index 116ea18326ef0..0f031fcbb9512 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncCommitLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncCommitLog.scala @@ -48,7 +48,7 @@ class AsyncCommitLog(sparkSession: SparkSession, path: String, executorService: * the async write of the batch is completed. Future may also be completed exceptionally * to indicate some write error. */ - def addAsync(batchId: Long, metadata: CommitMetadata): CompletableFuture[Long] = { + def addAsync(batchId: Long, metadata: CommitMetadataBase): CompletableFuture[Long] = { require(metadata != null, "'null' metadata cannot be written to a metadata log") val future: CompletableFuture[Long] = addNewBatchByStreamAsync(batchId) { output => serialize(metadata, output) @@ -72,7 +72,7 @@ class AsyncCommitLog(sparkSession: SparkSession, path: String, executorService: * @param metadata metadata of batch to write * @return true if operation is successful otherwise false. */ - def addInMemory(batchId: Long, metadata: CommitMetadata): Boolean = { + def addInMemory(batchId: Long, metadata: CommitMetadataBase): Boolean = { if (batchCache.containsKey(batchId)) { false } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala index b73020b6060c6..8b32ff9e6208f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala @@ -26,6 +26,7 @@ import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf /** @@ -50,39 +51,99 @@ class CommitLog( sparkSession: SparkSession, path: String, readOnly: Boolean = false) - extends HDFSMetadataLog[CommitMetadata](sparkSession, path, readOnly) { + extends HDFSMetadataLog[CommitMetadataBase](sparkSession, path, readOnly) { import CommitLog._ - private val VERSION: Int = sparkSession.conf.get( + // The configured commit log format version. Used as the default version when callers + // construct metadata through [[createMetadata]]. + private[sql] val VERSION: Int = sparkSession.conf.get( SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key).toInt - override protected[sql] def deserialize(in: InputStream): CommitMetadata = { - // called inside a try-finally where the underlying stream is closed in the caller - val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines() - if (!lines.hasNext) { - throw new IllegalStateException("Incomplete log file in the offset commit log") - } - // TODO [SPARK-49462] This validation should be relaxed for a stateless query. - // TODO [SPARK-50653] This validation should be relaxed to support reading - // a V1 log file when VERSION is V2 - validateVersionExactMatch(lines.next().trim, VERSION) - val metadataJson = if (lines.hasNext) lines.next() else EMPTY_JSON - CommitMetadata(metadataJson) + override protected[sql] def deserialize(in: InputStream): CommitMetadataBase = { + CommitLog.readCommitMetadata(in) } - override protected[sql] def serialize(metadata: CommitMetadata, out: OutputStream): Unit = { + override protected[sql] def serialize(metadata: CommitMetadataBase, out: OutputStream): Unit = { // called inside a try-finally where the underlying stream is closed in the caller - out.write(s"v${VERSION}".getBytes(UTF_8)) + out.write(s"v${metadata.version}".getBytes(UTF_8)) out.write('\n') // write metadata out.write(metadata.json.getBytes(UTF_8)) } + + /** + * Factory for creating a [[CommitMetadataBase]] for the requested wire format version. + * Defaults to the version configured via [[SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION]]. + */ + def createMetadata( + nextBatchWatermarkMs: Long = 0, + stateUniqueIds: Option[Map[Long, Array[Array[String]]]] = None, + commitLogFormatVersion: Int = VERSION): CommitMetadataBase = { + commitLogFormatVersion match { + case VERSION_2 => + CommitMetadataV2(nextBatchWatermarkMs, stateUniqueIds) + case VERSION_1 => + CommitMetadata(nextBatchWatermarkMs) + case v => + throw QueryExecutionErrors.logVersionGreaterThanSupported(v, CommitLog.MAX_VERSION) + } + } } object CommitLog { private val EMPTY_JSON = "{}" + val VERSION_1 = 1 + val VERSION_2 = 2 + val MAX_VERSION: Int = VERSION_2 + + /** + * Reads a single commit log entry and dispatches to the matching + * [[CommitMetadataBase]] subclass based on the wire format version recorded in the file. + */ + private[spark] def readCommitMetadata(in: InputStream): CommitMetadataBase = { + val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines() + if (!lines.hasNext) { + throw new IllegalStateException("Incomplete log file in the offset commit log") + } + val version = MetadataVersionUtil.validateVersion(lines.next().trim, MAX_VERSION) + val metadataJson = if (lines.hasNext) lines.next() else EMPTY_JSON + version match { + case VERSION_2 => CommitMetadataV2(metadataJson) + case VERSION_1 => CommitMetadata(metadataJson) + case v => throw QueryExecutionErrors.logVersionGreaterThanSupported(v, MAX_VERSION) + } + } +} + +/** + * Base trait for commit log metadata. Concrete subclasses correspond to wire format versions + * and override [[version]] accordingly. + */ +trait CommitMetadataBase extends Serializable { + def version: Int + def nextBatchWatermarkMs: Long + def stateUniqueIds: Option[Map[Long, Array[Array[String]]]] + + def json: String = Serialization.write(this)(CommitMetadata.format) +} + +/** + * Commit log metadata for [[CommitLog.VERSION_1]]. Records the watermark for the next batch only. + * + * @param nextBatchWatermarkMs The watermark of the next batch. + */ +case class CommitMetadata( + nextBatchWatermarkMs: Long = 0) extends CommitMetadataBase { + override def version: Int = CommitLog.VERSION_1 + override def stateUniqueIds: Option[Map[Long, Array[Array[String]]]] = None +} + +object CommitMetadata { + implicit val format: Formats = Serialization.formats(NoTypeHints) + + def apply(json: String): CommitMetadata = Serialization.read[CommitMetadata](json) } /** @@ -104,19 +165,19 @@ object CommitLog { * +--- ...... * In the commit log, in addition to nextBatchWatermarkMs, we also store the unique ids of the * state store files. + * * @param nextBatchWatermarkMs The watermark of the next batch. * @param stateUniqueIds Map[Long, Array[Array[String]]] of map * OperatorId -> (partitionID -> array of uniqueID) */ - -case class CommitMetadata( +case class CommitMetadataV2( nextBatchWatermarkMs: Long = 0, - stateUniqueIds: Option[Map[Long, Array[Array[String]]]] = None) { - def json: String = Serialization.write(this)(CommitMetadata.format) + stateUniqueIds: Option[Map[Long, Array[Array[String]]]] = None) extends CommitMetadataBase { + override def version: Int = CommitLog.VERSION_2 } -object CommitMetadata { +object CommitMetadataV2 { implicit val format: Formats = Serialization.formats(NoTypeHints) - def apply(json: String): CommitMetadata = Serialization.read[CommitMetadata](json) + def apply(json: String): CommitMetadataV2 = Serialization.read[CommitMetadataV2](json) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala index d4fae034e7974..86ec478ff81cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala @@ -45,7 +45,7 @@ import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, RealTimeStreamScanExec, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec} import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, RealTimeModeAllowlist, RealTimeTrigger, Sink, Source, StreamingQueryPlanTraverseHelper} -import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitMetadata, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2} +import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2} import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, StatefulOpStateStoreCheckpointInfo, StateStoreWriter} import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE} import org.apache.spark.sql.execution.streaming.sources.{ForeachBatchSink, WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1} @@ -1439,7 +1439,9 @@ class MicroBatchExecution( None } if (!commitLog.add(execCtx.batchId, - CommitMetadata(watermarkTracker.currentWatermark, stateStoreCkptId))) { + commitLog.createMetadata( + nextBatchWatermarkMs = watermarkTracker.currentWatermark, + stateUniqueIds = stateStoreCkptId))) { throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OfflineStateRepartitionRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OfflineStateRepartitionRunner.scala index 1491d26989062..4823810383e4f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OfflineStateRepartitionRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OfflineStateRepartitionRunner.scala @@ -294,7 +294,10 @@ class OfflineStateRepartitionRunner( lastCommittedBatchId: Long, opIdToStateStoreCkptInfo: Option[Map[Long, Array[Array[String]]]]): Unit = { val latestCommit = checkpointMetadata.commitLog.get(lastCommittedBatchId).get - val commitMetadata = latestCommit.copy(stateUniqueIds = opIdToStateStoreCkptInfo) + val commitMetadata = checkpointMetadata.commitLog.createMetadata( + nextBatchWatermarkMs = latestCommit.nextBatchWatermarkMs, + stateUniqueIds = opIdToStateStoreCkptInfo, + commitLogFormatVersion = latestCommit.version) if (!checkpointMetadata.commitLog.add(newBatchId, commitMetadata)) { throw QueryExecutionErrors.concurrentStreamLogUpdate(newBatchId) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala index bae78f0b4762f..4e9f6cca2ffc4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration import org.scalatest.Assertions import org.apache.spark.sql.Row -import org.apache.spark.sql.execution.streaming.checkpointing.{CommitLog, CommitMetadata} +import org.apache.spark.sql.execution.streaming.checkpointing.{CommitLog, CommitMetadata, CommitMetadataV2} import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream, StreamExecution} import org.apache.spark.sql.execution.streaming.state._ import org.apache.spark.sql.functions.{col, window} @@ -237,11 +237,11 @@ abstract class StateDataSourceChangeDataReaderSuite extends StateDataSourceTestB new File(tempDir.getAbsolutePath, "commits").getAbsolutePath) // Start version: treated as v1 (no operator unique ids) - val startMetadata = CommitMetadata(0, None) + val startMetadata = CommitMetadata(0) assert(commitLog.add(0, startMetadata)) // End version: treated as v2 (operator 0 has unique ids) - val endMetadata = CommitMetadata(0, + val endMetadata = CommitMetadataV2(0, Some(Map[Long, Array[Array[String]]](0L -> Array(Array("uid"))))) assert(commitLog.add(1, endMetadata)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatePartitionAllColumnFamiliesWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatePartitionAllColumnFamiliesWriterSuite.scala index be7874e806cd8..130dca8c5c2f1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatePartitionAllColumnFamiliesWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatePartitionAllColumnFamiliesWriterSuite.scala @@ -99,7 +99,10 @@ class StatePartitionAllColumnFamiliesWriterSuite extends StateDataSourceTestBase // Commit to commitLog with checkpoint IDs val latestCommit = targetCheckpointMetadata.commitLog.get(lastBatch).get - val commitMetadata = latestCommit.copy(stateUniqueIds = checkpointInfos) + val commitMetadata = targetCheckpointMetadata.commitLog.createMetadata( + nextBatchWatermarkMs = latestCommit.nextBatchWatermarkMs, + stateUniqueIds = checkpointInfos, + commitLogFormatVersion = latestCommit.version) targetCheckpointMetadata.commitLog.add(writeBatchId, commitMetadata) val versionToCheck = writeBatchId + 1 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala index aa5826572240f..02c031a3e1e1d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala @@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, FileInputStream, FileOutputStream} import java.nio.file.Path import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.execution.streaming.checkpointing.{CommitLog, CommitMetadata} +import org.apache.spark.sql.execution.streaming.checkpointing.{CommitLog, CommitMetadata, CommitMetadataBase, CommitMetadataV2} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -63,7 +63,7 @@ class CommitLogSuite extends SparkFunSuite with SharedSparkSession { ) } - private def testSerde(commitMetadata: CommitMetadata, path: Path): Unit = { + private def testSerde(commitMetadata: CommitMetadataBase, path: Path): Unit = { if (regenerateGoldenFiles) { val commitLog = new CommitLog(spark, path.toString) val outputStream = new FileOutputStream(path.resolve("testCommitLog").toFile) @@ -103,19 +103,21 @@ class CommitLogSuite extends SparkFunSuite with SharedSparkSession { 0L -> Array(Array("unique_id1", "unique_id2"), Array("unique_id3", "unique_id4")), 1L -> Array(Array("unique_id5", "unique_id6"), Array("unique_id7", "unique_id8")) ) - val testMetadataV2 = CommitMetadata(0, Some(testStateUniqueIds)) + val testMetadataV2 = CommitMetadataV2(0, Some(testStateUniqueIds)) testSerde(testMetadataV2, testCommitLogV2FilePath) } } test("Basic Commit Log V2 SerDe - empty stateUniqueIds") { withSQLConf(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2") { - val testMetadataV2 = CommitMetadata(0, Some(Map[Long, Array[Array[String]]]())) + val testMetadataV2 = CommitMetadataV2(0, Some(Map[Long, Array[Array[String]]]())) testSerde(testMetadataV2, testCommitLogV2FilePathEmptyUniqueId) } } - // Old metadata structure with no state unique ids should not affect the deserialization + // SPARK-50653: When the configured commit log version is V2, a V1 file on disk should still + // deserialize successfully into a V1 [[CommitMetadata]] because the wire format version is now + // discovered from the file header rather than enforced to match the conf. test("Cross-version V1 SerDe") { withSQLConf(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2") { val commitlogV1 = """v1 @@ -123,18 +125,11 @@ class CommitLogSuite extends SparkFunSuite with SharedSparkSession { val inputStream: ByteArrayInputStream = new ByteArrayInputStream(commitlogV1.getBytes("UTF-8")) - // TODO [SPARK-50653]: Uncomment the below when v2 -> v1 backward compatibility is added - // val commitMetadata: CommitMetadata = new CommitLog( - // spark, testCommitLogV1FilePath.toString).deserialize(inputStream) - // assert(commitMetadata.nextBatchWatermarkMs === 233) - // assert(commitMetadata.stateUniqueIds === Map.empty) - - // TODO [SPARK-50653]: remove the below when v2 -> v1 backward compatibility is added - val e = intercept[IllegalStateException] { - new CommitLog(spark, testCommitLogV1FilePath.toString).deserialize(inputStream) - } - - assert (e.getMessage.contains("only supported log version")) + val commitMetadata = new CommitLog( + spark, testCommitLogV1FilePath.toString).deserialize(inputStream) + assert(commitMetadata.version === CommitLog.VERSION_1) + assert(commitMetadata.nextBatchWatermarkMs === 233) + assert(commitMetadata.stateUniqueIds.isEmpty) } } } From bb40d7d64addacfcc56bdd7774238764c4189b53 Mon Sep 17 00:00:00 2001 From: ericm-db Date: Wed, 20 May 2026 19:55:40 +0000 Subject: [PATCH 5/6] [SPARK-56971][SS] Add CommitMetadataV3 and SinkMetadataInfo for sink evolution Add the commit log data structures for streaming sink evolution: - `CommitMetadataV3` (`VERSION_3` of the commit log wire format) carries a `sinkMetadataMap: Map[String, SinkMetadataInfo]` keyed by sink name, in addition to the V2 fields (`nextBatchWatermarkMs`, `stateUniqueIds`). - `SinkMetadataInfo` records per-sink metadata: `sinkName`, `commitOffset` (serialized via `OffsetV2.json()`), `providerName`, and an `isActive` flag used to distinguish the current sink from historical sinks that were used in earlier batches but are no longer in use. - `CommitMetadataV3.activeSinkMetadataInfoOpt` returns the entry with `isActive = true`, if any. - `CommitLog.createMetadata` learns to produce a `CommitMetadataV3` when `commitLogFormatVersion = VERSION_3`, requiring a non-empty `sinkMetadataMap`. - `CommitLog.readCommitMetadata` dispatches `v3` files to the new class. The V3 metadata is dormant in this PR: no caller produces it yet. Wiring through `MicroBatchExecution` (so each batch persists its sink name + offset, and so restarts read the map back and validate the sink identity) is a follow-up. SPARK-56719 added `DataStreamWriter.name()` as the API surface for sink evolution. Without a place in the commit log to durably record the sink name and offset alongside the rest of a committed batch's metadata, sink names cannot be observed on restart and the evolution feature cannot be completed. This PR introduces that storage in a separate, narrowly scoped change. No. `CommitMetadataV3` is in the internal `org.apache.spark.sql.execution.streaming.checkpointing` package and is not produced by any code path yet. Added unit tests in `CommitLogSuite`: - V3 SerDe with a single active sink (round-trips through commit log). - V3 retains historical sinks alongside the active one and `activeSinkMetadataInfoOpt` resolves correctly. - `createMetadata(version = V3, sinkMetadataMap = Map.empty)` fails fast with `IllegalArgumentException`. Generated-by: Claude Code (claude-opus-4-7) Co-authored-by: Isaac --- .../streaming/checkpointing/CommitLog.scala | 75 ++++++++++++++++++- .../spark/sql/streaming/CommitLogSuite.scala | 63 +++++++++++++++- 2 files changed, 136 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala index 8b32ff9e6208f..ac7993d8fae61 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala @@ -26,6 +26,7 @@ import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf @@ -76,12 +77,19 @@ class CommitLog( /** * Factory for creating a [[CommitMetadataBase]] for the requested wire format version. * Defaults to the version configured via [[SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION]]. + * + * For [[VERSION_3]], [[sinkMetadataMap]] must be non-empty. */ def createMetadata( nextBatchWatermarkMs: Long = 0, stateUniqueIds: Option[Map[Long, Array[Array[String]]]] = None, + sinkMetadataMap: Map[String, SinkMetadataInfo] = Map.empty, commitLogFormatVersion: Int = VERSION): CommitMetadataBase = { commitLogFormatVersion match { + case VERSION_3 => + require(sinkMetadataMap.nonEmpty, + "VERSION_3 commit log requires a non-empty sinkMetadataMap") + CommitMetadataV3(nextBatchWatermarkMs, stateUniqueIds, sinkMetadataMap) case VERSION_2 => CommitMetadataV2(nextBatchWatermarkMs, stateUniqueIds) case VERSION_1 => @@ -96,7 +104,8 @@ object CommitLog { private val EMPTY_JSON = "{}" val VERSION_1 = 1 val VERSION_2 = 2 - val MAX_VERSION: Int = VERSION_2 + val VERSION_3 = 3 + val MAX_VERSION: Int = VERSION_3 /** * Reads a single commit log entry and dispatches to the matching @@ -110,6 +119,7 @@ object CommitLog { val version = MetadataVersionUtil.validateVersion(lines.next().trim, MAX_VERSION) val metadataJson = if (lines.hasNext) lines.next() else EMPTY_JSON version match { + case VERSION_3 => CommitMetadataV3(metadataJson) case VERSION_2 => CommitMetadataV2(metadataJson) case VERSION_1 => CommitMetadata(metadataJson) case v => throw QueryExecutionErrors.logVersionGreaterThanSupported(v, MAX_VERSION) @@ -181,3 +191,66 @@ object CommitMetadataV2 { def apply(json: String): CommitMetadataV2 = Serialization.read[CommitMetadataV2](json) } + +/** + * Commit log metadata for [[CommitLog.VERSION_3]]. Extends V2 with a map of per-sink metadata + * keyed by sink name. This enables streaming sink evolution: each batch records the active sink + * along with any historical sinks that were used in earlier batches but are no longer active. + * + * @param nextBatchWatermarkMs The watermark of the next batch. + * @param stateUniqueIds Per-operator state store unique ids (see [[CommitMetadataV2]]). + * @param sinkMetadataMap Map keyed by sink name. There is at most one active entry per + * commit; deactivated sinks are retained to detect reuse of a sink name. + */ +case class CommitMetadataV3( + nextBatchWatermarkMs: Long = 0, + stateUniqueIds: Option[Map[Long, Array[Array[String]]]] = None, + sinkMetadataMap: Map[String, SinkMetadataInfo] = Map.empty) extends CommitMetadataBase { + override def version: Int = CommitLog.VERSION_3 + + /** Returns the currently active sink's metadata, if any. */ + def activeSinkMetadataInfoOpt: Option[SinkMetadataInfo] = sinkMetadataMap.values.find(_.isActive) +} + +object CommitMetadataV3 { + implicit val format: Formats = Serialization.formats(NoTypeHints) + + def apply(json: String): CommitMetadataV3 = Serialization.read[CommitMetadataV3](json) +} + +/** + * Per-sink metadata recorded in a [[CommitMetadataV3]] entry. + * + * @param sinkName Sink name as supplied via `DataStreamWriter.name()`, or + * [[org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.DEFAULT_SINK_NAME]] + * when sink evolution is disabled. + * @param commitOffset The latest offset committed to the sink as a JSON string + * (i.e. [[OffsetV2.json()]]), or [[OffsetSeqLog.SERIALIZED_VOID_OFFSET]] if + * no offset is available. + * @param providerName Identifies the sink implementation (e.g. fully-qualified class name). + * @param isActive Whether this sink is the active sink for the current batch. Historical sinks + * are retained with `isActive = false`. + */ +case class SinkMetadataInfo( + sinkName: String, + commitOffset: String, + providerName: String, + isActive: Boolean = true) { + def json: String = Serialization.write(this)(SinkMetadataInfo.format) +} + +object SinkMetadataInfo { + private implicit val format: Formats = Serialization.formats(NoTypeHints) + + def apply( + sinkName: String, + commitOffset: Option[OffsetV2], + providerName: String, + isActive: Boolean): SinkMetadataInfo = { + val offsetString = commitOffset match { + case Some(off) => off.json + case None => OffsetSeqLog.SERIALIZED_VOID_OFFSET + } + new SinkMetadataInfo(sinkName, offsetString, providerName, isActive) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala index 02c031a3e1e1d..c84f7114bd577 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala @@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, FileInputStream, FileOutputStream} import java.nio.file.Path import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.execution.streaming.checkpointing.{CommitLog, CommitMetadata, CommitMetadataBase, CommitMetadataV2} +import org.apache.spark.sql.execution.streaming.checkpointing.{CommitLog, CommitMetadata, CommitMetadataBase, CommitMetadataV2, CommitMetadataV3, OffsetSeqLog, SinkMetadataInfo} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -115,6 +115,67 @@ class CommitLogSuite extends SparkFunSuite with SharedSparkSession { } } + test("Basic Commit Log V3 SerDe - single active sink") { + withTempDir { tempDir => + val commitLog = new CommitLog(spark, tempDir.getAbsolutePath) + val sinkInfo = SinkMetadataInfo( + sinkName = "sink-0", + commitOffset = OffsetSeqLog.SERIALIZED_VOID_OFFSET, + providerName = "memory", + isActive = true) + val metadata = commitLog.createMetadata( + nextBatchWatermarkMs = 42, + sinkMetadataMap = Map("sink-0" -> sinkInfo), + commitLogFormatVersion = CommitLog.VERSION_3) + assert(commitLog.add(0, metadata)) + + val read = commitLog.get(0).get + assert(read.version === CommitLog.VERSION_3) + assert(read.nextBatchWatermarkMs === 42) + val readV3 = read.asInstanceOf[CommitMetadataV3] + assert(readV3.sinkMetadataMap === Map("sink-0" -> sinkInfo)) + assert(readV3.activeSinkMetadataInfoOpt === Some(sinkInfo)) + } + } + + test("Commit Log V3 - retains historical sinks alongside active") { + withTempDir { tempDir => + val commitLog = new CommitLog(spark, tempDir.getAbsolutePath) + val historical = SinkMetadataInfo( + sinkName = "sink-0", + commitOffset = """{"offset":3}""", + providerName = "memory", + isActive = false) + val active = SinkMetadataInfo( + sinkName = "sink-1", + commitOffset = """{"offset":7}""", + providerName = "memory", + isActive = true) + val metadata = commitLog.createMetadata( + nextBatchWatermarkMs = 100, + sinkMetadataMap = Map("sink-0" -> historical, "sink-1" -> active), + commitLogFormatVersion = CommitLog.VERSION_3) + assert(commitLog.add(0, metadata)) + + val readV3 = commitLog.get(0).get.asInstanceOf[CommitMetadataV3] + assert(readV3.activeSinkMetadataInfoOpt === Some(active)) + assert(readV3.sinkMetadataMap("sink-0") === historical) + assert(readV3.sinkMetadataMap("sink-1") === active) + } + } + + test("createMetadata for V3 requires non-empty sinkMetadataMap") { + withTempDir { tempDir => + val commitLog = new CommitLog(spark, tempDir.getAbsolutePath) + intercept[IllegalArgumentException] { + commitLog.createMetadata( + nextBatchWatermarkMs = 0, + sinkMetadataMap = Map.empty, + commitLogFormatVersion = CommitLog.VERSION_3) + } + } + } + // SPARK-50653: When the configured commit log version is V2, a V1 file on disk should still // deserialize successfully into a V1 [[CommitMetadata]] because the wire format version is now // discovered from the file header rather than enforced to match the conf. From d76cfb5aba263d7b07adbd0b4c51c303ab4457ed Mon Sep 17 00:00:00 2001 From: ericm-db Date: Wed, 20 May 2026 20:12:45 +0000 Subject: [PATCH 6/6] [SPARK-56972][SS] Persist sink name in V3 commit log via MicroBatchExecution ### What changes were proposed in this pull request? Wire the sink name through `MicroBatchExecution` so that, when sink evolution is enabled, each committed batch writes a `CommitMetadataV3` whose `sinkMetadataMap` records the current sink as the active entry alongside any sinks that were active in earlier batches: - Add a per-execution `sinkMetadataMap` that is hydrated from the latest `CommitMetadataV3` in `populateStartOffsets`. - When `spark.sql.streaming.queryEvolution.enableSinkEvolution` is true, the commit-log write in `runBatch` produces `CommitMetadataV3` with every prior entry marked `isActive = false` and the current `(sinkName, sink.getClass.getName)` entered as `isActive = true`. - When sink evolution is disabled, the existing V1/V2 commit-log path is preserved unchanged. This is the minimal write-then-read parity for the sink evolution feature added in SPARK-56719. Provider-mismatch and sink-reuse validation are intentionally deferred. ### Why are the changes needed? SPARK-56719 introduced the `DataStreamWriter.name()` API and the in-memory `sinkName` plumbing inside `MicroBatchExecution`, but the sink name was not yet persisted to the checkpoint. Without persistence, restarts cannot observe historical sink identity and the feature is not durable. ### Does this PR introduce _any_ user-facing change? Behavior change only when `enableSinkEvolution` is true (off by default): the commit log directory now contains V3 commit log files instead of V1/V2 files. Wire format compatibility is preserved when the flag is left off. ### How was this patch tested? Added four new tests in `StreamingSinkEvolutionSuite`: - V3 commit log records the active sink for a named query. - Renaming the sink across a restart retains the previous sink as `isActive = false` and marks the new one active. - With sink evolution disabled, the commit log remains V1/V2. - Enabling sink evolution on a checkpoint that previously used V1/V2 transparently upgrades to V3 on the next commit. Existing `StreamingSinkEvolutionSuite`, `CommitLogSuite`, `MicroBatchExecutionSuite`, and `AsyncProgressTrackingMicroBatchExecutionSuite` all pass. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code (claude-opus-4-7) Co-authored-by: Isaac --- .../runtime/MicroBatchExecution.scala | 41 ++++++- .../test/StreamingSinkEvolutionSuite.scala | 114 ++++++++++++++++++ 2 files changed, 152 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala index 86ec478ff81cf..ee0569958c29c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala @@ -45,7 +45,7 @@ import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, RealTimeStreamScanExec, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec} import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, RealTimeModeAllowlist, RealTimeTrigger, Sink, Source, StreamingQueryPlanTraverseHelper} -import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2} +import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitLog, CommitMetadataV3, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2, SinkMetadataInfo} import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, StatefulOpStateStoreCheckpointInfo, StateStoreWriter} import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE} import org.apache.spark.sql.execution.streaming.sources.{ForeachBatchSink, WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1} @@ -128,6 +128,15 @@ class MicroBatchExecution( } } + // Historical sink metadata read from the commit log on restart. Insertion order is preserved so + // that we can re-emit deactivated sinks in the same order they originally appeared. Mutated by + // [[populateStartOffsets]] (reads) and by the commit-log write in [[runBatch]] (updates). + private val sinkMetadataMap = mutable.LinkedHashMap.empty[String, SinkMetadataInfo] + + /** True when the current query should persist V3 sink metadata in the commit log. */ + private def commitLogV3Enabled: Boolean = + sparkSession.sessionState.conf.enableStreamingSinkEvolution + @volatile protected[sql] var triggerExecutor: TriggerExecutor = _ protected def getTrigger(): TriggerExecutor = { @@ -740,6 +749,11 @@ class MicroBatchExecution( commitMetadata.stateUniqueIds.foreach { stateUniqueIds => currentStateStoreCkptId ++= stateUniqueIds } + commitMetadata match { + case v3: CommitMetadataV3 => + sinkMetadataMap ++= v3.sinkMetadataMap + case _ => + } if (latestBatchId == latestCommittedBatchId) { /* The last batch was successfully committed, so we can safely process a * new next batch but first: @@ -1438,10 +1452,31 @@ class MicroBatchExecution( } else { None } - if (!commitLog.add(execCtx.batchId, + val metadata = if (commitLogV3Enabled) { + val currentSinkInfo = SinkMetadataInfo( + sinkName = sinkName, + commitOffset = OffsetSeqLog.SERIALIZED_VOID_OFFSET, + providerName = sink.getClass.getName, + isActive = true) + // Mark every previously-seen sink as inactive, then overlay the current sink as active. + // The previous entry for [[sinkName]], if any, is overwritten here. + val deactivated = sinkMetadataMap.iterator + .map { case (name, info) => name -> info.copy(isActive = false) } + .toMap + val updatedSinkMap = deactivated + (sinkName -> currentSinkInfo) + sinkMetadataMap.clear() + sinkMetadataMap ++= updatedSinkMap commitLog.createMetadata( nextBatchWatermarkMs = watermarkTracker.currentWatermark, - stateUniqueIds = stateStoreCkptId))) { + stateUniqueIds = stateStoreCkptId, + sinkMetadataMap = updatedSinkMap, + commitLogFormatVersion = CommitLog.VERSION_3) + } else { + commitLog.createMetadata( + nextBatchWatermarkMs = watermarkTracker.currentWatermark, + stateUniqueIds = stateStoreCkptId) + } + if (!commitLog.add(execCtx.batchId, metadata)) { throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala index a242faabaf921..a6cc8db3c833b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala @@ -21,6 +21,7 @@ import org.scalatest.{BeforeAndAfterEach, Tag} import org.apache.spark.SparkException import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming.checkpointing.{CommitLog, CommitMetadataV3} import org.apache.spark.sql.execution.streaming.runtime.MemoryStream import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.StreamTest @@ -183,6 +184,119 @@ class StreamingSinkEvolutionSuite extends StreamTest with BeforeAndAfterEach { q2.stop() } + // =========================== + // Commit log V3 persistence + // =========================== + + testWithSinkEvolution("commit log records V3 metadata with named sink") { + val checkpointDir = newMetadataDir + val input = MemoryStream[Int] + input.addData(1, 2, 3) + val q = input.toDF().writeStream + .format("noop") + .name("my_sink") + .option("checkpointLocation", checkpointDir) + .start() + q.processAllAvailable() + q.stop() + + val commitLog = new CommitLog(spark, s"$checkpointDir/commits", readOnly = true) + val latest = commitLog.getLatest().getOrElse(fail("No commit recorded")) + val v3 = latest._2 match { + case v: CommitMetadataV3 => v + case other => fail(s"Expected CommitMetadataV3, got $other") + } + val active = v3.activeSinkMetadataInfoOpt + .getOrElse(fail("No active sink in commit metadata")) + assert(active.sinkName === "my_sink") + assert(active.isActive) + assert(v3.sinkMetadataMap.size === 1) + } + + testWithSinkEvolution("commit log V3 retains historical sink after rename") { + val checkpointDir = newMetadataDir + val input = MemoryStream[Int] + + // First batch under sink name "old_sink". + input.addData(1) + val q1 = input.toDF().writeStream + .format("noop") + .name("old_sink") + .option("checkpointLocation", checkpointDir) + .start() + q1.processAllAvailable() + q1.stop() + + // Restart with a new sink name "new_sink" against the same checkpoint. + input.addData(2) + val q2 = input.toDF().writeStream + .format("noop") + .name("new_sink") + .option("checkpointLocation", checkpointDir) + .start() + q2.processAllAvailable() + q2.stop() + + val commitLog = new CommitLog(spark, s"$checkpointDir/commits", readOnly = true) + val v3 = commitLog.getLatest().get._2.asInstanceOf[CommitMetadataV3] + assert(v3.sinkMetadataMap.keySet === Set("old_sink", "new_sink")) + assert(v3.activeSinkMetadataInfoOpt.map(_.sinkName) === Some("new_sink")) + assert(v3.sinkMetadataMap("old_sink").isActive === false) + assert(v3.sinkMetadataMap("new_sink").isActive === true) + } + + test("commit log stays V1/V2 when sink evolution is disabled") { + val checkpointDir = newMetadataDir + withSQLConf(SQLConf.ENABLE_STREAMING_SINK_EVOLUTION.key -> "false") { + val input = MemoryStream[Int] + input.addData(1, 2) + val q = input.toDF().writeStream + .format("noop") + .option("checkpointLocation", checkpointDir) + .start() + q.processAllAvailable() + q.stop() + } + + val commitLog = new CommitLog(spark, s"$checkpointDir/commits", readOnly = true) + val latest = commitLog.getLatest().get._2 + assert(latest.version === CommitLog.VERSION_1 || latest.version === CommitLog.VERSION_2, + s"Expected V1 or V2 commit log, got v${latest.version}") + assert(!latest.isInstanceOf[CommitMetadataV3]) + } + + testWithSinkEvolution("enabling sink evolution mid-checkpoint upgrades commit log to V3") { + val checkpointDir = newMetadataDir + val input = MemoryStream[Int] + + // First run with sink evolution disabled writes V1/V2, no sink metadata. + withSQLConf(SQLConf.ENABLE_STREAMING_SINK_EVOLUTION.key -> "false") { + input.addData(1) + val q = input.toDF().writeStream + .format("noop") + .option("checkpointLocation", checkpointDir) + .start() + q.processAllAvailable() + q.stop() + } + + // Restart with sink evolution enabled, supplying a name. V3 should now be written; the + // previous V1/V2 batches contribute no historical sinks. + input.addData(2) + val q = input.toDF().writeStream + .format("noop") + .name("upgraded_sink") + .option("checkpointLocation", checkpointDir) + .start() + q.processAllAvailable() + q.stop() + + val commitLog = new CommitLog(spark, s"$checkpointDir/commits", readOnly = true) + val v3 = commitLog.getLatest().get._2.asInstanceOf[CommitMetadataV3] + assert(v3.activeSinkMetadataInfoOpt.map(_.sinkName) === Some("upgraded_sink")) + assert(v3.sinkMetadataMap.size === 1) + } + // ============== // Helper Methods // ==============