diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala index 4f6c8e5121731..7653bb0ede79a 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala @@ -27,9 +27,10 @@ import org.apache.avro.{AvroRuntimeException, Schema} import org.apache.spark.rdd.RDD import org.apache.spark.sql.avro.HoodieSparkSchemaConverters import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType} +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, MetadataBuilder, StructField, StructType} import org.apache.spark.sql.{Dataset, Row, SparkSession} +import java.util.Locale import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ @@ -145,6 +146,82 @@ object HoodieSchemaConversionUtils { } } + /** + * Re-attach catalog metadata/nullability that Spark's write-path rewrites strip: + * + * 1. VECTOR/BLOB logical-type metadata - dropped by TableOutputResolver's Cast and + * UPDATE's castIfNeeded; without it, conversion falls back to the physical type + * (ARRAY/STRUCT) and schema compatibility fails. + * 2. Nullability (only when `alignNullability = true`) - VALUES literals and Cast + * outputs are nullable by default, which conflicts with non-null catalog columns + * (e.g. primaryKey) once this schema becomes the writer's canonical schema. + * + * `alignNullability = true` is only safe when upstream null-assertion expressions + * guarantee no nulls flow into non-null columns (INSERT/UPDATE via TableOutputResolver/ + * castIfNeeded). MERGE must pass `false`: assignments run later inside ExpressionPayload, + * so raw source rows may legitimately carry nulls. + * + * BLOB is projected nullable-everywhere by [[HoodieSparkSchemaConverters.toSqlType]], so + * the nullability branch is a no-op inside BLOB subtrees; RFC-100 non-null invariants are + * enforced at the physical-schema boundary in HoodieSchema.Blob#createBlob. + * + * Recurses into nested structs, struct-element arrays, and struct-value maps. Source-only + * fields (e.g. MERGE join keys) are returned unchanged. + */ + def alignSchemaWithCatalog(sourceSchema: StructType, + targetSchema: StructType, + caseSensitive: Boolean, + alignNullability: Boolean): StructType = { + val lookupKey: String => String = + if (caseSensitive) identity else (_: String).toLowerCase(Locale.ROOT) + val targetByName: Map[String, StructField] = + targetSchema.fields.map(f => lookupKey(f.name) -> f).toMap + + StructType(sourceSchema.fields.map { field => + targetByName.get(lookupKey(field.name)) match { + case Some(target) => alignField(field, target, caseSensitive, alignNullability) + case None => field + } + }) + } + + private def alignField(source: StructField, + target: StructField, + caseSensitive: Boolean, + alignNullability: Boolean): StructField = { + val alignedNullable = if (alignNullability) target.nullable else source.nullable + val alignedField = (source.dataType, target.dataType) match { + case (s: StructType, t: StructType) => + source.copy( + dataType = alignSchemaWithCatalog(s, t, caseSensitive, alignNullability), + nullable = alignedNullable) + case (ArrayType(sElem: StructType, sContainsNull), ArrayType(tElem: StructType, tContainsNull)) => + val alignedContainsNull = if (alignNullability) tContainsNull else sContainsNull + source.copy( + dataType = ArrayType(alignSchemaWithCatalog(sElem, tElem, caseSensitive, alignNullability), alignedContainsNull), + nullable = alignedNullable) + case (MapType(sKey, sVal: StructType, sValueContainsNull), MapType(_, tVal: StructType, tValueContainsNull)) => + val alignedValueContainsNull = if (alignNullability) tValueContainsNull else sValueContainsNull + source.copy( + dataType = MapType(sKey, alignSchemaWithCatalog(sVal, tVal, caseSensitive, alignNullability), alignedValueContainsNull), + nullable = alignedNullable) + case _ => + source.copy(nullable = alignedNullable) + } + + if (target.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD)) { + val enrichedMetadata = new MetadataBuilder() + .withMetadata(alignedField.metadata) + .putString( + HoodieSchema.TYPE_METADATA_FIELD, + target.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)) + .build() + alignedField.copy(metadata = enrichedMetadata) + } else { + alignedField + } + } + /** * Recursively aligns the nullable property of Spark schema fields with HoodieSchema. * diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala index 80105e32013bb..42ff54a7155e8 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala @@ -24,9 +24,12 @@ import org.apache.hudi.common.schema.HoodieSchema.TimePrecision import org.apache.hudi.internal.schema.HoodieSchemaException import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.types.Decimal.minBytesForPrecision +import java.util.Locale + import scala.collection.JavaConverters._ /** @@ -257,6 +260,7 @@ object HoodieSparkSchemaConverters extends SparkAdapterSupport { SchemaType(ArrayType(sparkElementType, containsNull = false), nullable = false, Some(metadata)) case HoodieSchemaType.BLOB | HoodieSchemaType.RECORD => + val isBlob = hoodieSchema.getType == HoodieSchemaType.BLOB val fullName = hoodieSchema.getFullName if (existingRecordNames.contains(fullName)) { throw new IncompatibleSchemaException( @@ -278,10 +282,22 @@ object HoodieSparkSchemaConverters extends SparkAdapterSupport { metadataBuilder.putString(HoodieSchema.TYPE_METADATA_FIELD, HoodieSchema.Blob.TYPE_DESCRIPTOR) } val metadata = metadataBuilder.build() - StructField(f.name(), schemaType.dataType, schemaType.nullable, metadata) + // For BLOB: force nullable-everywhere at the Spark type layer. The RFC-100 + // canonical schema declares `type`, `reference.external_path`, and + // `reference.managed` as strictly non-null, but that contract is conditional + // ("required when parent is present") and Spark's type system can't model it. + // Projecting BLOB as nullable-everywhere for Spark avoids downstream pain + // (Cast / TableOutputResolver / Cast.canCast rewrites); the on-disk physical + // schema stays RFC-100 compliant because the write path goes through + // HoodieSchema.Blob.createBlob(), which uses the canonical fields verbatim. + if (isBlob) { + StructField(f.name(), withAllFieldsNullable(schemaType.dataType), nullable = true, metadata) + } else { + StructField(f.name(), schemaType.dataType, schemaType.nullable, metadata) + } } // For BLOB types, propagate type metadata via SchemaType - val schemaTypeMetadata = if (hoodieSchema.getType == HoodieSchemaType.BLOB) { + val schemaTypeMetadata = if (isBlob) { Some(new MetadataBuilder() .putString(HoodieSchema.TYPE_METADATA_FIELD, hoodieSchema.asInstanceOf[HoodieSchema.Blob].toTypeDescriptor) .build()) @@ -350,11 +366,17 @@ object HoodieSparkSchemaConverters extends SparkAdapterSupport { /** * Validates that a StructType matches the expected blob schema structure defined in {@link HoodieSchema.Blob}. * + * Purely structural: compares field names and data types recursively, ignoring nullability. + * At the Spark type layer, BLOB is projected as nullable-everywhere by [[toSqlType]] (see the + * comment on the BLOB case there); nullability is therefore not part of the structural + * contract. The RFC-100 non-null invariants are enforced at the physical-schema write + * boundary by {@link HoodieSchema.Blob#createBlob}. + * * @param structType the StructType to validate * @throws IllegalArgumentException if the structure does not match the expected blob schema */ private def validateBlobStructure(structType: StructType): Unit = { - if (!structType.equals(expectedBlobStructType)) { + if (!matchesStructure(structType, expectedBlobStructType, SQLConf.get.caseSensitiveAnalysis)) { throw new IllegalArgumentException( s"""Invalid blob schema structure. Expected schema: |${expectedBlobStructType.toDDL} @@ -363,6 +385,31 @@ object HoodieSparkSchemaConverters extends SparkAdapterSupport { } } + private def matchesStructure(source: DataType, expected: DataType, caseSensitive: Boolean): Boolean = + (source, expected) match { + case (s: StructType, e: StructType) => + s.length == e.length && s.fields.zip(e.fields).forall { case (sf, ef) => + nameEquals(sf.name, ef.name, caseSensitive) && + matchesStructure(sf.dataType, ef.dataType, caseSensitive) + } + case _ => source == expected + } + + private def nameEquals(a: String, b: String, caseSensitive: Boolean): Boolean = + if (caseSensitive) a == b else a.equalsIgnoreCase(b) + + private def withAllFieldsNullable(dataType: DataType): DataType = dataType match { + case s: StructType => + StructType(s.fields.map(f => f.copy( + dataType = withAllFieldsNullable(f.dataType), + nullable = true))) + case ArrayType(elementType, _) => + ArrayType(withAllFieldsNullable(elementType), containsNull = true) + case MapType(keyType, valueType, _) => + MapType(keyType, withAllFieldsNullable(valueType), valueContainsNull = true) + case other => other + } + private lazy val expectedVariantStructType: StructType = { val metadataField = StructField(HoodieSchema.Variant.VARIANT_METADATA_FIELD, BinaryType, nullable = false) val valueField = StructField(HoodieSchema.Variant.VARIANT_VALUE_FIELD, BinaryType, nullable = false) @@ -373,14 +420,25 @@ object HoodieSparkSchemaConverters extends SparkAdapterSupport { * Validates that a StructType matches the expected unshredded variant schema * (two non-null {@code BinaryType} fields: {@code metadata} and {@code value}). * + * Note on nullability: unlike BLOB, VARIANT is not projected nullable-everywhere at the + * Spark type layer because the user-facing path is gated. Spark 3.x rejects VARIANT at + * schema resolution, and Spark 4.0+ exposes it as the native {@code VariantType} populated + * via {@code parse_json(...)}, never a user-supplied {@code named_struct}. The internal + * physical layout ({@code struct} with non-null fields) only appears + * through {@link HoodieSparkSchemaConverters#toSqlType}, which produces the canonical + * non-null shape this validator expects. + * * @param structType the StructType to validate * @throws IllegalArgumentException if the structure does not match the expected variant schema */ private def validateVariantStructure(structType: StructType): Unit = { - val fieldsByName = structType.fields.map(f => f.name -> f).toMap + val caseSensitive = SQLConf.get.caseSensitiveAnalysis + val key: String => String = + if (caseSensitive) identity else (_: String).toLowerCase(Locale.ROOT) + val fieldsByName = structType.fields.map(f => key(f.name) -> f).toMap val ok = structType.length == 2 && - fieldsByName.get(HoodieSchema.Variant.VARIANT_METADATA_FIELD).exists(f => f.dataType == BinaryType && !f.nullable) && - fieldsByName.get(HoodieSchema.Variant.VARIANT_VALUE_FIELD).exists(f => f.dataType == BinaryType && !f.nullable) + fieldsByName.get(key(HoodieSchema.Variant.VARIANT_METADATA_FIELD)).exists(f => f.dataType == BinaryType && !f.nullable) && + fieldsByName.get(key(HoodieSchema.Variant.VARIANT_VALUE_FIELD)).exists(f => f.dataType == BinaryType && !f.nullable) if (!ok) { throw new IllegalArgumentException( s"""Invalid variant schema structure. Expected schema: diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSchemaConversionUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSchemaConversionUtils.scala index 389b8ad03a591..1a52f1daf20c5 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSchemaConversionUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSchemaConversionUtils.scala @@ -839,6 +839,143 @@ class TestHoodieSchemaConversionUtils extends FunSuite with Matchers { assert(parsedVector.getVectorElementType == HoodieSchema.Vector.VectorElementType.FLOAT) } + test("alignSchemaWithCatalog narrows nullability when alignNullability = true") { + val sourceSchema = new StructType() + .add("id", LongType, nullable = true) + .add("payload", new StructType() + .add("inner", StringType, nullable = true), nullable = true) + .add("items", ArrayType(new StructType() + .add("x", IntegerType, nullable = true), containsNull = true), nullable = true) + .add("lookup", MapType(StringType, new StructType() + .add("y", IntegerType, nullable = true), valueContainsNull = true), nullable = true) + + val targetSchema = new StructType() + .add("id", LongType, nullable = false) + .add("payload", new StructType() + .add("inner", StringType, nullable = false), nullable = false) + .add("items", ArrayType(new StructType() + .add("x", IntegerType, nullable = false), containsNull = false), nullable = false) + .add("lookup", MapType(StringType, new StructType() + .add("y", IntegerType, nullable = false), valueContainsNull = false), nullable = false) + + val aligned = HoodieSchemaConversionUtils.alignSchemaWithCatalog( + sourceSchema, targetSchema, caseSensitive = false, alignNullability = true) + + assert(!aligned("id").nullable) + assert(!aligned("payload").nullable) + assert(!aligned("payload").dataType.asInstanceOf[StructType]("inner").nullable) + val itemsArray = aligned("items").dataType.asInstanceOf[ArrayType] + assert(!aligned("items").nullable && !itemsArray.containsNull) + assert(!itemsArray.elementType.asInstanceOf[StructType]("x").nullable) + val lookupMap = aligned("lookup").dataType.asInstanceOf[MapType] + assert(!aligned("lookup").nullable && !lookupMap.valueContainsNull) + assert(!lookupMap.valueType.asInstanceOf[StructType]("y").nullable) + } + + test("alignSchemaWithCatalog preserves source nullability when alignNullability = false") { + val sourceSchema = new StructType() + .add("id", LongType, nullable = true) + .add("payload", new StructType() + .add("inner", StringType, nullable = true), nullable = true) + .add("items", ArrayType(new StructType() + .add("x", IntegerType, nullable = true), containsNull = true), nullable = true) + .add("lookup", MapType(StringType, new StructType() + .add("y", IntegerType, nullable = true), valueContainsNull = true), nullable = true) + + val targetSchema = new StructType() + .add("id", LongType, nullable = false) + .add("payload", new StructType() + .add("inner", StringType, nullable = false), nullable = false) + .add("items", ArrayType(new StructType() + .add("x", IntegerType, nullable = false), containsNull = false), nullable = false) + .add("lookup", MapType(StringType, new StructType() + .add("y", IntegerType, nullable = false), valueContainsNull = false), nullable = false) + + val aligned = HoodieSchemaConversionUtils.alignSchemaWithCatalog( + sourceSchema, targetSchema, caseSensitive = false, alignNullability = false) + + assert(aligned("id").nullable) + assert(aligned("payload").nullable) + assert(aligned("payload").dataType.asInstanceOf[StructType]("inner").nullable) + val itemsArray = aligned("items").dataType.asInstanceOf[ArrayType] + assert(aligned("items").nullable && itemsArray.containsNull) + assert(itemsArray.elementType.asInstanceOf[StructType]("x").nullable) + val lookupMap = aligned("lookup").dataType.asInstanceOf[MapType] + assert(aligned("lookup").nullable && lookupMap.valueContainsNull) + assert(lookupMap.valueType.asInstanceOf[StructType]("y").nullable) + } + + test("alignSchemaWithCatalog reattaches custom-type metadata regardless of alignNullability") { + val vectorMetadata = new MetadataBuilder() + .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(4, FLOAT)") + .build() + val sourceSchema = new StructType() + .add("embedding", ArrayType(FloatType, containsNull = false), nullable = true) + val targetSchema = new StructType() + .add("embedding", ArrayType(FloatType, containsNull = false), nullable = false, vectorMetadata) + + Seq(true, false).foreach { flag => + val aligned = HoodieSchemaConversionUtils.alignSchemaWithCatalog( + sourceSchema, targetSchema, caseSensitive = false, alignNullability = flag) + assert(aligned("embedding").metadata.contains(HoodieSchema.TYPE_METADATA_FIELD), + s"metadata missing when alignNullability=$flag") + assertEquals("VECTOR(4, FLOAT)", + aligned("embedding").metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)) + } + } + + test("BLOB structure validator accepts mixed-case field names under case-insensitive analysis") { + // Matches RFC-100 BLOB layout (type, data, reference) but with mixed-case field names. + // Under Spark's default case-insensitive analysis, this should validate successfully. + val referenceStruct = new StructType() + .add("External_Path", StringType, nullable = true) + .add("OFFSET", LongType, nullable = true) + .add("Length", LongType, nullable = true) + .add("managed", BooleanType, nullable = true) + val mixedCaseBlob = new StructType() + .add("TYPE", StringType, nullable = true) + .add("Data", BinaryType, nullable = true) + .add("REFERENCE", referenceStruct, nullable = true) + val blobMetadata = new MetadataBuilder() + .putString(HoodieSchema.TYPE_METADATA_FIELD, HoodieSchema.Blob.TYPE_DESCRIPTOR) + .build() + val outerStruct = new StructType() + .add("id", LongType, nullable = false) + .add("payload", mixedCaseBlob, nullable = true, blobMetadata) + + val hoodieSchema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema( + outerStruct, "MixedCaseBlobTest", "test") + val payload = hoodieSchema.getField("payload").get().schema().getNonNullType + assertEquals(HoodieSchemaType.BLOB, payload.getType) + } + + test("BLOB structure validator rejects wrong field ordering even under case-insensitive analysis") { + // Correct field names but reversed ordering - matchesStructure is positional and must reject. + val referenceStruct = new StructType() + .add("external_path", StringType, nullable = true) + .add("offset", LongType, nullable = true) + .add("length", LongType, nullable = true) + .add("managed", BooleanType, nullable = true) + val wrongOrderBlob = new StructType() + .add("reference", referenceStruct, nullable = true) + .add("data", BinaryType, nullable = true) + .add("type", StringType, nullable = true) + val blobMetadata = new MetadataBuilder() + .putString(HoodieSchema.TYPE_METADATA_FIELD, HoodieSchema.Blob.TYPE_DESCRIPTOR) + .build() + val outerStruct = new StructType() + .add("id", LongType, nullable = false) + .add("payload", wrongOrderBlob, nullable = true, blobMetadata) + + val ex = intercept[Exception] { + HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema( + outerStruct, "WrongOrderBlobTest", "test") + } + assert(ex.getMessage.contains("Invalid blob schema structure") || + (ex.getCause != null && ex.getCause.getMessage.contains("Invalid blob schema structure")), + s"unexpected exception: ${ex.getMessage}") + } + test("test VECTOR element type mismatch throws error") { // Metadata says DOUBLE, but Spark array element type is Float val mismatchMetadata = new MetadataBuilder() diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala index 36de434477d56..3ce2ce16d12d0 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala @@ -346,24 +346,26 @@ class TestSchemaConverters extends SparkAdapterSupport { /** * Validates the content of the blob fields to ensure the fields match our expectations. + * + * BLOB is projected as nullable-everywhere at the Spark type layer (see the BLOB case in + * HoodieSparkSchemaConverters.toSqlType). RFC-100 non-null invariants are enforced at the + * physical-schema write boundary via HoodieSchema.Blob#createBlob, not here. + * * @param dataType the StructType containing the blob fields to validate */ private def validateBlobFields(dataType: StructType): Unit = { - // storage_type is a non-null string field val storageTypeField = dataType.fields.find(_.name == HoodieSchema.Blob.TYPE).get assertEquals(DataTypes.StringType, storageTypeField.dataType) - assertFalse(storageTypeField.nullable) - // data is a nullable binary field + assertTrue(storageTypeField.nullable) val dataField = dataType.fields.find(_.name == HoodieSchema.Blob.INLINE_DATA_FIELD).get assertEquals(DataTypes.BinaryType, dataField.dataType) assertTrue(dataField.nullable) - // reference is a nullable struct field val referenceField = dataType.fields.find(_.name == HoodieSchema.Blob.EXTERNAL_REFERENCE).get assertEquals(new StructType(Array[StructField]( - StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH, DataTypes.StringType, nullable = false), + StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH, DataTypes.StringType, nullable = true), StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET, DataTypes.LongType, nullable = true), StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH, DataTypes.LongType, nullable = true), - StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED, DataTypes.BooleanType, nullable = false) + StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED, DataTypes.BooleanType, nullable = true) )), referenceField.dataType) assertTrue(referenceField.nullable) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala index 107b934832703..ead9943421620 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala @@ -2333,4 +2333,96 @@ class TestCreateTable extends HoodieSparkSqlTestBase { val hiveSchema = CreateHoodieTableCommand.toHiveCompatibleSchema(schema) assertEquals(ArrayType(FloatType, containsNull = false), hiveSchema("floats").dataType) } + + test("VECTOR column persists as VECTOR in Hudi tableCreateSchema and allows INSERT INTO") { + withTempDir { tmp => + val tableName = generateTableName + spark.sql( + s""" + |CREATE TABLE $tableName ( + | id BIGINT, + | embedding VECTOR(3) + |) USING hudi + |LOCATION '${tmp.getCanonicalPath}' + |TBLPROPERTIES ( + | primaryKey = 'id' + |) + """.stripMargin) + + val metaClient = createMetaClient(spark, tmp.getCanonicalPath) + val persistedOpt = metaClient.getTableConfig.getTableCreateSchema + assertTrue(persistedOpt.isPresent, "tableCreateSchema should be present") + val persisted = persistedOpt.get() + val embedding = persisted.getField("embedding").get().schema().getNonNullType() + assertEquals(HoodieSchemaType.VECTOR, embedding.getType) + assertEquals(3, embedding.asInstanceOf[HoodieSchema.Vector].getDimension) + + spark.sql(s""" + INSERT INTO $tableName VALUES + (1, array(cast(0.1 as float), cast(0.2 as float), cast(0.3 as float))), + (2, array(cast(0.4 as float), cast(0.5 as float), cast(0.6 as float))) + """) + } + } + + test("BLOB column persists as BLOB in Hudi tableCreateSchema and allows INSERT INTO") { + withTempDir { tmp => + val tableName = generateTableName + spark.sql( + s""" + |CREATE TABLE $tableName ( + | id BIGINT, + | payload BLOB + |) USING hudi + |LOCATION '${tmp.getCanonicalPath}' + |TBLPROPERTIES ( + | primaryKey = 'id' + |) + """.stripMargin) + + val metaClient = createMetaClient(spark, tmp.getCanonicalPath) + val persistedOpt = metaClient.getTableConfig.getTableCreateSchema + assertTrue(persistedOpt.isPresent, "tableCreateSchema should be present") + val persisted = persistedOpt.get() + val payload = persisted.getField("payload").get().schema().getNonNullType() + assertEquals(HoodieSchemaType.BLOB, payload.getType) + + spark.sql( + s""" + |INSERT INTO $tableName VALUES + | (1, named_struct( + | 'type', 'INLINE', + | 'data', cast(X'010203' as binary), + | 'reference', cast(null as struct))) + """.stripMargin) + } + } + + test("BLOB nested in struct persists and allows INSERT INTO") { + withTempDir { tmp => + val tableName = generateTableName + spark.sql( + s""" + |CREATE TABLE $tableName ( + | id BIGINT, + | media STRUCT + |) USING hudi + |LOCATION '${tmp.getCanonicalPath}' + |TBLPROPERTIES ( + | primaryKey = 'id' + |) + """.stripMargin) + + spark.sql( + s""" + |INSERT INTO $tableName VALUES + | (1, named_struct( + | 'title', 'demo', + | 'content', named_struct( + | 'type', 'INLINE', + | 'data', cast(X'0a0b0c' as binary), + | 'reference', cast(null as struct)))) + """.stripMargin) + } + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala index 0ededcb7ae92d..16a7da260bcf9 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala @@ -30,6 +30,7 @@ import org.apache.hudi.storage.StoragePath import org.apache.hudi.testutils.DataSourceTestUtils import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient +import org.apache.spark.sql.Row import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField} @@ -1802,4 +1803,123 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo s"Expected TABLE_OR_VIEW_NOT_FOUND error but got: ${exception.getMessage}") } } + + test("Test MergeInto preserves VECTOR custom-type metadata") { + withTempDir { tmp => + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id bigint, + | embedding VECTOR(3) + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | type = 'cow', + | primaryKey = 'id' + | ) + """.stripMargin) + + spark.sql( + s""" + |insert into $tableName values + | (1, array(cast(0.1 as float), cast(0.2 as float), cast(0.3 as float))) + """.stripMargin) + + // MERGE exercises both NOT MATCHED (new row with VECTOR literal) and MATCHED + // (UPDATE SET on the VECTOR column). Without the metadata re-attach in + // MergeIntoHoodieTableCommand, the Avro schema-compat check throws + // MISSING_UNION_BRANCH. + spark.sql( + s""" + |merge into $tableName t + |using ( + | select 1L as id, array(cast(0.9 as float), cast(0.8 as float), cast(0.7 as float)) as embedding + | union all + | select 2L as id, array(cast(0.4 as float), cast(0.5 as float), cast(0.6 as float)) as embedding + |) s + |on t.id = s.id + |when matched then update set t.embedding = s.embedding + |when not matched then insert (id, embedding) values (s.id, s.embedding) + """.stripMargin) + + checkAnswer(s"select id, embedding from $tableName")( + Seq(1L, Seq(0.9f, 0.8f, 0.7f)), + Seq(2L, Seq(0.4f, 0.5f, 0.6f)) + ) + } + } + + test("Test MergeInto preserves BLOB custom-type metadata") { + withTempDir { tmp => + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id bigint, + | payload BLOB + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | type = 'cow', + | primaryKey = 'id' + | ) + """.stripMargin) + + // Use OUT_OF_LINE with a concrete reference: per RFC-100 the BLOB struct's + // inner reference fields (external_path, managed) are non-null. A + // cast(null as struct<...>) inner would type all fields nullable and + // Spark's MERGE analyzer rejects narrowing nullable -> non-null at + // assignment resolution time (INSERT tolerates it via TableOutputResolver, + // MERGE does not). + spark.sql( + s""" + |insert into $tableName values + | (1, named_struct( + | 'type', 'OUT_OF_LINE', + | 'data', cast(null as binary), + | 'reference', named_struct( + | 'external_path', 'blobs/seed', + | 'offset', 0L, + | 'length', 3L, + | 'managed', false))) + """.stripMargin) + + // MERGE exercises both NOT MATCHED (new row with BLOB literal) and MATCHED + // (UPDATE SET on the BLOB column). Without the metadata re-attach in + // MergeIntoHoodieTableCommand, the Avro schema-compat check throws + // MISSING_UNION_BRANCH. + spark.sql( + s""" + |merge into $tableName t + |using ( + | select 1L as id, named_struct( + | 'type', 'OUT_OF_LINE', + | 'data', cast(null as binary), + | 'reference', named_struct( + | 'external_path', 'blobs/updated', + | 'offset', 10L, + | 'length', 100L, + | 'managed', true)) as payload + | union all + | select 2L as id, named_struct( + | 'type', 'OUT_OF_LINE', + | 'data', cast(null as binary), + | 'reference', named_struct( + | 'external_path', 'blobs/inserted', + | 'offset', 200L, + | 'length', 50L, + | 'managed', false)) as payload + |) s + |on t.id = s.id + |when matched then update set t.payload = s.payload + |when not matched then insert (id, payload) values (s.id, s.payload) + """.stripMargin) + + checkAnswer(s"select id, payload from $tableName")( + Seq(1L, Row("OUT_OF_LINE", null, Row("blobs/updated", 10L, 100L, true))), + Seq(2L, Row("OUT_OF_LINE", null, Row("blobs/inserted", 200L, 50L, false))) + ) + } + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestUpdateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestUpdateTable.scala index e002abe1d22ae..9d67da0db5cee 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestUpdateTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestUpdateTable.scala @@ -27,7 +27,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant import org.apache.hudi.common.util.{Option => HOption} import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient -import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase import org.junit.jupiter.api.Assertions.assertEquals @@ -492,4 +492,95 @@ class TestUpdateTable extends HoodieSparkSqlTestBase { } } } + + test("Test UPDATE on VECTOR column preserves custom-type metadata") { + withTempDir { tmp => + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id bigint, + | embedding VECTOR(3) + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | type = 'cow', + | primaryKey = 'id' + | ) + """.stripMargin) + + spark.sql( + s""" + |insert into $tableName values + | (1, array(cast(0.1 as float), cast(0.2 as float), cast(0.3 as float))) + """.stripMargin) + + // Assigning a VECTOR column goes through castIfNeeded; without the metadata + // re-attach it would fail schema compat with MISSING_UNION_BRANCH. + spark.sql( + s""" + |update $tableName + |set embedding = array(cast(0.9 as float), cast(0.8 as float), cast(0.7 as float)) + |where id = 1 + """.stripMargin) + + checkAnswer(s"select id, embedding from $tableName")( + Seq(1L, Seq(0.9f, 0.8f, 0.7f)) + ) + } + } + + test("Test UPDATE on BLOB column preserves custom-type metadata") { + withTempDir { tmp => + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id bigint, + | payload BLOB + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | type = 'cow', + | primaryKey = 'id' + | ) + """.stripMargin) + + // Use OUT_OF_LINE with a concrete reference: per RFC-100 the BLOB struct's + // inner reference fields (external_path, managed) are non-null, and the + // source literal must conform to that contract. + spark.sql( + s""" + |insert into $tableName values + | (1, named_struct( + | 'type', 'OUT_OF_LINE', + | 'data', cast(null as binary), + | 'reference', named_struct( + | 'external_path', 'blobs/seed', + | 'offset', 0L, + | 'length', 3L, + | 'managed', false))) + """.stripMargin) + + // Assigning a BLOB column goes through castIfNeeded; without the metadata + // re-attach it would fail schema compat with MISSING_UNION_BRANCH. + spark.sql( + s""" + |update $tableName + |set payload = named_struct( + | 'type', 'OUT_OF_LINE', + | 'data', cast(null as binary), + | 'reference', named_struct( + | 'external_path', 'blobs/updated', + | 'offset', 10L, + | 'length', 100L, + | 'managed', true)) + |where id = 1 + """.stripMargin) + + checkAnswer(s"select id, payload from $tableName")( + Seq(1L, Row("OUT_OF_LINE", null, Row("blobs/updated", 10L, 100L, true))) + ) + } + } } diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index 071418a9e8e66..3fc336167e893 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -111,7 +111,12 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi } val config = buildHoodieInsertConfig(catalogTable, sparkSession, isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions, staticOverwritePartitionPathOpt) - val df = sparkSession.internalCreateDataFrame(query.execute(), query.schema) + val enrichedSchema = HoodieSchemaConversionUtils.alignSchemaWithCatalog( + query.schema, + catalogTable.tableSchemaWithoutMetaFields, + sparkSession.sessionState.conf.caseSensitiveAnalysis, + alignNullability = true) + val df = sparkSession.internalCreateDataFrame(query.execute(), enrichedSchema) val (structName, namespace) = HoodieSchemaConversionUtils.getRecordNameAndNamespace(catalogTable.tableName) val schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(catalogTable.tableSchema, structName, namespace) val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, df, diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index ea46207a5d4d6..3841760d10a67 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -407,6 +407,16 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie * expressions to the ExpressionPayload#getInsertValue. */ private def executeUpsert(sourceDF: DataFrame, parameters: Map[String, String]): Unit = { + // Source data doesn't carry custom Hudi logical-type metadata (hudi_type, e.g. VECTOR/BLOB); + // restore it from the catalog schema so downstream Avro conversion emits the right branch. + val enrichedSourceSchema = HoodieSchemaConversionUtils.alignSchemaWithCatalog( + sourceDF.schema, + hoodieCatalogTable.tableSchemaWithoutMetaFields, + sparkSession.sessionState.conf.caseSensitiveAnalysis, + alignNullability = false) + val enrichedSourceDF = sparkAdapter.getUnsafeUtils.createDataFrameFromRDD( + sparkSession, sourceDF.queryExecution.toRdd, enrichedSourceSchema) + val operation: String = getOperationType(parameters) // Append the table schema to the parameters. In the case of merge into, the schema of projectedJoinedDF // may be different from the target table, because the are transform logical in the update or @@ -484,7 +494,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie // - Schema of the expected "joined" output of the [[sourceTable]] and [[targetTable]] writeParams ++= Seq( PAYLOAD_RECORD_AVRO_SCHEMA -> - HoodieSchemaUtils.removeMetadataFields(convertStructTypeToHoodieSchema(sourceDF.schema, "record", "")).toString, + HoodieSchemaUtils.removeMetadataFields(convertStructTypeToHoodieSchema(enrichedSourceDF.schema, "record", "")).toString, PAYLOAD_EXPECTED_COMBINED_SCHEMA -> encodeAsBase64String(toStructType(joinedExpectedOutput)) ) @@ -495,7 +505,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie val (structName, namespace) = HoodieSchemaConversionUtils.getRecordNameAndNamespace(hoodieCatalogTable.tableName) val schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(hoodieCatalogTable.tableSchema, structName, namespace) - val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, sourceDF, + val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, enrichedSourceDF, schemaFromCatalog = Option.apply(schema)) if (!success) { throw new HoodieException("Merge into Hoodie table command failed") diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala index e5eee429cda8e..d8cd98ace5f27 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hudi.command -import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport} +import org.apache.hudi.{HoodieSchemaConversionUtils, HoodieSparkSqlWriter, SparkAdapterSupport} import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_OPTIMIZED_WRITES, SPARK_SQL_WRITES_PREPPED_KEY} import org.apache.spark.internal.Logging @@ -62,7 +62,12 @@ case class UpdateHoodieTableCommand(ut: UpdateTable, query: LogicalPlan) extends buildHoodieConfig(catalogTable) } - val df = sparkSession.internalCreateDataFrame(plan.execute(), plan.schema) + val enrichedSchema = HoodieSchemaConversionUtils.alignSchemaWithCatalog( + plan.schema, + catalogTable.tableSchemaWithoutMetaFields, + sparkSession.sessionState.conf.caseSensitiveAnalysis, + alignNullability = true) + val df = sparkSession.internalCreateDataFrame(plan.execute(), enrichedSchema) val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, config, df) if (success && commitInstantTime.isPresent) { updateCommitMetrics(metrics, catalogTable.metaClient, commitInstantTime.get()) diff --git a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index c20c4ec24337b..b1a2a3a2c6e7d 100644 --- a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -111,7 +111,12 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi } val config = buildHoodieInsertConfig(catalogTable, sparkSession, isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions, staticOverwritePartitionPathOpt) - val df = sparkSession.internalCreateDataFrame(query.execute(), query.schema) + val enrichedSchema = HoodieSchemaConversionUtils.alignSchemaWithCatalog( + query.schema, + catalogTable.tableSchemaWithoutMetaFields, + sparkSession.sessionState.conf.caseSensitiveAnalysis, + alignNullability = true) + val df = sparkSession.internalCreateDataFrame(query.execute(), enrichedSchema) val (structName, namespace) = HoodieSchemaConversionUtils.getRecordNameAndNamespace(catalogTable.tableName) val schema = convertStructTypeToHoodieSchema(catalogTable.tableSchema, structName, namespace) val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, df, diff --git a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index a86d4196a54f2..bc0edcfa2186c 100644 --- a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -407,6 +407,16 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie * expressions to the ExpressionPayload#getInsertValue. */ private def executeUpsert(sourceDF: DataFrame, parameters: Map[String, String]): Unit = { + // Source data doesn't carry custom Hudi logical-type metadata (hudi_type, e.g. VECTOR/BLOB); + // restore it from the catalog schema so downstream Avro conversion emits the right branch. + val enrichedSourceSchema = HoodieSchemaConversionUtils.alignSchemaWithCatalog( + sourceDF.schema, + hoodieCatalogTable.tableSchemaWithoutMetaFields, + sparkSession.sessionState.conf.caseSensitiveAnalysis, + alignNullability = false) + val enrichedSourceDF = sparkAdapter.getUnsafeUtils.createDataFrameFromRDD( + sparkSession, sourceDF.queryExecution.toRdd, enrichedSourceSchema) + val operation: String = getOperationType(parameters) // Append the table schema to the parameters. In the case of merge into, the schema of projectedJoinedDF // may be different from the target table, because the are transform logical in the update or @@ -484,7 +494,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie // - Schema of the expected "joined" output of the [[sourceTable]] and [[targetTable]] writeParams ++= Seq( PAYLOAD_RECORD_AVRO_SCHEMA -> - HoodieSchemaUtils.removeMetadataFields(convertStructTypeToHoodieSchema(sourceDF.schema, "record", "")).toString, + HoodieSchemaUtils.removeMetadataFields(convertStructTypeToHoodieSchema(enrichedSourceDF.schema, "record", "")).toString, PAYLOAD_EXPECTED_COMBINED_SCHEMA -> encodeAsBase64String(toStructType(joinedExpectedOutput)) ) @@ -495,7 +505,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie val (structName, namespace) = HoodieSchemaConversionUtils.getRecordNameAndNamespace(hoodieCatalogTable.tableName) val schema = convertStructTypeToHoodieSchema(hoodieCatalogTable.tableSchema, structName, namespace) - val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, sourceDF, + val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, enrichedSourceDF, schemaFromCatalog = Option.apply(schema)) if (!success) { throw new HoodieException("Merge into Hoodie table command failed") diff --git a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala index 4faa93b7efca3..6c6b020d6ba31 100644 --- a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hudi.command -import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport} +import org.apache.hudi.{HoodieSchemaConversionUtils, HoodieSparkSqlWriter, SparkAdapterSupport} import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_OPTIMIZED_WRITES, SPARK_SQL_WRITES_PREPPED_KEY} import org.apache.spark.internal.Logging @@ -63,7 +63,12 @@ case class UpdateHoodieTableCommand(ut: UpdateTable, query: LogicalPlan) extends buildHoodieConfig(catalogTable) } - val df = sparkSession.internalCreateDataFrame(plan.execute(), plan.schema) + val enrichedSchema = HoodieSchemaConversionUtils.alignSchemaWithCatalog( + plan.schema, + catalogTable.tableSchemaWithoutMetaFields, + sparkSession.sessionState.conf.caseSensitiveAnalysis, + alignNullability = true) + val df = sparkSession.internalCreateDataFrame(plan.execute(), enrichedSchema) val (success, commitInstantTime, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, config, df) if (success && commitInstantTime.isPresent) { updateCommitMetrics(metrics, catalogTable.metaClient, commitInstantTime.get())