diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/VectorConversionUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/VectorConversionUtils.java
index a8cc02f58dad3..3803c76cd7777 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/VectorConversionUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/VectorConversionUtils.java
@@ -38,7 +38,9 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -265,6 +267,104 @@ public static void convertRowVectorColumns(InternalRow row, GenericInternalRow r
     }
   }
 
+  // ---------------------------------------------------------------------------
+  // Blob descriptor-mode helpers (Parquet DESCRIPTOR read path)
+  // ---------------------------------------------------------------------------
+
+  /**
+   * Detects BLOB columns from Spark StructType metadata annotations.
+   *
+   * @param schema Spark StructType (may be null)
+   * @return set of field ordinals that are BLOB columns; empty set if none found
+   */
+  public static Set<Integer> detectBlobColumnsFromMetadata(StructType schema) {
+    Set<Integer> blobColumnIndices = new LinkedHashSet<>();
+    if (schema == null) {
+      return blobColumnIndices;
+    }
+    StructField[] fields = schema.fields();
+    for (int i = 0; i < fields.length; i++) {
+      StructField field = fields[i];
+      if (field.metadata().contains(HoodieSchema.TYPE_METADATA_FIELD)) {
+        String typeStr = field.metadata().getString(HoodieSchema.TYPE_METADATA_FIELD);
+        HoodieSchema parsed = HoodieSchema.parseTypeDescriptor(typeStr);
+        if (parsed != null && parsed.getType() == HoodieSchemaType.BLOB) {
+          blobColumnIndices.add(i);
+        }
+      }
+    }
+    return blobColumnIndices;
+  }
+
+  /**
+   * Strips the {@code data} sub-field from BLOB struct columns so the Parquet reader
+   * skips the binary column chunk entirely (genuine I/O savings).
+   *
+   * <p>The returned schema has 2-field blob structs: {@code {type, reference}} instead of
+   * the full {@code {type, data, reference}}. Use {@link #buildBlobNullPadRowMapper} to
+   * re-insert null at the {@code data} position after reading.
+   *
+   * @param schema      the original Spark schema
+   * @param blobColumns ordinals of blob columns (from {@link #detectBlobColumnsFromMetadata})
+   * @return a new StructType with the {@code data} sub-field removed from blob structs
+   */
+  public static StructType stripBlobDataField(StructType schema, Set<Integer> blobColumns) {
+    StructField[] fields = schema.fields();
+    StructField[] newFields = new StructField[fields.length];
+    for (int i = 0; i < fields.length; i++) {
+      if (blobColumns.contains(i) && fields[i].dataType() instanceof StructType) {
+        StructType blobStruct = (StructType) fields[i].dataType();
+        List<StructField> kept = new ArrayList<>();
+        for (StructField sub : blobStruct.fields()) {
+          if (!sub.name().equals(HoodieSchema.Blob.INLINE_DATA_FIELD)) {
+            kept.add(sub);
+          }
+        }
+        StructType strippedStruct = new StructType(kept.toArray(new StructField[0]));
+        newFields[i] = new StructField(fields[i].name(), strippedStruct, fields[i].nullable(), fields[i].metadata());
+      } else {
+        newFields[i] = fields[i];
+      }
+    }
+    return new StructType(newFields);
+  }
+
+  /**
+   * Returns a {@link Function} that expands 2-field blob structs {@code {type, reference}}
+   * back to 3-field structs {@code {type, null, reference}} by inserting null at the
+   * {@code data} position, then applies the projection callback.
+   *
+   * @param readSchema         the Spark schema of incoming rows (blob structs have 2 fields)
+   * @param blobColumns        ordinals of blob columns in {@code readSchema}
+   * @param projectionCallback called with the expanded row; must copy any data it needs to retain
+   * @return a function that converts one row and returns the projected result
+   */
+  public static Function<InternalRow, InternalRow> buildBlobNullPadRowMapper(
+      StructType readSchema,
+      Set<Integer> blobColumns,
+      Function<InternalRow, InternalRow> projectionCallback) {
+    int numFields = readSchema.fields().length;
+    GenericInternalRow buffer = new GenericInternalRow(numFields);
+    return row -> {
+      for (int i = 0; i < numFields; i++) {
+        if (row.isNullAt(i)) {
+          buffer.setNullAt(i);
+        } else if (blobColumns.contains(i)) {
+          InternalRow blobStruct = row.getStruct(i, 2);
+          // Expand {type, reference} → {type, null, reference}
+          GenericInternalRow expanded = new GenericInternalRow(3);
+          expanded.update(0, blobStruct.isNullAt(0) ? null : blobStruct.getUTF8String(0));
+          expanded.setNullAt(1);
+          expanded.update(2, blobStruct.isNullAt(1) ? null : blobStruct.getStruct(1, HoodieSchema.Blob.getReferenceFieldCount()));
+          buffer.update(i, expanded);
+        } else {
+          buffer.update(i, row.get(i, readSchema.apply(i).dataType()));
+        }
+      }
+      return projectionCallback.apply(buffer);
+    };
+  }
+
   /**
    * Re-attaches {@link HoodieSchema#TYPE_METADATA_FIELD} to Spark fields that are
    * Arrow {@code FixedSizeList} in the Lance file.
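For orientation, a minimal sketch of how the three helpers above compose on a Parquet read path. `fullSchema` (the unstripped Spark schema), `projection` (an UnsafeProjection over the 3-field target schema), and `rows` (an Iterator[InternalRow] assumed to come from a reader that was given the stripped schema) are hypothetical stand-ins, not names from this patch:

    // 1. Find top-level blob struct ordinals via the BLOB metadata marker.
    val blobCols: java.util.Set[Integer] = VectorConversionUtils.detectBlobColumnsFromMetadata(fullSchema)
    // 2. Read with {type, reference} blob structs so Parquet never touches the `data` chunk.
    val readSchema = VectorConversionUtils.stripBlobDataField(fullSchema, blobCols)
    // 3. Pad rows back to {type, null, reference} and project to the full 3-field shape.
    val mapper = VectorConversionUtils.buildBlobNullPadRowMapper(readSchema, blobCols, projection.apply(_))
    val padded = rows.map(mapper.apply(_))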
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 92b963f683906..8c50f7886589b 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -97,7 +97,29 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
       structType
     }
 
-    val (readSchema, readFilters) = getSchemaAndFiltersForRead(parquetReadStructType, hasRowIndexField)
+    // Blob DESCRIPTOR mode: strip `data` sub-field from blob structs for Parquet base files.
+    // Applied after vector rewrite; not applied to Lance base files or log files.
+    val isParquetBaseFile = FSUtils.isBaseFile(filePath) && !isLanceBaseFile
+    val isBlobDescriptorMode = isParquetBaseFile && {
+      val hadoopConf = storageConfiguration.unwrapAs(classOf[Configuration])
+      import org.apache.hudi.common.config.HoodieReaderConfig
+      val modeValue = hadoopConf.get(HoodieReaderConfig.BLOB_INLINE_READ_MODE.key(),
+        HoodieReaderConfig.BLOB_INLINE_READ_MODE.defaultValue())
+      modeValue.equalsIgnoreCase(HoodieReaderConfig.BLOB_INLINE_READ_MODE_DESCRIPTOR)
+    }
+    val blobColumnIndices: Set[Int] = if (isBlobDescriptorMode) {
+      VectorConversionUtils.detectBlobColumnsFromMetadata(parquetReadStructType).asScala.map(_.intValue()).toSet
+    } else {
+      Set.empty
+    }
+    val blobReadStructType = if (blobColumnIndices.nonEmpty) {
+      val javaBlobCols: java.util.Set[Integer] = blobColumnIndices.map(Integer.valueOf).asJava
+      VectorConversionUtils.stripBlobDataField(parquetReadStructType, javaBlobCols)
+    } else {
+      parquetReadStructType
+    }
+
+    val (readSchema, readFilters) = getSchemaAndFiltersForRead(blobReadStructType, hasRowIndexField)
     if (FSUtils.isLogFile(filePath)) {
       // NOTE: now only primary key based filtering is supported for log files
       new HoodieSparkFileReaderFactory(storage).newParquetFileReader(filePath)
@@ -120,12 +142,18 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
         readSchema, StructType(Seq.empty), getSchemaHandler.getInternalSchemaOpt, readFilters,
         storage.getConf.asInstanceOf[StorageConfiguration[Configuration]], tableSchemaOpt))
 
-      // Post-process: convert binary VECTOR columns back to typed arrays
-      if (vectorColumnInfo.nonEmpty) {
-        SparkFileFormatInternalRowReaderContext.wrapWithVectorConversion(rawIterator, vectorColumnInfo, readSchema)
+      // Post-process: re-insert null `data` field into blob structs, then convert vectors
+      val blobPaddedIterator = if (blobColumnIndices.nonEmpty) {
+        SparkFileFormatInternalRowReaderContext.wrapWithBlobNullPadding(rawIterator, blobColumnIndices, readSchema, parquetReadStructType)
       } else {
         rawIterator
       }
+
+      if (vectorColumnInfo.nonEmpty) {
+        SparkFileFormatInternalRowReaderContext.wrapWithVectorConversion(blobPaddedIterator, vectorColumnInfo, if (blobColumnIndices.nonEmpty) parquetReadStructType else readSchema)
+      } else {
+        blobPaddedIterator
+      }
     }
   }
@@ -375,4 +403,23 @@ object SparkFileFormatInternalRowReaderContext {
     }
   }
 
+  /**
+   * Wraps a closable iterator to re-insert null {@code data} fields into blob structs
+   * after Parquet DESCRIPTOR mode read (expanding 2-field → 3-field structs).
+   */
+  private[hudi] def wrapWithBlobNullPadding(
+      iterator: ClosableIterator[InternalRow],
+      blobColumnIndices: Set[Int],
+      readSchema: StructType,
+      targetSchema: StructType): ClosableIterator[InternalRow] = {
+    val javaBlobCols: java.util.Set[Integer] = blobColumnIndices.map(Integer.valueOf).asJava
+    val projection = UnsafeProjection.create(targetSchema)
+    val mapper = VectorConversionUtils.buildBlobNullPadRowMapper(readSchema, javaBlobCols, projection.apply(_))
+    new ClosableIterator[InternalRow] {
+      override def hasNext: Boolean = iterator.hasNext
+      override def next(): InternalRow = mapper.apply(iterator.next())
+      override def close(): Unit = iterator.close()
+    }
+  }
+
 }
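One ordering detail worth calling out: the null-padding wrapper runs before vector conversion, so rows are already back in the 3-field `parquetReadStructType` shape when the vector converter sees them, which is why the conditional schema argument above exists. Restated compactly (names as in the diff; a sketch, not additional code):

    // readSchema has 2-field blob structs; parquetReadStructType has the full 3-field shape.
    val padded = SparkFileFormatInternalRowReaderContext.wrapWithBlobNullPadding(
      rawIterator, blobColumnIndices, readSchema, parquetReadStructType)
    SparkFileFormatInternalRowReaderContext.wrapWithVectorConversion(
      padded, vectorColumnInfo, parquetReadStructType)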
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestVectorConversionUtilsBlob.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestVectorConversionUtilsBlob.java
new file mode 100644
index 0000000000000..953e52ebdf05d
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestVectorConversionUtilsBlob.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.MetadataBuilder;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.junit.jupiter.api.Test;
+
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for the blob descriptor-mode helpers in {@link VectorConversionUtils}.
+ */
+public class TestVectorConversionUtilsBlob {
+
+  private static Metadata blobMeta() {
+    return new MetadataBuilder()
+        .putString(HoodieSchema.TYPE_METADATA_FIELD, HoodieSchemaType.BLOB.name())
+        .build();
+  }
+
+  private static StructType blobStruct3Field() {
+    return new StructType(new StructField[] {
+        new StructField(HoodieSchema.Blob.TYPE, DataTypes.StringType, true, Metadata.empty()),
+        new StructField(HoodieSchema.Blob.INLINE_DATA_FIELD, DataTypes.BinaryType, true, Metadata.empty()),
+        new StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE,
+            new StructType(new StructField[] {
+                new StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH, DataTypes.StringType, true, Metadata.empty()),
+                new StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET, DataTypes.LongType, true, Metadata.empty()),
+                new StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH, DataTypes.LongType, true, Metadata.empty()),
+                new StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED, DataTypes.BooleanType, true, Metadata.empty())
+            }), true, Metadata.empty())
+    });
+  }
+
+  @Test
+  public void detectBlobColumnsFromMetadataFindsMarkedFields() {
+    StructType schema = new StructType(new StructField[] {
+        new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
+        new StructField("payload", blobStruct3Field(), true, blobMeta()),
+        new StructField("name", DataTypes.StringType, true, Metadata.empty())
+    });
+    Set<Integer> blobs = VectorConversionUtils.detectBlobColumnsFromMetadata(schema);
+    assertEquals(1, blobs.size());
+    assertTrue(blobs.contains(1));
+  }
+
+  @Test
+  public void detectBlobColumnsFromMetadataReturnsEmptyForNonBlob() {
+    StructType schema = new StructType(new StructField[] {
+        new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
+        new StructField("name", DataTypes.StringType, true, Metadata.empty())
+    });
+    assertTrue(VectorConversionUtils.detectBlobColumnsFromMetadata(schema).isEmpty());
+  }
+
+  @Test
+  public void detectBlobColumnsFromMetadataNullSchema() {
+    assertTrue(VectorConversionUtils.detectBlobColumnsFromMetadata(null).isEmpty());
+  }
+
+  @Test
+  public void stripBlobDataFieldRemovesDataAndPreservesOthers() {
+    StructType schema = new StructType(new StructField[] {
+        new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
+        new StructField("payload", blobStruct3Field(), true, blobMeta())
+    });
+    Set<Integer> blobs = VectorConversionUtils.detectBlobColumnsFromMetadata(schema);
+
+    StructType stripped = VectorConversionUtils.stripBlobDataField(schema, blobs);
+
+    // Top-level fields preserved.
+    assertEquals(2, stripped.fields().length);
+    assertEquals("id", stripped.fields()[0].name());
+    assertEquals("payload", stripped.fields()[1].name());
+    // Top-level metadata preserved.
+    assertTrue(stripped.fields()[1].metadata().contains(HoodieSchema.TYPE_METADATA_FIELD));
+
+    // Blob struct now has 2 fields: type, reference (no data).
+    StructType blob = (StructType) stripped.fields()[1].dataType();
+    assertEquals(2, blob.fields().length);
+    assertEquals(HoodieSchema.Blob.TYPE, blob.fields()[0].name());
+    assertEquals(HoodieSchema.Blob.EXTERNAL_REFERENCE, blob.fields()[1].name());
+    assertFalse(blob.getFieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD).isDefined());
+  }
+
+  @Test
+  public void buildBlobNullPadRowMapperReinsertsNullData() {
+    StructType readSchema = new StructType(new StructField[] {
+        new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
+        new StructField("payload",
+            new StructType(new StructField[] {
+                new StructField(HoodieSchema.Blob.TYPE, DataTypes.StringType, true, Metadata.empty()),
+                new StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE,
+                    new StructType(new StructField[] {
+                        new StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH, DataTypes.StringType, true, Metadata.empty()),
+                        new StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET, DataTypes.LongType, true, Metadata.empty()),
+                        new StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH, DataTypes.LongType, true, Metadata.empty()),
+                        new StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED, DataTypes.BooleanType, true, Metadata.empty())
+                    }), true, Metadata.empty())
+            }), true, blobMeta())
+    });
+
+    GenericInternalRow blob2Field = new GenericInternalRow(2);
+    blob2Field.update(0, UTF8String.fromString(HoodieSchema.Blob.INLINE));
+    blob2Field.setNullAt(1); // reference null
+    GenericInternalRow input = new GenericInternalRow(2);
+    input.update(0, 42);
+    input.update(1, blob2Field);
+
+    Function<InternalRow, InternalRow> mapper =
+        VectorConversionUtils.buildBlobNullPadRowMapper(readSchema, java.util.Collections.singleton(1), row -> row);
+    InternalRow out = mapper.apply(input);
+
+    assertEquals(42, out.getInt(0));
+    InternalRow expanded = out.getStruct(1, 3);
+    assertEquals(HoodieSchema.Blob.INLINE,
+        expanded.getUTF8String(0).toString());
+    assertTrue(expanded.isNullAt(1), "data should be null after pad");
+    assertTrue(expanded.isNullAt(2), "reference was null in input");
+  }
+
+  @Test
+  public void buildBlobNullPadRowMapperHandlesNullBlobRow() {
+    StructType readSchema = new StructType(new StructField[] {
+        new StructField("payload",
+            new StructType(new StructField[] {
+                new StructField(HoodieSchema.Blob.TYPE, DataTypes.StringType, true, Metadata.empty()),
+                new StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE,
+                    new StructType(new StructField[] {
+                        new StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH, DataTypes.StringType, true, Metadata.empty()),
+                        new StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET, DataTypes.LongType, true, Metadata.empty()),
+                        new StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH, DataTypes.LongType, true, Metadata.empty()),
+                        new StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED, DataTypes.BooleanType, true, Metadata.empty())
+                    }), true, Metadata.empty())
+            }), true, blobMeta())
+    });
+
+    GenericInternalRow input = new GenericInternalRow(1);
+    input.setNullAt(0);
+
+    Function<InternalRow, InternalRow> mapper =
+        VectorConversionUtils.buildBlobNullPadRowMapper(readSchema, java.util.Collections.singleton(0), row -> row);
+    InternalRow out = mapper.apply(input);
+    assertTrue(out.isNullAt(0), "null blob row stays null");
+  }
+}
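A usage caveat the tests above sidestep by passing `row -> row`: `buildBlobNullPadRowMapper` reuses one `GenericInternalRow` buffer across calls, so a projection callback that buffers rows must copy, per the javadoc. A hedged sketch (assuming `projection` is an UnsafeProjection, whose output row is itself reused):

    val mapper = VectorConversionUtils.buildBlobNullPadRowMapper(
      readSchema, blobCols, (r: InternalRow) => projection.apply(r).copy())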
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
index 942d1aeabb503..78dee224e6f47 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
@@ -113,7 +113,10 @@ public class HoodieReaderConfig extends HoodieConfig {
       .withValidValues(BLOB_INLINE_READ_MODE_CONTENT, BLOB_INLINE_READ_MODE_DESCRIPTOR)
       .withDocumentation("How Hudi interprets INLINE BLOB values on read. "
           + "CONTENT (default) returns the raw inline bytes. "
-          + "DESCRIPTOR returns an OUT_OF_LINE-shaped reference pointing at the backing "
-          + "Lance file with the INLINE payload's position and size, so callers can defer "
-          + "the byte read via read_blob().");
+          + "DESCRIPTOR suppresses the inline bytes (data field is null) and returns metadata only, "
+          + "avoiding the I/O cost of reading large binary payloads. "
+          + "For Lance files, the reference struct is populated with blob stream coordinates "
+          + "so read_blob() can materialize bytes on demand. "
+          + "For Parquet files, the data column is skipped via nested column projection; "
+          + "read_blob() automatically downgrades that scan to CONTENT so bytes are materialized.");
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index a11e3075a6fb7..58928c5ad612b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -237,11 +237,16 @@ abstract class HoodieBaseHadoopFsRelationFactory(val sqlContext: SQLContext,
 
   override def buildFileFormat(): FileFormat = {
     val tableConfig = metaClient.getTableConfig
+    val blobDescriptorMode = optParams.getOrElse(
+      HoodieReaderConfig.BLOB_INLINE_READ_MODE.key(),
+      HoodieReaderConfig.BLOB_INLINE_READ_MODE.defaultValue()
+    ).equalsIgnoreCase(HoodieReaderConfig.BLOB_INLINE_READ_MODE_DESCRIPTOR)
     new HoodieFileGroupReaderBasedFileFormat(basePath.toString,
       HoodieTableSchema(tableStructSchema, tableSchema, internalSchemaOpt),
       tableConfig.getTableName, queryTimestamp.get, getMandatoryFields,
       isMOR, isBootstrap, isIncremental, validCommits, shouldUseRecordPosition, getRequiredFilters,
-      tableConfig.isMultipleBaseFileFormatsEnabled, tableConfig.getBaseFileFormat)
+      tableConfig.isMultipleBaseFileFormatsEnabled, tableConfig.getBaseFileFormat,
+      initialBlobDescriptorMode = blobDescriptorMode)
   }
 
   override def buildBucketSpec(): Option[BucketSpec] = None
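For reference, the mode documented above is selected with an ordinary read option; the key and the DESCRIPTOR constant come from `HoodieReaderConfig`, while everything else in this sketch (`spark`, `tablePath`, a blob column named `payload`) is assumed:

    val df = spark.read.format("hudi")
      .option(HoodieReaderConfig.BLOB_INLINE_READ_MODE.key(), // "hoodie.read.blob.inline.mode"
        HoodieReaderConfig.BLOB_INLINE_READ_MODE_DESCRIPTOR)
      .load(tablePath)
    df.select("id", "payload").show() // payload.data is null; payload.type is preserved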
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
index 3da22ff8ebe7e..bb6cc0d0aadda 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
@@ -21,7 +21,7 @@ import org.apache.hudi.{HoodieFileIndex, HoodiePartitionCDCFileGroupMapping, Hoo
 import org.apache.hudi.cdc.{CDCFileGroupIterator, HoodieCDCFileGroupSplit, HoodieCDCFileIndex}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
-import org.apache.hudi.common.config.{HoodieMemoryConfig, TypedProperties}
+import org.apache.hudi.common.config.{HoodieMemoryConfig, HoodieReaderConfig, TypedProperties}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieFileFormat
 import org.apache.hudi.common.schema.HoodieSchema
@@ -61,7 +61,7 @@ import org.apache.spark.util.SerializableConfiguration
 
 import java.io.Closeable
 
-import scala.collection.JavaConverters.mapAsJavaMapConverter
+import scala.collection.JavaConverters.{mapAsJavaMapConverter, setAsJavaSetConverter}
 
 trait HoodieFormatTrait {
@@ -86,9 +86,25 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
                                            shouldUseRecordPosition: Boolean,
                                            requiredFilters: Seq[Filter],
                                            isMultipleBaseFileFormatsEnabled: Boolean,
-                                           hoodieFileFormat: HoodieFileFormat)
+                                           val hoodieFileFormat: HoodieFileFormat,
+                                           initialBlobDescriptorMode: Boolean = false)
   extends ParquetFileFormat with SparkAdapterSupport with HoodieFormatTrait with Logging with Serializable {
 
+  // Mutable so ReadBlobRule can flip DESCRIPTOR→CONTENT when read_blob() appears in a query.
+  // We mutate in place because Spark's planner/AQE retains a reference to this FileFormat
+  // instance even after the optimizer rewrites the LogicalRelation, so swapping wouldn't stick.
+  @volatile private var _isBlobDescriptorMode: Boolean = initialBlobDescriptorMode
+
+  def isBlobDescriptorMode: Boolean = _isBlobDescriptorMode
+
+  def setBlobDescriptorMode(enabled: Boolean): Unit = {
+    _isBlobDescriptorMode = enabled
+  }
+
+  def restoreBlobDescriptorMode(): Unit = {
+    _isBlobDescriptorMode = initialBlobDescriptorMode
+  }
+
   private lazy val schema = tableSchema.schema
 
   private lazy val tableSchemaAsMessageType: HOption[MessageType] = {
@@ -133,6 +149,19 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
     }
   }
 
+  @transient private var cachedBlobDetection: (StructType, Set[Int]) = _
+
+  private def detectBlobColumnsCached(schema: StructType): Set[Int] = {
+    if (cachedBlobDetection != null && (cachedBlobDetection._1 eq schema)) {
+      cachedBlobDetection._2
+    } else {
+      import scala.collection.JavaConverters._
+      val result = VectorConversionUtils.detectBlobColumnsFromMetadata(schema).asScala.map(_.intValue()).toSet
+      cachedBlobDetection = (schema, result)
+      result
+    }
+  }
+
   /**
    * Checks if the file format supports vectorized reading, please refer to SPARK-40918.
    *
@@ -151,6 +180,12 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
       supportVectorizedRead = false
       supportReturningBatch = false
       false
+    } else if (isBlobDescriptorMode && detectBlobColumnsCached(schema).nonEmpty) {
+      // Blob DESCRIPTOR mode strips the data sub-field from blob structs and null-pads
+      // post-read, which requires row-level access.
+      supportVectorizedRead = false
+      supportReturningBatch = false
+      false
     } else {
       val conf = sparkSession.sessionState.conf
       val parquetBatchSupported = ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && supportBatchWithTableSchema
@@ -238,6 +273,12 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
     val augmentedStorageConf = new HadoopStorageConfiguration(hadoopConf).getInline
     setSchemaEvolutionConfigs(augmentedStorageConf)
     augmentedStorageConf.set(ENABLE_LOGICAL_TIMESTAMP_REPAIR, hasTimestampMillisFieldInTableSchema.toString)
+    // Sync the Hadoop conf from the mutable flag so the MOR path (which reads the conf in
+    // SparkFileFormatInternalRowReaderContext) agrees with the COW path after a ReadBlobRule flip.
+    augmentedStorageConf.set(
+      HoodieReaderConfig.BLOB_INLINE_READ_MODE.key(),
+      if (isBlobDescriptorMode) HoodieReaderConfig.BLOB_INLINE_READ_MODE_DESCRIPTOR
+      else HoodieReaderConfig.BLOB_INLINE_READ_MODE_CONTENT)
     val (remainingPartitionSchemaArr, fixedPartitionIndexesArr) = partitionSchema.fields.toSeq.zipWithIndex.filter(p => !mandatoryFields.contains(p._1.name)).unzip
     // The schema of the partition cols we want to append the value instead of reading from the file
@@ -457,6 +498,39 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
     }
   }
 
+  /**
+   * Detects BLOB columns and strips the {@code data} sub-field when DESCRIPTOR mode is active.
+   * Only applies to Parquet format; other formats handle DESCRIPTOR mode natively.
+   */
+  private def withBlobDescriptorRewrite(schema: StructType): (StructType, Set[Int]) = {
+    if (hoodieFileFormat != HoodieFileFormat.PARQUET) {
+      (schema, Set.empty[Int])
+    } else {
+      import scala.collection.JavaConverters._
+      val blobCols = VectorConversionUtils.detectBlobColumnsFromMetadata(schema).asScala.map(_.intValue()).toSet
+      if (blobCols.isEmpty) {
+        (schema, blobCols)
+      } else {
+        val javaBlobCols: java.util.Set[Integer] = blobCols.map(Integer.valueOf).asJava
+        (VectorConversionUtils.stripBlobDataField(schema, javaBlobCols), blobCols)
+      }
+    }
+  }
+
+  /**
+   * Wraps an iterator to re-insert null {@code data} fields into blob structs
+   * after Parquet DESCRIPTOR mode read (expanding 2-field → 3-field structs).
+   */
+  private def wrapWithBlobNullPadding(iter: Iterator[InternalRow],
+                                      readSchema: StructType,
+                                      targetSchema: StructType,
+                                      blobCols: Set[Int]): Iterator[InternalRow] = {
+    val blobProjection = UnsafeProjection.create(targetSchema)
+    val javaBlobCols: java.util.Set[Integer] = blobCols.map(Integer.valueOf).asJava
+    val mapper = VectorConversionUtils.buildBlobNullPadRowMapper(readSchema, javaBlobCols, blobProjection.apply(_))
+    iter.map(mapper.apply(_))
+  }
+
   /**
    * Wraps an iterator to convert binary VECTOR columns back to typed arrays.
    * The read schema has BinaryType for vector columns; the target schema has ArrayType.
@@ -486,16 +560,23 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
     val (modifiedOutputSchema, outputVectorCols) = if (hasVectors) withVectorRewrite(outputSchema) else (outputSchema, Map.empty[Int, HoodieSchema.Vector])
     val (modifiedRequestedSchema, _) = if (hasVectors) withVectorRewrite(requestedSchema) else (requestedSchema, Map.empty[Int, HoodieSchema.Vector])
 
+    // Blob DESCRIPTOR mode: strip `data` sub-field from blob structs so Parquet skips
+    // those column chunks entirely (real I/O savings). Applied after vector rewrite.
+    val (blobRequiredSchema, blobCols) = if (isBlobDescriptorMode) withBlobDescriptorRewrite(modifiedRequiredSchema) else (modifiedRequiredSchema, Set.empty[Int])
+    val hasBlobs = blobCols.nonEmpty
+    val (blobOutputSchema, outputBlobCols) = if (hasBlobs) withBlobDescriptorRewrite(modifiedOutputSchema) else (modifiedOutputSchema, Set.empty[Int])
+    val (blobRequestedSchema, _) = if (hasBlobs) withBlobDescriptorRewrite(modifiedRequestedSchema) else (modifiedRequestedSchema, Set.empty[Int])
+
     val rawIter = if (remainingPartitionSchema.fields.length == partitionSchema.fields.length) {
       //none of partition fields are read from the file, so the reader will do the appending for us
-      parquetFileReader.read(file, modifiedRequiredSchema, partitionSchema, internalSchemaOpt, filters, storageConf, tableSchemaAsMessageType)
+      parquetFileReader.read(file, blobRequiredSchema, partitionSchema, internalSchemaOpt, filters, storageConf, tableSchemaAsMessageType)
     } else if (remainingPartitionSchema.fields.length == 0) {
       //we read all of the partition fields from the file
       val pfileUtils = sparkAdapter.getSparkPartitionedFileUtils
       //we need to modify the partitioned file so that the partition values are empty
       val modifiedFile = pfileUtils.createPartitionedFile(InternalRow.empty, pfileUtils.getPathFromPartitionedFile(file), file.start, file.length)
       //and we pass an empty schema for the partition schema
-      parquetFileReader.read(modifiedFile, modifiedOutputSchema, new StructType(), internalSchemaOpt, filters, storageConf, tableSchemaAsMessageType)
+      parquetFileReader.read(modifiedFile, blobOutputSchema, new StructType(), internalSchemaOpt, filters, storageConf, tableSchemaAsMessageType)
     } else {
       //need to do an additional projection here. The case in mind is that partition schema is "a,b,c" mandatoryFields is "a,c",
      //then we will read (dataSchema + a + c) and append b. So the final schema will be (data schema + a + c +b)
@@ -503,8 +584,25 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
       val pfileUtils = sparkAdapter.getSparkPartitionedFileUtils
       val partitionValues = getFixedPartitionValues(file.partitionValues, partitionSchema, fixedPartitionIndexes)
       val modifiedFile = pfileUtils.createPartitionedFile(partitionValues, pfileUtils.getPathFromPartitionedFile(file), file.start, file.length)
-      val iter = parquetFileReader.read(modifiedFile, modifiedRequestedSchema, remainingPartitionSchema, internalSchemaOpt, filters, storageConf, tableSchemaAsMessageType)
-      projectIter(iter, StructType(modifiedRequestedSchema.fields ++ remainingPartitionSchema.fields), modifiedOutputSchema)
+      val iter = parquetFileReader.read(modifiedFile, blobRequestedSchema, remainingPartitionSchema, internalSchemaOpt, filters, storageConf, tableSchemaAsMessageType)
+      projectIter(iter, StructType(blobRequestedSchema.fields ++ remainingPartitionSchema.fields), blobOutputSchema)
     }
+
+    // Post-read: re-insert null `data` field into blob structs (expanding 2-field → 3-field)
+    val blobPaddedIter = if (hasBlobs) {
+      val readSchema = if (remainingPartitionSchema.fields.length == partitionSchema.fields.length) {
+        StructType(blobRequiredSchema.fields ++ partitionSchema.fields)
+      } else {
+        blobOutputSchema
+      }
+      val targetSchema = if (remainingPartitionSchema.fields.length == partitionSchema.fields.length) {
+        StructType(modifiedRequiredSchema.fields ++ partitionSchema.fields)
+      } else {
+        modifiedOutputSchema
+      }
+      wrapWithBlobNullPadding(rawIter, readSchema, targetSchema, outputBlobCols)
+    } else {
+      rawIter
+    }
 
     if (hasVectors) {
@@ -514,9 +612,9 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
       } else {
         modifiedOutputSchema
       }
-      wrapWithVectorConversion(rawIter, readSchema, outputSchema, outputVectorCols)
+      wrapWithVectorConversion(blobPaddedIter, readSchema, outputSchema, outputVectorCols)
     } else {
-      rawIter
+      blobPaddedIter
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala
index 0328cdd0c5c22..3c6524369a12f 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala
@@ -208,8 +208,8 @@ class BatchedBlobReader(
     // Dispatch based on storage_type (field 0)
     val storageType = accessor.getString(blobStruct, 0)
     if (storageType == HoodieSchema.Blob.INLINE) {
-      // Case 1: Inline — bytes are in field 1
-      val bytes = accessor.getBytes(blobStruct, 1)
+      // Case 1: Inline — bytes are in field 1 (may be null in DESCRIPTOR mode)
+      val bytes = if (accessor.isNullAt(blobStruct, 1)) null else accessor.getBytes(blobStruct, 1)
       batch += RowInfo[R](
         originalRow = row,
         filePath = "",
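The mutable flag introduced above is driven from the optimizer side by `ReadBlobRule` (next file). Its intended life-cycle, restated as a sketch (`syncForQuery` is a hypothetical name, not part of the patch):

    def syncForQuery(ff: HoodieFileGroupReaderBasedFileFormat, queryUsesReadBlob: Boolean): Unit =
      if (queryUsesReadBlob) ff.setBlobDescriptorMode(false) // downgrade DESCRIPTOR -> CONTENT
      else ff.restoreBlobDescriptorMode()                    // back to the constructor-time mode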
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ReadBlobRule.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ReadBlobRule.scala
index b91d08674cb95..3de02893e1d4e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ReadBlobRule.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ReadBlobRule.scala
@@ -19,11 +19,14 @@
 
 package org.apache.spark.sql.hudi.blob
 
-import org.apache.spark.sql.AnalysisException
+import org.apache.hudi.common.model.HoodieFileFormat
+
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, ExprId, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedFileFormat
 import org.apache.spark.sql.types.{DataType, StructType}
 
 import scala.collection.mutable
@@ -41,7 +44,14 @@ import scala.collection.mutable.ArrayBuffer
  */
 case class ReadBlobRule(spark: SparkSession) extends Rule[LogicalPlan] {
 
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    // Parquet DESCRIPTOR strips the data column for I/O savings, but Parquet has no byte-range
+    // channel like Lance, so read_blob() would return null. Force CONTENT for scans whose query
+    // uses read_blob(), and restore DESCRIPTOR for queries that don't (the FileFormat instance
+    // is shared across queries against the same temp view).
+    val needsContentMode = containsReadBlobAnywhere(plan) || plan.exists(_.isInstanceOf[BatchedBlobRead])
+    syncDescriptorMode(plan, forceContentMode = needsContentMode)
+    plan resolveOperatorsUp {
       case Project(projectList, Filter(condition, child))
         if containsReadBlobExpression(projectList) && containsReadBlobInExpression(condition)
@@ -80,12 +90,39 @@ case class ReadBlobRule(spark: SparkSession) extends Rule[LogicalPlan] {
       throw new IllegalArgumentException(
         s"read_blob() may only appear in SELECT or WHERE clauses. Found in unsupported logical plan node: ${node.nodeName}. " +
           s"Move read_blob() to a SELECT or WHERE clause. Full plan: ${node.simpleStringWithNodeId()}")
+    }
   }
 
   private def containsReadBlobInAnyExpression(plan: LogicalPlan): Boolean = {
     plan.expressions.exists(containsReadBlobInExpression)
   }
 
+  private def containsReadBlobAnywhere(plan: LogicalPlan): Boolean = {
+    plan.exists(node => node.expressions.exists(containsReadBlobInExpression))
+  }
+
+  // Lance scans are skipped because their hoodieFileFormat != PARQUET and Lance already
+  // supports read_blob() under DESCRIPTOR via byte-range references.
+  private def syncDescriptorMode(plan: LogicalPlan, forceContentMode: Boolean): Unit = {
+    plan.foreach {
+      case lr: LogicalRelation if lr.relation.isInstanceOf[HadoopFsRelation] =>
+        lr.relation.asInstanceOf[HadoopFsRelation].fileFormat match {
+          case ff: HoodieFileGroupReaderBasedFileFormat
+            if ff.hoodieFileFormat == HoodieFileFormat.PARQUET =>
+            if (forceContentMode) {
+              if (ff.isBlobDescriptorMode) {
+                ff.setBlobDescriptorMode(false)
+                logInfo("read_blob() detected; downgrading DESCRIPTOR to CONTENT for Parquet scan.")
+              }
+            } else {
+              ff.restoreBlobDescriptorMode()
+            }
+          case _ =>
+        }
+      case _ =>
+    }
+  }
+
   private def wrapWithBlobReads(
       blobColumns: Seq[AttributeReference],
       child: LogicalPlan): (LogicalPlan, Map[ExprId, Attribute]) = {
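The tests that follow exercise both sides of this rule against one shared temp view; the two query shapes, sketched (`hudi_blob_view` is a placeholder view name):

    // read_blob() present: the rule flips the Parquet scan to CONTENT and bytes materialize.
    spark.sql("SELECT id, read_blob(payload) AS bytes FROM hudi_blob_view")
    // No read_blob(): the rule restores DESCRIPTOR, so payload.data comes back null.
    spark.sql("SELECT id, payload FROM hudi_blob_view")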
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala
index 533c9589e338c..97e936e82d4e6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala
@@ -19,7 +19,9 @@
 
 package org.apache.hudi.blob
 
+import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.blob.BlobTestHelpers._
+import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.exception.HoodieIOException
 import org.apache.hudi.testutils.HoodieClientTestBase
@@ -29,6 +31,8 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
 
 import java.util.Collections
 
@@ -198,6 +202,222 @@ class TestReadBlobSQL extends HoodieClientTestBase {
     assertBytesContent(data1)
   }
 
+  @Test
+  def testReadBlobSupersedesDescriptorOnParquet(): Unit = {
+    // Parquet DESCRIPTOR strips the data column at read-time. With no byte-range channel
+    // like Lance, read_blob() would return null — so it must force CONTENT for that scan.
+    val tablePath = s"$tempDir/hudi_inline_blob_descriptor_table"
+    val expectedPayloads = Seq(
+      Array.fill[Byte](100)(0xA.toByte),
+      Array.fill[Byte](100)(0xB.toByte),
+      Array.fill[Byte](100)(0xC.toByte))
+    val rawDf = sparkSession.createDataFrame(
+      expectedPayloads.zipWithIndex.map { case (bytes, i) => (i + 1, s"rec${i + 1}", bytes) })
+      .toDF("id", "name", "bytes")
+      .withColumn("payload", inlineBlobStructCol("payload", col("bytes")))
+      .select("id", "name", "payload")
+
+    val canonicalSchema = StructType(Seq(
+      StructField("id", IntegerType, nullable = false),
+      StructField("name", StringType, nullable = true),
+      StructField("payload", BlobType().asInstanceOf[StructType], nullable = true, blobMetadata)
+    ))
+    val df = sparkSession.createDataFrame(rawDf.rdd, canonicalSchema)
+
+    df.write.format("hudi")
+      .option("hoodie.table.name", "blob_descriptor_test")
+      .option("hoodie.datasource.write.recordkey.field", "id")
+      .option("hoodie.datasource.write.operation", "bulk_insert")
+      .mode("overwrite")
+      .save(tablePath)
+
+    sparkSession.read.format("hudi")
+      .option("hoodie.read.blob.inline.mode", "DESCRIPTOR")
+      .load(tablePath)
+      .createOrReplaceTempView("hudi_blob_descriptor_view")
+
+    // Assertion A: read_blob() materializes bytes despite DESCRIPTOR mode.
+    val readBlobRows = sparkSession.sql(
+      "SELECT id, read_blob(payload) AS bytes FROM hudi_blob_descriptor_view ORDER BY id"
+    ).collect()
+    assertEquals(3, readBlobRows.length)
+    readBlobRows.zip(expectedPayloads).foreach { case (row, expected) =>
+      val bytes = row.getAs[Array[Byte]]("bytes")
+      assertNotNull(bytes,
+        "read_blob() must materialize bytes even when DESCRIPTOR mode is set")
+      assertArrayEquals(expected, bytes,
+        s"read_blob() bytes mismatch for id=${row.getInt(0)}")
+    }
+
+    // Assertion B: a follow-up query on the same view without read_blob() must restore
+    // DESCRIPTOR (assertion A flipped it to CONTENT on the shared FileFormat instance).
+    // Select the full payload struct so the blob marker survives nested column pruning.
+    val descriptorRows = sparkSession.sql(
+      "SELECT id, payload FROM hudi_blob_descriptor_view ORDER BY id"
+    ).collect()
+    assertEquals(3, descriptorRows.length)
+    descriptorRows.foreach { row =>
+      val payload = row.getStruct(row.fieldIndex("payload"))
+      assertTrue(payload.isNullAt(payload.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD)),
+        s"DESCRIPTOR mode should null-pad data when read_blob() is not used (id=${row.getInt(0)})")
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testParquetDescriptorSkipsDataColumn(tableType: HoodieTableType): Unit = {
+    // Core PR feature: with DESCRIPTOR mode active on a Parquet table, the data
+    // sub-column is null-padded (column projection skipped it) while type and the
+    // rest of the blob struct survive. Parquet has no byte-range channel like Lance
+    // so reference is also null. Run for both COW and MOR.
+    val tablePath = s"$tempDir/hudi_descriptor_${tableType.name().toLowerCase}"
+    val expectedPayloads = Seq(
+      Array.fill[Byte](128)(0x1.toByte),
+      Array.fill[Byte](128)(0x2.toByte))
+    val rawDf = sparkSession.createDataFrame(
+      expectedPayloads.zipWithIndex.map { case (bytes, i) => (i + 1, bytes) })
+      .toDF("id", "bytes")
+      .withColumn("payload", inlineBlobStructCol("payload", col("bytes")))
+      .select("id", "payload")
+    val canonicalSchema = StructType(Seq(
+      StructField("id", IntegerType, nullable = false),
+      StructField("payload", BlobType().asInstanceOf[StructType], nullable = true, blobMetadata)
+    ))
+    val df = sparkSession.createDataFrame(rawDf.rdd, canonicalSchema)
+
+    df.write.format("hudi")
+      .option("hoodie.table.name", s"descriptor_${tableType.name().toLowerCase}")
+      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "id")
+      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "id")
+      .option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType.name())
+      .option(DataSourceWriteOptions.OPERATION.key(), "bulk_insert")
+      .mode("overwrite")
+      .save(tablePath)
+
+    val rows = sparkSession.read.format("hudi")
+      .option("hoodie.read.blob.inline.mode", "DESCRIPTOR")
+      .load(tablePath)
+      .select(col("id"), col("payload"))
+      .orderBy(col("id"))
+      .collect()
+    assertEquals(2, rows.length)
+    rows.foreach { row =>
+      val payload = row.getStruct(row.fieldIndex("payload"))
+      assertEquals(HoodieSchema.Blob.INLINE,
+        payload.getString(payload.fieldIndex(HoodieSchema.Blob.TYPE)),
+        s"DESCRIPTOR mode should preserve INLINE type (id=${row.getInt(0)})")
+      assertTrue(payload.isNullAt(payload.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD)),
+        s"DESCRIPTOR mode should null-pad data (id=${row.getInt(0)})")
+      assertTrue(payload.isNullAt(payload.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE)),
+        s"Parquet has no byte-range channel; reference should be null (id=${row.getInt(0)})")
+    }
+  }
+
+  @Test
+  def testReadBlobInWhereClauseUnderDescriptor(): Unit = {
+    // ReadBlobRule.containsReadBlobAnywhere walks node.expressions, so a read_blob()
+    // in WHERE must also trigger the DESCRIPTOR→CONTENT downgrade.
+    val tablePath = s"$tempDir/hudi_blob_descriptor_where"
+    val payloads = Seq(
+      Array.fill[Byte](100)(0xA.toByte),
+      Array.fill[Byte](200)(0xB.toByte),
+      Array.fill[Byte](100)(0xC.toByte))
+    val rawDf = sparkSession.createDataFrame(
+      payloads.zipWithIndex.map { case (bytes, i) => (i + 1, bytes) })
+      .toDF("id", "bytes")
+      .withColumn("payload", inlineBlobStructCol("payload", col("bytes")))
+      .select("id", "payload")
+    val canonicalSchema = StructType(Seq(
+      StructField("id", IntegerType, nullable = false),
+      StructField("payload", BlobType().asInstanceOf[StructType], nullable = true, blobMetadata)
+    ))
+    sparkSession.createDataFrame(rawDf.rdd, canonicalSchema).write.format("hudi")
+      .option("hoodie.table.name", "blob_descriptor_where")
+      .option("hoodie.datasource.write.recordkey.field", "id")
+      .option("hoodie.datasource.write.operation", "bulk_insert")
+      .mode("overwrite")
+      .save(tablePath)
+
+    sparkSession.read.format("hudi")
+      .option("hoodie.read.blob.inline.mode", "DESCRIPTOR")
+      .load(tablePath)
+      .createOrReplaceTempView("hudi_blob_where_view")
+
+    val rows = sparkSession.sql(
+      "SELECT id, read_blob(payload) AS bytes FROM hudi_blob_where_view " +
+        "WHERE length(read_blob(payload)) = 200"
+    ).collect()
+    assertEquals(1, rows.length)
+    assertEquals(2, rows(0).getInt(0))
+    assertArrayEquals(payloads(1), rows(0).getAs[Array[Byte]]("bytes"))
+  }
+
+  @Test
+  def testMultiBlobColumnsDescriptorWholeScanDowngrade(): Unit = {
+    // Whole-scan downgrade: read_blob() on one blob column forces CONTENT for the
+    // whole scan, so other blob columns also come through with bytes materialized.
+    val tablePath = s"$tempDir/hudi_blob_descriptor_multi"
+    val a = Array.fill[Byte](80)(0xA.toByte)
+    val b = Array.fill[Byte](80)(0xB.toByte)
+    val rawDf = sparkSession.createDataFrame(Seq((1, a, b)))
+      .toDF("id", "bytes_a", "bytes_b")
+      .withColumn("payload_a", inlineBlobStructCol("payload_a", col("bytes_a")))
+      .withColumn("payload_b", inlineBlobStructCol("payload_b", col("bytes_b")))
+      .select("id", "payload_a", "payload_b")
+    val canonicalSchema = StructType(Seq(
+      StructField("id", IntegerType, nullable = false),
+      StructField("payload_a", BlobType().asInstanceOf[StructType], nullable = true, blobMetadata),
+      StructField("payload_b", BlobType().asInstanceOf[StructType], nullable = true, blobMetadata)
+    ))
+    sparkSession.createDataFrame(rawDf.rdd, canonicalSchema).write.format("hudi")
+      .option("hoodie.table.name", "blob_descriptor_multi")
+      .option("hoodie.datasource.write.recordkey.field", "id")
+      .option("hoodie.datasource.write.operation", "bulk_insert")
+      .mode("overwrite")
+      .save(tablePath)
+
+    sparkSession.read.format("hudi")
+      .option("hoodie.read.blob.inline.mode", "DESCRIPTOR")
+      .load(tablePath)
+      .createOrReplaceTempView("hudi_blob_multi_view")
+
+    val rows = sparkSession.sql(
+      "SELECT id, read_blob(payload_a) AS bytes_a, payload_b FROM hudi_blob_multi_view"
+    ).collect()
+    assertEquals(1, rows.length)
+    assertArrayEquals(a, rows(0).getAs[Array[Byte]]("bytes_a"))
+    val payloadB = rows(0).getStruct(rows(0).fieldIndex("payload_b"))
+    assertArrayEquals(b, payloadB.getAs[Array[Byte]](HoodieSchema.Blob.INLINE_DATA_FIELD),
+      "whole-scan downgrade: payload_b.data must also be materialized")
+  }
+
+  @Test
+  def testDescriptorOnTableWithoutBlobColumns(): Unit = {
+    // No-op safety: setting DESCRIPTOR on a table that has no blob columns must not
+    // error or alter results.
+    val tablePath = s"$tempDir/hudi_no_blob_table"
+    sparkSession.createDataFrame(Seq((1, "a"), (2, "b")))
+      .toDF("id", "name")
+      .write.format("hudi")
+      .option("hoodie.table.name", "no_blob_table")
+      .option("hoodie.datasource.write.recordkey.field", "id")
+      .option("hoodie.datasource.write.operation", "bulk_insert")
+      .mode("overwrite")
+      .save(tablePath)
+
+    val rows = sparkSession.read.format("hudi")
+      .option("hoodie.read.blob.inline.mode", "DESCRIPTOR")
+      .load(tablePath)
+      .select(col("id"), col("name"))
+      .orderBy(col("id"))
+      .collect()
+    assertEquals(2, rows.length)
+    assertEquals(1, rows(0).getInt(0))
+    assertEquals("a", rows(0).getString(1))
+    assertEquals(2, rows(1).getInt(0))
+    assertEquals("b", rows(1).getString(1))
+  }
+
   @Test
   def testReadBlobInSubquery(): Unit = {
     val filePath = createTestFile(tempDir, "subquery.bin", 10000)