diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala index 0c8cabb75ed65..80384f8cb3b93 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala @@ -41,20 +41,34 @@ class StateSchemaCompatibilityChecker( fm.mkdirs(schemaFileLocation.getParent) def check(keySchema: StructType, valueSchema: StructType): Unit = { + check(keySchema, valueSchema, ignoreValueSchema = false) + } + + def check(keySchema: StructType, valueSchema: StructType, ignoreValueSchema: Boolean): Unit = { if (fm.exists(schemaFileLocation)) { logDebug(s"Schema file for provider $providerId exists. Comparing with provided schema.") val (storedKeySchema, storedValueSchema) = readSchemaFile() - if (storedKeySchema.equals(keySchema) && storedValueSchema.equals(valueSchema)) { + if (storedKeySchema.equals(keySchema) && + (ignoreValueSchema || storedValueSchema.equals(valueSchema))) { // schema is exactly same } else if (!schemasCompatible(storedKeySchema, keySchema) || - !schemasCompatible(storedValueSchema, valueSchema)) { + (!ignoreValueSchema && !schemasCompatible(storedValueSchema, valueSchema))) { + val errorMsgForKeySchema = s"- Provided key schema: $keySchema\n" + + s"- Existing key schema: $storedKeySchema\n" + + // If it is requested to skip checking the value schema, we also don't expose the value + // schema information to the error message. + val errorMsgForValueSchema = if (!ignoreValueSchema) { + s"- Provided value schema: $valueSchema\n" + + s"- Existing value schema: $storedValueSchema\n" + } else { + "" + } val errorMsg = "Provided schema doesn't match to the schema for existing state! 
" + "Please note that Spark allow difference of field name: check count of fields " + "and data type of each field.\n" + - s"- Provided key schema: $keySchema\n" + - s"- Provided value schema: $valueSchema\n" + - s"- Existing key schema: $storedKeySchema\n" + - s"- Existing value schema: $storedValueSchema\n" + + errorMsgForKeySchema + + errorMsgForValueSchema + s"If you want to force running query without schema validation, please set " + s"${SQLConf.STATE_SCHEMA_CHECK_ENABLED.key} to false.\n" + "Please note running query with incompatible schema could cause indeterministic" + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 5020638abc425..5d65c8e9f20b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -511,7 +511,12 @@ object StateStore extends Logging { val checker = new StateSchemaCompatibilityChecker(storeProviderId, hadoopConf) // regardless of configuration, we check compatibility to at least write schema file // if necessary - val ret = Try(checker.check(keySchema, valueSchema)).toEither.fold(Some(_), _ => None) + // if the format validation for value schema is disabled, we also disable the schema + // compatibility checker for value schema as well. 
+ val ret = Try( + checker.check(keySchema, valueSchema, + ignoreValueSchema = !storeConf.formatValidationCheckValue) + ).toEither.fold(Some(_), _ => None) if (storeConf.stateSchemaCheckEnabled) { ret } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala index 529db2609cd45..66bb37d7a57bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala @@ -48,7 +48,12 @@ class StateStoreConf( /** Whether validate the underlying format or not. */ val formatValidationEnabled: Boolean = sqlConf.stateStoreFormatValidationEnabled - /** Whether validate the value format when the format invalidation enabled. */ + /** + * Whether to validate the value side. This config is applied to both validators as below: + * + * - whether to validate the value format when the format validation is enabled. + * - whether to validate the value schema when the state schema check is enabled. 
+ */ val formatValidationCheckValue: Boolean = extraOptions.getOrElse(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG, "true") == "true" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index bcfdeb4f85cdf..11a9776fa9b9d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -750,13 +750,15 @@ case class StreamingDeduplicateExec( keyExpressions, getStateInfo, conf) :: Nil } + private val schemaForEmptyRow: StructType = StructType(Array(StructField("__dummy__", NullType))) + override protected def doExecute(): RDD[InternalRow] = { metrics // force lazy init at driver child.execute().mapPartitionsWithStateStore( getStateInfo, keyExpressions.toStructType, - child.output.toStructType, + schemaForEmptyRow, numColsPrefixKey = 0, session.sessionState, Some(session.streams.stateStoreCoordinator), diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/.0.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/.0.crc new file mode 100644 index 0000000000000..1aee7033161ec Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/.0.crc differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/.1.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/.1.crc new file mode 100644 index 0000000000000..1aee7033161ec Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/.1.crc differ diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/0 new file mode 100644 index 0000000000000..9c1e3021c3ead --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/0 @@ -0,0 +1,2 @@ +v1 +{"nextBatchWatermarkMs":0} \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/1 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/1 new file mode 100644 index 0000000000000..9c1e3021c3ead --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/1 @@ -0,0 +1,2 @@ +v1 +{"nextBatchWatermarkMs":0} \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/metadata b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/metadata new file mode 100644 index 0000000000000..78bd74a789fcc --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/metadata @@ -0,0 +1 @@ +{"id":"33e8de33-00b8-4b60-8246-df2f433257ff"} \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/.0.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/.0.crc new file mode 100644 index 0000000000000..726c678bc6a29 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/.0.crc differ diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/.1.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/.1.crc new file mode 100644 index 0000000000000..790f681f1aa6a Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/.1.crc differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/0 new file mode 100644 index 0000000000000..443c682435801 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/0 @@ -0,0 +1,3 @@ +v1 +{"batchWatermarkMs":0,"batchTimestampMs":1656644489789,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5"}} +0 \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/1 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/1 new file mode 100644 index 0000000000000..67b4217556378 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/1 @@ -0,0 +1,3 @@ +v1 
+{"batchWatermarkMs":0,"batchTimestampMs":1656644492462,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5"}} +1 \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/.1.delta.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/.1.delta.crc new file mode 100644 index 0000000000000..1992982c58ff2 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/.1.delta.crc differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/.2.delta.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/.2.delta.crc new file mode 100644 index 0000000000000..cf1d68e2acee3 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/.2.delta.crc differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/1.delta new file mode 100644 index 0000000000000..fec40e83a5471 Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/1.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/2.delta new file mode 100644 index 0000000000000..6352978051846 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/2.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/_metadata/.schema.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/_metadata/.schema.crc new file mode 100644 index 0000000000000..022717c6b5016 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/_metadata/.schema.crc differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/_metadata/schema b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/_metadata/schema new file mode 100644 index 0000000000000..f132f9601b73a Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/_metadata/schema differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/.1.delta.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/.1.delta.crc new file mode 100644 index 0000000000000..cf1d68e2acee3 Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/.1.delta.crc differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/.2.delta.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/.2.delta.crc new file mode 100644 index 0000000000000..d18b77b93aff2 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/.2.delta.crc differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/1.delta new file mode 100644 index 0000000000000..6352978051846 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/1.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/2.delta new file mode 100644 index 0000000000000..fcbf8df80f5f9 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/2.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/.1.delta.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/.1.delta.crc new file mode 100644 index 0000000000000..cf1d68e2acee3 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/.1.delta.crc differ diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/.2.delta.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/.2.delta.crc new file mode 100644 index 0000000000000..cf1d68e2acee3 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/.2.delta.crc differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/1.delta new file mode 100644 index 0000000000000..6352978051846 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/1.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/2.delta new file mode 100644 index 0000000000000..6352978051846 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/2.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/.1.delta.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/.1.delta.crc new file mode 100644 index 0000000000000..cf1d68e2acee3 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/.1.delta.crc differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/.2.delta.crc 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/.2.delta.crc new file mode 100644 index 0000000000000..cf1d68e2acee3 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/.2.delta.crc differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/1.delta new file mode 100644 index 0000000000000..6352978051846 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/1.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/2.delta new file mode 100644 index 0000000000000..6352978051846 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/2.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/.1.delta.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/.1.delta.crc new file mode 100644 index 0000000000000..cf1d68e2acee3 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/.1.delta.crc differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/.2.delta.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/.2.delta.crc new file mode 100644 index 
0000000000000..cf1d68e2acee3 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/.2.delta.crc differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/1.delta new file mode 100644 index 0000000000000..6352978051846 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/1.delta differ diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/2.delta new file mode 100644 index 0000000000000..6352978051846 Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/2.delta differ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala index 1539341359337..7ba18a8140443 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala @@ -231,6 +231,16 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession { assert((resultKeySchema, resultValueSchema) === (keySchema, valueSchema)) } + test("SPARK-39650: ignore value schema on compatibility check") { + val typeChangedValueSchema = StructType(valueSchema.map(_.copy(dataType = TimestampType))) + verifySuccess(keySchema, valueSchema, 
keySchema, typeChangedValueSchema, + ignoreValueSchema = true) + + val typeChangedKeySchema = StructType(keySchema.map(_.copy(dataType = TimestampType))) + verifyException(keySchema, valueSchema, typeChangedKeySchema, valueSchema, + ignoreValueSchema = true) + } + private def applyNewSchemaToNestedFieldInKey(newNestedSchema: StructType): StructType = { applyNewSchemaToNestedField(keySchema, newNestedSchema, "key3") } @@ -257,44 +267,57 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession { dir: String, queryId: UUID, newKeySchema: StructType, - newValueSchema: StructType): Unit = { + newValueSchema: StructType, + ignoreValueSchema: Boolean): Unit = { // in fact, Spark doesn't support online state schema change, so need to check // schema only once for each running of JVM val providerId = StateStoreProviderId( StateStoreId(dir, opId, partitionId), queryId) new StateSchemaCompatibilityChecker(providerId, hadoopConf) - .check(newKeySchema, newValueSchema) + .check(newKeySchema, newValueSchema, ignoreValueSchema = ignoreValueSchema) } private def verifyException( oldKeySchema: StructType, oldValueSchema: StructType, newKeySchema: StructType, - newValueSchema: StructType): Unit = { + newValueSchema: StructType, + ignoreValueSchema: Boolean = false): Unit = { val dir = newDir() val queryId = UUID.randomUUID() - runSchemaChecker(dir, queryId, oldKeySchema, oldValueSchema) + runSchemaChecker(dir, queryId, oldKeySchema, oldValueSchema, + ignoreValueSchema = ignoreValueSchema) val e = intercept[StateSchemaNotCompatible] { - runSchemaChecker(dir, queryId, newKeySchema, newValueSchema) + runSchemaChecker(dir, queryId, newKeySchema, newValueSchema, + ignoreValueSchema = ignoreValueSchema) } - e.getMessage.contains("Provided schema doesn't match to the schema for existing state!") - e.getMessage.contains(newKeySchema.json) - e.getMessage.contains(newValueSchema.json) - e.getMessage.contains(oldKeySchema.json) - e.getMessage.contains(oldValueSchema.json) + 
assert(e.getMessage.contains("Provided schema doesn't match to the schema for existing state!")) + assert(e.getMessage.contains(newKeySchema.toString())) + assert(e.getMessage.contains(oldKeySchema.toString())) + + if (ignoreValueSchema) { + assert(!e.getMessage.contains(newValueSchema.toString())) + assert(!e.getMessage.contains(oldValueSchema.toString())) + } else { + assert(e.getMessage.contains(newValueSchema.toString())) + assert(e.getMessage.contains(oldValueSchema.toString())) + } } private def verifySuccess( oldKeySchema: StructType, oldValueSchema: StructType, newKeySchema: StructType, - newValueSchema: StructType): Unit = { + newValueSchema: StructType, + ignoreValueSchema: Boolean = false): Unit = { val dir = newDir() val queryId = UUID.randomUUID() - runSchemaChecker(dir, queryId, oldKeySchema, oldValueSchema) - runSchemaChecker(dir, queryId, newKeySchema, newValueSchema) + runSchemaChecker(dir, queryId, oldKeySchema, oldValueSchema, + ignoreValueSchema = ignoreValueSchema) + runSchemaChecker(dir, queryId, newKeySchema, newValueSchema, + ignoreValueSchema = ignoreValueSchema) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala index aa03da6c5843f..39e46a12d659c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala @@ -17,11 +17,16 @@ package org.apache.spark.sql.streaming +import java.io.File + +import org.apache.commons.io.FileUtils + import org.apache.spark.sql.DataFrame import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.Utils class StreamingDeduplicationSuite extends 
StateStoreMetricsTest { @@ -413,4 +418,69 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 1) ) } + + test("SPARK-39650: duplicate with specific keys should allow input to change schema") { + withTempDir { checkpoint => + val dedupeInputData = MemoryStream[(String, Int)] + val dedupe = dedupeInputData.toDS().dropDuplicates("_1") + + testStream(dedupe, Append)( + StartStream(checkpointLocation = checkpoint.getCanonicalPath), + + AddData(dedupeInputData, "a" -> 1), + CheckLastBatch("a" -> 1), + + AddData(dedupeInputData, "a" -> 2, "b" -> 3), + CheckLastBatch("b" -> 3) + ) + + val dedupeInputData2 = MemoryStream[(String, Int, String)] + val dedupe2 = dedupeInputData2.toDS().dropDuplicates("_1") + + // initialize new memory stream with previously executed batches + dedupeInputData2.addData(("a", 1, "dummy")) + dedupeInputData2.addData(Seq(("a", 2, "dummy"), ("b", 3, "dummy"))) + + testStream(dedupe2, Append)( + StartStream(checkpointLocation = checkpoint.getCanonicalPath), + + AddData(dedupeInputData2, ("a", 5, "a"), ("b", 2, "b"), ("c", 9, "c")), + CheckLastBatch(("c", 9, "c")) + ) + } + } + + test("SPARK-39650: recovery from checkpoint having all columns as value schema") { + // NOTE: We are also changing the schema of input compared to the checkpoint. In the checkpoint + // we define the input schema as (String, Int). + val inputData = MemoryStream[(String, Int, String)] + val dedupe = inputData.toDS().dropDuplicates("_1") + + // The fix will land after Spark 3.3.0, hence we can check backward compatibility with + // checkpoint being built from Spark 3.3.0. + val resourceUri = this.getClass.getResource( + "/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/").toURI + + val checkpointDir = Utils.createTempDir().getCanonicalFile + // Copy the checkpoint to a temp dir to prevent changes to the original. 
+ // Not doing this will lead to the test passing on the first run, but failing on subsequent runs. + FileUtils.copyDirectory(new File(resourceUri), checkpointDir) + + inputData.addData(("a", 1, "dummy")) + inputData.addData(("a", 2, "dummy"), ("b", 3, "dummy")) + + testStream(dedupe, Append)( + StartStream(checkpointLocation = checkpointDir.getAbsolutePath), + /* + Note: The checkpoint was generated using the following input in Spark version 3.3.0 + AddData(inputData, ("a", 1)), + CheckLastBatch(("a", 1)), + AddData(inputData, ("a", 2), ("b", 3)), + CheckLastBatch(("b", 3)) + */ + + AddData(inputData, ("a", 5, "a"), ("b", 2, "b"), ("c", 9, "c")), + CheckLastBatch(("c", 9, "c")) + ) + } }