From 0b07a844984b8a496ab6a752b043007ca4cf8eca Mon Sep 17 00:00:00 2001 From: Harsh Motwani Date: Fri, 21 Nov 2025 23:11:53 +0000 Subject: [PATCH 1/8] enable variant shredding configs by default --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 2e9802913d5d..53a2e738b3d4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1598,7 +1598,7 @@ object SQLConf { "variant logical type.") .version("4.1.0") .booleanConf - .createWithDefault(false) + .createWithDefault(true) val PARQUET_IGNORE_VARIANT_ANNOTATION = buildConf("spark.sql.parquet.ignoreVariantAnnotation") @@ -5610,7 +5610,7 @@ object SQLConf { "requested fields.") .version("4.0.0") .booleanConf - .createWithDefault(false) + .createWithDefault(true) val VARIANT_WRITE_SHREDDING_ENABLED = buildConf("spark.sql.variant.writeShredding.enabled") @@ -5618,7 +5618,7 @@ object SQLConf { .doc("When true, the Parquet writer is allowed to write shredded variant. ") .version("4.0.0") .booleanConf - .createWithDefault(false) + .createWithDefault(true) val VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST = buildConf("spark.sql.variant.forceShreddingSchemaForTest") @@ -5651,7 +5651,7 @@ object SQLConf { .doc("Infer shredding schema when writing Variant columns in Parquet tables.") .version("4.1.0") .booleanConf - .createWithDefault(false) + .createWithDefault(true) val LEGACY_CSV_ENABLE_DATE_TIME_PARSING_FALLBACK = buildConf("spark.sql.legacy.csv.enableDateTimeParsingFallback") From 3a601f88a2f6a0d40830bdb719000b4906f8b92f Mon Sep 17 00:00:00 2001 From: Harsh Motwani Date: Tue, 25 Nov 2025 04:00:56 +0000 Subject: [PATCH 2/8] resolve unit test failures --- .../org/apache/spark/sql/VariantSuite.scala | 69 +++++++++++-------- .../ParquetVariantShreddingSuite.scala | 15 ++-- .../parquet/VariantInferShreddingSuite.scala | 3 + 3 files changed, 53 insertions(+), 34 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala index ac6a4e435709..72a1ed462d16 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala @@ -197,36 +197,40 @@ class VariantSuite extends QueryTest with SharedSparkSession with ExpressionEval } test("round trip tests") { - val rand = new Random(42) - val input = Seq.fill(50) { - if (rand.nextInt(10) == 0) { - null - } else { - val value = new Array[Byte](rand.nextInt(50)) - rand.nextBytes(value) - val metadata = new Array[Byte](rand.nextInt(50)) - rand.nextBytes(metadata) - // Generate a valid metadata, otherwise the shredded reader will fail. - new VariantVal(value, Array[Byte](VERSION, 0, 0) ++ metadata) + // Since we are writing random bytes, shredding schema inference will fail. + withSQLConf(SQLConf.VARIANT_INFER_SHREDDING_SCHEMA.key -> "false") { + val rand = new Random(42) + val input = Seq.fill(50) { + if (rand.nextInt(10) == 0) { + null + } else { + val value = new Array[Byte](rand.nextInt(50)) + rand.nextBytes(value) + val metadata = new Array[Byte](rand.nextInt(50)) + rand.nextBytes(metadata) + // Generate a valid metadata, otherwise the shredded reader will fail. 
+ new VariantVal(value, Array[Byte](VERSION, 0, 0) ++ metadata) + } } - } - val df = spark.createDataFrame( - spark.sparkContext.parallelize(input.map(Row(_))), - StructType.fromDDL("v variant") - ) - val result = df.collect().map(_.get(0).asInstanceOf[VariantVal]) + val df = spark.createDataFrame( + spark.sparkContext.parallelize(input.map(Row(_))), + StructType.fromDDL("v variant") + ) + val result = df.collect().map(_.get(0).asInstanceOf[VariantVal]) - def prepareAnswer(values: Seq[VariantVal]): Seq[String] = { - values.map(v => if (v == null) "null" else v.debugString()).sorted - } - assert(prepareAnswer(input) == prepareAnswer(result.toImmutableArraySeq)) + def prepareAnswer(values: Seq[VariantVal]): Seq[String] = { + values.map(v => if (v == null) "null" else v.debugString()).sorted + } + assert(prepareAnswer(input) == prepareAnswer(result.toImmutableArraySeq)) - withTempDir { dir => - val tempDir = new File(dir, "files").getCanonicalPath - df.write.parquet(tempDir) - val readResult = spark.read.parquet(tempDir).collect().map(_.get(0).asInstanceOf[VariantVal]) - assert(prepareAnswer(input) == prepareAnswer(readResult.toImmutableArraySeq)) + withTempDir { dir => + val tempDir = new File(dir, "files").getCanonicalPath + df.write.parquet(tempDir) + val readResult = spark.read.parquet(tempDir).collect() + .map(_.get(0).asInstanceOf[VariantVal]) + assert(prepareAnswer(input) == prepareAnswer(readResult.toImmutableArraySeq)) + } } } @@ -383,14 +387,19 @@ class VariantSuite extends QueryTest with SharedSparkSession with ExpressionEval ) cases.foreach { case (structDef, condition, parameters) => Seq(false, true).foreach { vectorizedReader => - withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorizedReader.toString) { + withSQLConf( + SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorizedReader.toString, + // Invalid variant binary fails during shredding schema inference + SQLConf.VARIANT_INFER_SHREDDING_SCHEMA.key -> "false" + ) { withTempDir { dir => val file = new File(dir, "dir").getCanonicalPath val df = spark.sql(s"select $structDef as v from range(10)") df.write.parquet(file) val schema = StructType(Seq(StructField("v", VariantType))) val result = spark.read.schema(schema).parquet(file).selectExpr("to_json(v)") - val e = withSQLConf(SQLConf.VARIANT_ALLOW_READING_SHREDDED.key -> "false") { + val e = withSQLConf(SQLConf.VARIANT_ALLOW_READING_SHREDDED.key -> "false", + SQLConf.PUSH_VARIANT_INTO_SCAN.key -> "false") { intercept[org.apache.spark.SparkException](result.collect()) } checkError( @@ -812,7 +821,9 @@ class VariantSuite extends QueryTest with SharedSparkSession with ExpressionEval // The initial size of the buffer backing a cached dataframe column is 128KB. // See `ColumnBuilder`. val numKeys = 128 * 1024 - val keyIterator = (0 until numKeys).iterator + // We start in long range because the shredded writer writes int64 by default which wouldn't + // match narrower binaries. 
+ val keyIterator = (Int.MaxValue + 1L until Int.MaxValue + 1L + numKeys).iterator val entries = Array.fill(numKeys)(s"""\"${keyIterator.next()}\": \"test\"""") val jsonStr = s"{${entries.mkString(", ")}}" val query = s"""select named_struct( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala index 77140c1a91ee..1cc6d3afbee5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala @@ -48,7 +48,8 @@ class ParquetVariantShreddingSuite extends QueryTest with ParquetTest with Share test("timestamp physical type") { ParquetOutputTimestampType.values.foreach { timestampParquetType => - withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> timestampParquetType.toString) { + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> timestampParquetType.toString, + SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.key -> "true") { withTempDir { dir => val schema = "t timestamp, st struct, at array" val fullSchema = "v struct withSQLConf(SQLConf.PARQUET_ANNOTATE_VARIANT_LOGICAL_TYPE.key -> "true", - SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.key -> ignoreVariantAnnotation.toString + SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.key -> ignoreVariantAnnotation.toString, + SQLConf.VARIANT_INFER_SHREDDING_SCHEMA.key -> "false" ) { withTempDir { dir => // write parquet file @@ -302,7 +304,8 @@ class ParquetVariantShreddingSuite extends QueryTest with ParquetTest with Share "c struct>>" withSQLConf(SQLConf.VARIANT_WRITE_SHREDDING_ENABLED.key -> true.toString, SQLConf.VARIANT_ALLOW_READING_SHREDDED.key -> true.toString, - SQLConf.VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST.key -> schema) { + SQLConf.VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST.key -> schema, + SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.key -> true.toString) { df.write.mode("overwrite").parquet(dir.getAbsolutePath) @@ -376,7 +379,8 @@ class ParquetVariantShreddingSuite extends QueryTest with ParquetTest with Share "struct>>" withSQLConf(SQLConf.VARIANT_WRITE_SHREDDING_ENABLED.key -> true.toString, SQLConf.VARIANT_ALLOW_READING_SHREDDED.key -> true.toString, - SQLConf.VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST.key -> schema) { + SQLConf.VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST.key -> schema, + SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.key -> true.toString) { df.write.mode("overwrite").parquet(dir.getAbsolutePath) // Verify that we can read the full variant. @@ -441,7 +445,8 @@ class ParquetVariantShreddingSuite extends QueryTest with ParquetTest with Share "m map>" withSQLConf(SQLConf.VARIANT_WRITE_SHREDDING_ENABLED.key -> true.toString, SQLConf.VARIANT_ALLOW_READING_SHREDDED.key -> true.toString, - SQLConf.VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST.key -> schema) { + SQLConf.VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST.key -> schema, + SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.key -> true.toString) { df.write.mode("overwrite").parquet(dir.getAbsolutePath) // Verify that we can read the full variant. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/VariantInferShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/VariantInferShreddingSuite.scala index cdaf6c488dc2..49a43fffafb3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/VariantInferShreddingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/VariantInferShreddingSuite.scala @@ -41,6 +41,9 @@ class VariantInferShreddingSuite extends QueryTest with SharedSparkSession with super.sparkConf.set(SQLConf.PUSH_VARIANT_INTO_SCAN.key, "true") .set(SQLConf.VARIANT_WRITE_SHREDDING_ENABLED.key, "true") .set(SQLConf.VARIANT_INFER_SHREDDING_SCHEMA.key, "true") + // We cannot check the physical shredding schemas if the variant logical type annotation is + // used + .set(SQLConf.PARQUET_ANNOTATE_VARIANT_LOGICAL_TYPE.key, "false") } private def withTempTable(tableNames: String*)(f: => Unit): Unit = { From 02ab583eec5ead8029584166a43f7a5be16d0758 Mon Sep 17 00:00:00 2001 From: Harsh Motwani Date: Tue, 25 Nov 2025 06:14:21 +0000 Subject: [PATCH 3/8] fix default resolution for push_variant_into_scan --- .../catalyst/util/ResolveDefaultColumnsUtil.scala | 15 ++++++++++++++- .../datasources/PushVariantIntoScan.scala | 3 ++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala index 4bef21d0a091..18a4940041fa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala @@ -588,12 +588,25 @@ object ResolveDefaultColumns extends QueryErrorsBase }.toSeq } + // In order to resolve defaults for variants, they need to be in their regular VariantType + // representation instead of the Struct representation used in shredded reads. + // Note that currently, only null values are supported for variant projections pushed into the + // scan. + private def variantStructToVariant(dt: StructType): StructType = { + def isVariantStruct(s: StructType): Boolean = + s.fields.length > 0 && s.fields.forall(_.metadata.contains("__VARIANT_METADATA_KEY")) + require(!isVariantStruct(dt), "The top level schema should not be a variant struct.") + dt.transformRecursively { + case s: StructType if isVariantStruct(s) => VariantType + }.asInstanceOf[StructType] + } + /** * These define existence default values for the struct fields for efficiency purposes. * The caller should avoid using such methods in a loop for efficiency. 
*/ def existenceDefaultValues(schema: StructType): Array[Any] = - getExistenceDefaultValues(schema) + getExistenceDefaultValues(variantStructToVariant(schema)) def existenceDefaultsBitmask(schema: StructType): Array[Boolean] = getExistenceDefaultsBitmask(schema) def hasExistenceDefaultValues(schema: StructType): Boolean = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PushVariantIntoScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PushVariantIntoScan.scala index 2cf1a5e9b8cd..1208c5404def 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PushVariantIntoScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PushVariantIntoScan.scala @@ -300,7 +300,8 @@ object PushVariantIntoScan extends Rule[LogicalPlan] { val schemaAttributes = relation.resolve(hadoopFsRelation.dataSchema, hadoopFsRelation.sparkSession.sessionState.analyzer.resolver) val defaultValues = ResolveDefaultColumns.existenceDefaultValues(StructType( - schemaAttributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))) + schemaAttributes.map(a => + StructField(a.name, a.dataType, a.nullable, a.metadata)))) for ((a, defaultValue) <- schemaAttributes.zip(defaultValues)) { variants.addVariantFields(a.exprId, a.dataType, defaultValue, Nil) } From b65b85db9072897a8335dac51f12097b76be3165 Mon Sep 17 00:00:00 2001 From: Harsh Motwani Date: Tue, 25 Nov 2025 06:16:50 +0000 Subject: [PATCH 4/8] fix --- .../spark/sql/execution/datasources/PushVariantIntoScan.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PushVariantIntoScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PushVariantIntoScan.scala index 1208c5404def..2cf1a5e9b8cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PushVariantIntoScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PushVariantIntoScan.scala @@ -300,8 +300,7 @@ object PushVariantIntoScan extends Rule[LogicalPlan] { val schemaAttributes = relation.resolve(hadoopFsRelation.dataSchema, hadoopFsRelation.sparkSession.sessionState.analyzer.resolver) val defaultValues = ResolveDefaultColumns.existenceDefaultValues(StructType( - schemaAttributes.map(a => - StructField(a.name, a.dataType, a.nullable, a.metadata)))) + schemaAttributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))) for ((a, defaultValue) <- schemaAttributes.zip(defaultValues)) { variants.addVariantFields(a.exprId, a.dataType, defaultValue, Nil) } From 29a87b4b8a1c79a976ac4e650021bfd9df209790 Mon Sep 17 00:00:00 2001 From: Harsh Motwani Date: Tue, 25 Nov 2025 07:36:37 +0000 Subject: [PATCH 5/8] fix --- .../spark/sql/catalyst/expressions/Cast.scala | 26 +++++++++++++++++++ .../util/ResolveDefaultColumnsUtil.scala | 18 +++---------- 2 files changed, 29 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 1f2805ec2789..a3ee47da759e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -360,6 +360,32 @@ object Cast extends QueryErrorsBase { */ def canUpCast(from: DataType, to: DataType): Boolean = UpCastRule.canUpCast(from, to) + /** + * 
Returns true iff it is safe to provide a default value of `from` type typically defined in the + * data source metadata to the `to` type typically in the read schema of a query. + */ + def canAssignDefaultValue(from: DataType, to: DataType): Boolean = { + def isVariantStruct(dt: DataType): Boolean = { + dt match { + case s: StructType => + s.fields.length > 0 && s.fields.forall(_.metadata.contains("__VARIANT_METADATA_KEY")) + case _ => false + } + } + // In order to resolve defaults for variants, they need to be in their regular VariantType + // representation instead of the Struct representation used in shredded reads. + // Note that currently, only null values are supported for variant projections pushed into the + // scan. + if (canUpCast(from, to)) { + true + } else { + from match { + case VariantType if isVariantStruct(to) => true + case _ => false + } + } + } + /** * Returns true iff we can cast the `from` type to `to` type as per the ANSI SQL. * In practice, the behavior is mostly the same as PostgreSQL. It disallows certain unreasonable diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala index 18a4940041fa..c155fda5e73d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala @@ -480,7 +480,8 @@ object ResolveDefaultColumns extends QueryErrorsBase val ret = analyzed match { case equivalent if equivalent.dataType == supplanted => equivalent - case canUpCast if Cast.canUpCast(canUpCast.dataType, supplanted) => + case canAssignDefaultValue + if Cast.canAssignDefaultValue(canAssignDefaultValue.dataType, supplanted) => Cast(analyzed, supplanted, Some(conf.sessionLocalTimeZone)) case other => defaultValueFromWiderTypeLiteral(other, supplanted, colName).getOrElse( @@ -588,25 +589,12 @@ object ResolveDefaultColumns extends QueryErrorsBase }.toSeq } - // In order to resolve defaults for variants, they need to be in their regular VariantType - // representation instead of the Struct representation used in shredded reads. - // Note that currently, only null values are supported for variant projections pushed into the - // scan. - private def variantStructToVariant(dt: StructType): StructType = { - def isVariantStruct(s: StructType): Boolean = - s.fields.length > 0 && s.fields.forall(_.metadata.contains("__VARIANT_METADATA_KEY")) - require(!isVariantStruct(dt), "The top level schema should not be a variant struct.") - dt.transformRecursively { - case s: StructType if isVariantStruct(s) => VariantType - }.asInstanceOf[StructType] - } - /** * These define existence default values for the struct fields for efficiency purposes. * The caller should avoid using such methods in a loop for efficiency. 
*/ def existenceDefaultValues(schema: StructType): Array[Any] = - getExistenceDefaultValues(variantStructToVariant(schema)) + getExistenceDefaultValues(schema) def existenceDefaultsBitmask(schema: StructType): Array[Boolean] = getExistenceDefaultsBitmask(schema) def hasExistenceDefaultValues(schema: StructType): Boolean = From b4332b71aeed7e022ac5f20cffeee941b3d8a4ad Mon Sep 17 00:00:00 2001 From: Harsh Motwani Date: Tue, 25 Nov 2025 09:27:12 +0000 Subject: [PATCH 6/8] fix --- .../spark/sql/catalyst/expressions/Cast.scala | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index a3ee47da759e..04aab0f17458 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -372,6 +372,19 @@ object Cast extends QueryErrorsBase { case _ => false } } + (from, to) match { + case (s1: StructType, s2: StructType) => + s1.length == s2.length && s1.fields.zip(s2.fields).forall { + case (f1, f2) => canAssignDefaultValue(f1.dataType, f2.dataType) + } + case (a1: ArrayType, a2: ArrayType) => + canAssignDefaultValue(a1.elementType, a2.elementType) + case (m1: MapType, m2: MapType) => + canAssignDefaultValue(m1.keyType, m2.keyType) && + canAssignDefaultValue(m1.valueType, m2.valueType) + case (_: VariantType, s: StructType) => isVariantStruct(s) + case (k, v) => canUpCast(k, v) + } // In order to resolve defaults for variants, they need to be in their regular VariantType // representation instead of the Struct representation used in shredded reads. // Note that currently, only null values are supported for variant projections pushed into the From 34dfe8e22f6119fd51e199369d169c216e2a8e90 Mon Sep 17 00:00:00 2001 From: Harsh Motwani Date: Wed, 26 Nov 2025 23:43:24 +0000 Subject: [PATCH 7/8] fix --- .../src/test/scala/org/apache/spark/sql/VariantSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala index 72a1ed462d16..6510a165e258 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala @@ -197,7 +197,6 @@ class VariantSuite extends QueryTest with SharedSparkSession with ExpressionEval } test("round trip tests") { - // Since we are writing random bytes, shredding schema inference will fail. withSQLConf(SQLConf.VARIANT_INFER_SHREDDING_SCHEMA.key -> "false") { val rand = new Random(42) val input = Seq.fill(50) { @@ -389,7 +388,7 @@ class VariantSuite extends QueryTest with SharedSparkSession with ExpressionEval Seq(false, true).foreach { vectorizedReader => withSQLConf( SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorizedReader.toString, - // Invalid variant binary fails during shredding schema inference + // Invalid variant binary fails during shredding schema inference. 
SQLConf.VARIANT_INFER_SHREDDING_SCHEMA.key -> "false" ) { withTempDir { dir => From 0e12ecdb013e7c95a1bfcf7413082150cc9609ce Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 28 Nov 2025 12:24:52 +0800 Subject: [PATCH 8/8] Apply suggestions from code review --- .../src/test/scala/org/apache/spark/sql/VariantSuite.scala | 4 +--- .../datasources/parquet/ParquetVariantShreddingSuite.scala | 3 +-- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala index 6510a165e258..16be9558409c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala @@ -820,9 +820,7 @@ class VariantSuite extends QueryTest with SharedSparkSession with ExpressionEval // The initial size of the buffer backing a cached dataframe column is 128KB. // See `ColumnBuilder`. val numKeys = 128 * 1024 - // We start in long range because the shredded writer writes int64 by default which wouldn't - // match narrower binaries. - val keyIterator = (Int.MaxValue + 1L until Int.MaxValue + 1L + numKeys).iterator + val keyIterator = (0 until numKeys).iterator val entries = Array.fill(numKeys)(s"""\"${keyIterator.next()}\": \"test\"""") val jsonStr = s"{${entries.mkString(", ")}}" val query = s"""select named_struct( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala index 1cc6d3afbee5..1f06ddb29bd4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala @@ -379,8 +379,7 @@ class ParquetVariantShreddingSuite extends QueryTest with ParquetTest with Share "struct>>" withSQLConf(SQLConf.VARIANT_WRITE_SHREDDING_ENABLED.key -> true.toString, SQLConf.VARIANT_ALLOW_READING_SHREDDED.key -> true.toString, - SQLConf.VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST.key -> schema, - SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.key -> true.toString) { + SQLConf.VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST.key -> schema) { df.write.mode("overwrite").parquet(dir.getAbsolutePath) // Verify that we can read the full variant.
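
For reference, the test changes in PATCH 2/8 opt back out of the newly enabled defaults on a per-test basis via withSQLConf. The sketch below shows that pattern as a self-contained suite. It is an illustration only and not part of the series: the suite name, file placement, and sample data are invented, while the SQLConf constants and the harness (QueryTest, SharedSparkSession, withSQLConf, withTempDir) are the ones already used in the hunks above.

package org.apache.spark.sql

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

// Hypothetical suite: pins the shredding-related configs flipped in PATCH 1/8 back to "false"
// so the writer produces plain (unshredded) variant binaries and the scan keeps variant
// columns opaque, matching the pre-series behaviour.
class VariantUnshreddedWriteSuite extends QueryTest with SharedSparkSession {
  test("round trip variant with shredding disabled") {
    withSQLConf(
      SQLConf.VARIANT_WRITE_SHREDDING_ENABLED.key -> "false", // no shredded Parquet layout on write
      SQLConf.VARIANT_INFER_SHREDDING_SCHEMA.key -> "false",  // no shredding schema inference
      SQLConf.PUSH_VARIANT_INTO_SCAN.key -> "false") {        // no variant field pushdown into the scan
      withTempDir { dir =>
        // Ten small variant values produced with parse_json.
        val df = spark.sql("select parse_json(cast(id as string)) as v from range(10)")
        df.write.mode("overwrite").parquet(dir.getAbsolutePath)
        // Compare through to_json so the check does not depend on the physical layout.
        checkAnswer(
          spark.read.parquet(dir.getAbsolutePath).selectExpr("to_json(v)"),
          df.selectExpr("to_json(v)"))
      }
    }
  }
}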