-
Notifications
You must be signed in to change notification settings - Fork 2.5k
fix(vector): Preserve VECTOR/BLOB metadata on SQL INSERT path #18540
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,9 +27,10 @@ import org.apache.avro.{AvroRuntimeException, Schema} | |
| import org.apache.spark.rdd.RDD | ||
| import org.apache.spark.sql.avro.HoodieSparkSchemaConverters | ||
| import org.apache.spark.sql.catalyst.InternalRow | ||
| import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType} | ||
| import org.apache.spark.sql.types.{ArrayType, DataType, MapType, MetadataBuilder, StructField, StructType} | ||
| import org.apache.spark.sql.{Dataset, Row, SparkSession} | ||
|
|
||
| import java.util.Locale | ||
| import java.util.concurrent.ConcurrentHashMap | ||
|
|
||
| import scala.collection.JavaConverters._ | ||
|
|
@@ -145,6 +146,82 @@ object HoodieSchemaConversionUtils { | |
| } | ||
| } | ||
|
|
||
| /** | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @voonhous you think we can make this java doc comment more concise if possibe?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
| * Re-attach catalog metadata/nullability that Spark's write-path rewrites strip: | ||
| * | ||
| * 1. VECTOR/BLOB logical-type metadata - dropped by TableOutputResolver's Cast and | ||
| * UPDATE's castIfNeeded; without it, conversion falls back to the physical type | ||
| * (ARRAY/STRUCT) and schema compatibility fails. | ||
| * 2. Nullability (only when `alignNullability = true`) - VALUES literals and Cast | ||
| * outputs are nullable by default, which conflicts with non-null catalog columns | ||
| * (e.g. primaryKey) once this schema becomes the writer's canonical schema. | ||
| * | ||
| * `alignNullability = true` is only safe when upstream null-assertion expressions | ||
| * guarantee no nulls flow into non-null columns (INSERT/UPDATE via TableOutputResolver/ | ||
| * castIfNeeded). MERGE must pass `false`: assignments run later inside ExpressionPayload, | ||
| * so raw source rows may legitimately carry nulls. | ||
| * | ||
| * BLOB is projected nullable-everywhere by [[HoodieSparkSchemaConverters.toSqlType]], so | ||
| * the nullability branch is a no-op inside BLOB subtrees; RFC-100 non-null invariants are | ||
| * enforced at the physical-schema boundary in HoodieSchema.Blob#createBlob. | ||
| * | ||
| * Recurses into nested structs, struct-element arrays, and struct-value maps. Source-only | ||
| * fields (e.g. MERGE join keys) are returned unchanged. | ||
| */ | ||
| def alignSchemaWithCatalog(sourceSchema: StructType, | ||
|
voonhous marked this conversation as resolved.
|
||
| targetSchema: StructType, | ||
| caseSensitive: Boolean, | ||
| alignNullability: Boolean): StructType = { | ||
| val lookupKey: String => String = | ||
| if (caseSensitive) identity else (_: String).toLowerCase(Locale.ROOT) | ||
| val targetByName: Map[String, StructField] = | ||
| targetSchema.fields.map(f => lookupKey(f.name) -> f).toMap | ||
|
|
||
| StructType(sourceSchema.fields.map { field => | ||
| targetByName.get(lookupKey(field.name)) match { | ||
| case Some(target) => alignField(field, target, caseSensitive, alignNullability) | ||
| case None => field | ||
| } | ||
| }) | ||
| } | ||
|
|
||
| private def alignField(source: StructField, | ||
|
voonhous marked this conversation as resolved.
|
||
| target: StructField, | ||
| caseSensitive: Boolean, | ||
| alignNullability: Boolean): StructField = { | ||
| val alignedNullable = if (alignNullability) target.nullable else source.nullable | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤖 For VECTOR ( - AI-generated; verify before applying. React 👍/👎 to flag quality. |
||
| val alignedField = (source.dataType, target.dataType) match { | ||
| case (s: StructType, t: StructType) => | ||
| source.copy( | ||
| dataType = alignSchemaWithCatalog(s, t, caseSensitive, alignNullability), | ||
| nullable = alignedNullable) | ||
| case (ArrayType(sElem: StructType, sContainsNull), ArrayType(tElem: StructType, tContainsNull)) => | ||
| val alignedContainsNull = if (alignNullability) tContainsNull else sContainsNull | ||
| source.copy( | ||
| dataType = ArrayType(alignSchemaWithCatalog(sElem, tElem, caseSensitive, alignNullability), alignedContainsNull), | ||
| nullable = alignedNullable) | ||
| case (MapType(sKey, sVal: StructType, sValueContainsNull), MapType(_, tVal: StructType, tValueContainsNull)) => | ||
| val alignedValueContainsNull = if (alignNullability) tValueContainsNull else sValueContainsNull | ||
| source.copy( | ||
| dataType = MapType(sKey, alignSchemaWithCatalog(sVal, tVal, caseSensitive, alignNullability), alignedValueContainsNull), | ||
| nullable = alignedNullable) | ||
| case _ => | ||
| source.copy(nullable = alignedNullable) | ||
| } | ||
|
|
||
| if (target.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD)) { | ||
| val enrichedMetadata = new MetadataBuilder() | ||
| .withMetadata(alignedField.metadata) | ||
| .putString( | ||
| HoodieSchema.TYPE_METADATA_FIELD, | ||
| target.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)) | ||
| .build() | ||
| alignedField.copy(metadata = enrichedMetadata) | ||
| } else { | ||
| alignedField | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Recursively aligns the nullable property of Spark schema fields with HoodieSchema. | ||
| * | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,9 +24,12 @@ import org.apache.hudi.common.schema.HoodieSchema.TimePrecision | |
| import org.apache.hudi.internal.schema.HoodieSchemaException | ||
|
|
||
| import org.apache.spark.annotation.DeveloperApi | ||
| import org.apache.spark.sql.internal.SQLConf | ||
| import org.apache.spark.sql.types._ | ||
| import org.apache.spark.sql.types.Decimal.minBytesForPrecision | ||
|
|
||
| import java.util.Locale | ||
|
|
||
| import scala.collection.JavaConverters._ | ||
|
|
||
| /** | ||
|
|
@@ -257,6 +260,7 @@ object HoodieSparkSchemaConverters extends SparkAdapterSupport { | |
| SchemaType(ArrayType(sparkElementType, containsNull = false), nullable = false, Some(metadata)) | ||
|
|
||
| case HoodieSchemaType.BLOB | HoodieSchemaType.RECORD => | ||
| val isBlob = hoodieSchema.getType == HoodieSchemaType.BLOB | ||
| val fullName = hoodieSchema.getFullName | ||
| if (existingRecordNames.contains(fullName)) { | ||
| throw new IncompatibleSchemaException( | ||
|
|
@@ -278,10 +282,22 @@ object HoodieSparkSchemaConverters extends SparkAdapterSupport { | |
| metadataBuilder.putString(HoodieSchema.TYPE_METADATA_FIELD, HoodieSchema.Blob.TYPE_DESCRIPTOR) | ||
| } | ||
| val metadata = metadataBuilder.build() | ||
| StructField(f.name(), schemaType.dataType, schemaType.nullable, metadata) | ||
| // For BLOB: force nullable-everywhere at the Spark type layer. The RFC-100 | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @yihua Should we file some github issue for this to maybe relax this constraint?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Uhm, KIV this please, i have encountered quite a bit of errors around this.This is not a 1-day ordeal. Let's take more time to look into this after 1.2 as i've found that Spark3.3,3.4,3.5 and 4.0 have different schema validation/check flows. And i have encountered quite abit of errors around this error which i had to fix and patch. Agree that we should file an issue here and find the best way forward. |
||
| // canonical schema declares `type`, `reference.external_path`, and | ||
| // `reference.managed` as strictly non-null, but that contract is conditional | ||
| // ("required when parent is present") and Spark's type system can't model it. | ||
| // Projecting BLOB as nullable-everywhere for Spark avoids downstream pain | ||
| // (Cast / TableOutputResolver / Cast.canCast rewrites); the on-disk physical | ||
| // schema stays RFC-100 compliant because the write path goes through | ||
| // HoodieSchema.Blob.createBlob(), which uses the canonical fields verbatim. | ||
| if (isBlob) { | ||
| StructField(f.name(), withAllFieldsNullable(schemaType.dataType), nullable = true, metadata) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @voonhous @yihua Does the change for now mean that then users can provide something like this, where user can pass null type?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Your example should fail during the write stage IIUC as the write side Followup task with more details on why this is written this way: #18601 |
||
| } else { | ||
| StructField(f.name(), schemaType.dataType, schemaType.nullable, metadata) | ||
| } | ||
| } | ||
| // For BLOB types, propagate type metadata via SchemaType | ||
| val schemaTypeMetadata = if (hoodieSchema.getType == HoodieSchemaType.BLOB) { | ||
| val schemaTypeMetadata = if (isBlob) { | ||
| Some(new MetadataBuilder() | ||
| .putString(HoodieSchema.TYPE_METADATA_FIELD, hoodieSchema.asInstanceOf[HoodieSchema.Blob].toTypeDescriptor) | ||
| .build()) | ||
|
|
@@ -350,11 +366,17 @@ object HoodieSparkSchemaConverters extends SparkAdapterSupport { | |
| /** | ||
| * Validates that a StructType matches the expected blob schema structure defined in {@link HoodieSchema.Blob}. | ||
| * | ||
| * Purely structural: compares field names and data types recursively, ignoring nullability. | ||
| * At the Spark type layer, BLOB is projected as nullable-everywhere by [[toSqlType]] (see the | ||
| * comment on the BLOB case there); nullability is therefore not part of the structural | ||
| * contract. The RFC-100 non-null invariants are enforced at the physical-schema write | ||
| * boundary by {@link HoodieSchema.Blob#createBlob}. | ||
| * | ||
| * @param structType the StructType to validate | ||
| * @throws IllegalArgumentException if the structure does not match the expected blob schema | ||
| */ | ||
| private def validateBlobStructure(structType: StructType): Unit = { | ||
| if (!structType.equals(expectedBlobStructType)) { | ||
| if (!matchesStructure(structType, expectedBlobStructType, SQLConf.get.caseSensitiveAnalysis)) { | ||
| throw new IllegalArgumentException( | ||
| s"""Invalid blob schema structure. Expected schema: | ||
| |${expectedBlobStructType.toDDL} | ||
|
|
@@ -363,6 +385,31 @@ object HoodieSparkSchemaConverters extends SparkAdapterSupport { | |
| } | ||
| } | ||
|
|
||
| private def matchesStructure(source: DataType, expected: DataType, caseSensitive: Boolean): Boolean = | ||
| (source, expected) match { | ||
| case (s: StructType, e: StructType) => | ||
| s.length == e.length && s.fields.zip(e.fields).forall { case (sf, ef) => | ||
| nameEquals(sf.name, ef.name, caseSensitive) && | ||
| matchesStructure(sf.dataType, ef.dataType, caseSensitive) | ||
| } | ||
| case _ => source == expected | ||
| } | ||
|
|
||
| private def nameEquals(a: String, b: String, caseSensitive: Boolean): Boolean = | ||
| if (caseSensitive) a == b else a.equalsIgnoreCase(b) | ||
|
|
||
| private def withAllFieldsNullable(dataType: DataType): DataType = dataType match { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤖 nit: - Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying. |
||
| case s: StructType => | ||
| StructType(s.fields.map(f => f.copy( | ||
| dataType = withAllFieldsNullable(f.dataType), | ||
| nullable = true))) | ||
| case ArrayType(elementType, _) => | ||
| ArrayType(withAllFieldsNullable(elementType), containsNull = true) | ||
| case MapType(keyType, valueType, _) => | ||
| MapType(keyType, withAllFieldsNullable(valueType), valueContainsNull = true) | ||
| case other => other | ||
| } | ||
|
|
||
| private lazy val expectedVariantStructType: StructType = { | ||
| val metadataField = StructField(HoodieSchema.Variant.VARIANT_METADATA_FIELD, BinaryType, nullable = false) | ||
| val valueField = StructField(HoodieSchema.Variant.VARIANT_VALUE_FIELD, BinaryType, nullable = false) | ||
|
|
@@ -373,14 +420,25 @@ object HoodieSparkSchemaConverters extends SparkAdapterSupport { | |
| * Validates that a StructType matches the expected unshredded variant schema | ||
| * (two non-null {@code BinaryType} fields: {@code metadata} and {@code value}). | ||
| * | ||
| * Note on nullability: unlike BLOB, VARIANT is not projected nullable-everywhere at the | ||
| * Spark type layer because the user-facing path is gated. Spark 3.x rejects VARIANT at | ||
| * schema resolution, and Spark 4.0+ exposes it as the native {@code VariantType} populated | ||
| * via {@code parse_json(...)}, never a user-supplied {@code named_struct}. The internal | ||
| * physical layout ({@code struct<metadata, value>} with non-null fields) only appears | ||
| * through {@link HoodieSparkSchemaConverters#toSqlType}, which produces the canonical | ||
| * non-null shape this validator expects. | ||
| * | ||
| * @param structType the StructType to validate | ||
| * @throws IllegalArgumentException if the structure does not match the expected variant schema | ||
| */ | ||
| private def validateVariantStructure(structType: StructType): Unit = { | ||
| val fieldsByName = structType.fields.map(f => f.name -> f).toMap | ||
| val caseSensitive = SQLConf.get.caseSensitiveAnalysis | ||
| val key: String => String = | ||
| if (caseSensitive) identity else (_: String).toLowerCase(Locale.ROOT) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤖 nit: - Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying. |
||
| val fieldsByName = structType.fields.map(f => key(f.name) -> f).toMap | ||
| val ok = structType.length == 2 && | ||
| fieldsByName.get(HoodieSchema.Variant.VARIANT_METADATA_FIELD).exists(f => f.dataType == BinaryType && !f.nullable) && | ||
| fieldsByName.get(HoodieSchema.Variant.VARIANT_VALUE_FIELD).exists(f => f.dataType == BinaryType && !f.nullable) | ||
| fieldsByName.get(key(HoodieSchema.Variant.VARIANT_METADATA_FIELD)).exists(f => f.dataType == BinaryType && !f.nullable) && | ||
| fieldsByName.get(key(HoodieSchema.Variant.VARIANT_VALUE_FIELD)).exists(f => f.dataType == BinaryType && !f.nullable) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reject reordered VARIANT physical structs. This validator only checks for two named binary fields via a map, so 🔧 Suggested fix private def validateVariantStructure(structType: StructType): Unit = {
val caseSensitive = SQLConf.get.caseSensitiveAnalysis
- val key: String => String =
- if (caseSensitive) identity else (_: String).toLowerCase(Locale.ROOT)
- val fieldsByName = structType.fields.map(f => key(f.name) -> f).toMap
- val ok = structType.length == 2 &&
- fieldsByName.get(key(HoodieSchema.Variant.VARIANT_METADATA_FIELD)).exists(f => f.dataType == BinaryType && !f.nullable) &&
- fieldsByName.get(key(HoodieSchema.Variant.VARIANT_VALUE_FIELD)).exists(f => f.dataType == BinaryType && !f.nullable)
+ val expectedFields = expectedVariantStructType.fields
+ val ok = structType.length == expectedFields.length &&
+ structType.fields.zip(expectedFields).forall { case (actual, expected) =>
+ nameEquals(actual.name, expected.name, caseSensitive) &&
+ actual.dataType == expected.dataType &&
+ actual.nullable == expected.nullable
+ }
if (!ok) {
throw new IllegalArgumentException(
s"""Invalid variant schema structure. Expected schema:🤖 Prompt for AI Agents— CodeRabbit (original) (source:comment#3143071733) |
||
| if (!ok) { | ||
| throw new IllegalArgumentException( | ||
| s"""Invalid variant schema structure. Expected schema: | ||
|
|
||

Uh oh!
There was an error while loading. Please reload this page.