From 3d84859252724263abfb42e8b9dba86fb5b2fac1 Mon Sep 17 00:00:00 2001 From: Mohammad Linjawi Date: Wed, 6 May 2026 14:46:36 +0300 Subject: [PATCH 1/2] [VL][Delta] Add UniForm Iceberg support wiring --- backends-velox/pom.xml | 6 + .../sql/delta/GlutenDeltaParquetFieldId.scala | 109 ++++++ .../delta/GlutenDeltaParquetFileFormat.scala | 3 +- .../files/GlutenDeltaFileFormatWriter.scala | 18 +- .../sql/delta/DeltaUniFormIcebergSuite.scala | 368 ++++++++++++++++++ .../velox/VeloxParquetWriterInjects.scala | 3 +- cpp/core/config/GlutenConfig.h | 2 + cpp/velox/utils/VeloxWriterUtils.cc | 110 ++++++ docs/get-started/VeloxDelta.md | 7 +- .../apache/gluten/config/GlutenConfig.scala | 1 + 10 files changed, 621 insertions(+), 6 deletions(-) create mode 100644 backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFieldId.scala create mode 100644 backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaUniFormIcebergSuite.scala diff --git a/backends-velox/pom.xml b/backends-velox/pom.xml index 4432547a0fe..d79a8a899d9 100644 --- a/backends-velox/pom.xml +++ b/backends-velox/pom.xml @@ -535,6 +535,12 @@ ${delta.package.name}_${scala.binary.version} provided + + io.delta + delta-iceberg_${scala.binary.version} + ${delta.version} + provided + com.amazonaws aws-java-sdk-dynamodb diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFieldId.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFieldId.scala new file mode 100644 index 00000000000..e389b9ccc94 --- /dev/null +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFieldId.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta + +import org.apache.gluten.config.GlutenConfig + +import org.apache.spark.sql.delta.actions.Metadata +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} + +private[delta] object GlutenDeltaParquetFieldId { + private case class ParquetFieldId(fieldId: Int, children: Seq[ParquetFieldId]) + + def withParquetFieldIds( + options: Map[String, String], + dataSchema: StructType, + deltaMetadata: Metadata): Map[String, String] = { + if (!IcebergCompatV2.isEnabled(deltaMetadata)) { + options + } else { + val fieldIds = serialize(dataSchema) + if (fieldIds.isEmpty) { + options + } else { + options + (GlutenConfig.PARQUET_FIELD_IDS -> fieldIds) + } + } + } + + private def serialize(schema: StructType): String = + schema.fields.map(field => encode(buildField(field))).mkString(",") + + private def buildField(field: StructField): ParquetFieldId = { + ParquetFieldId(columnFieldId(field), childrenFor(field.dataType, field, Seq(field.name))) + } + + private def buildSyntheticField( + ownerField: StructField, + fieldIdPath: Seq[String], + dataType: DataType): ParquetFieldId = { + ParquetFieldId( + nestedFieldId(ownerField, fieldIdPath), + childrenFor(dataType, ownerField, fieldIdPath)) + } + + private def childrenFor( + dataType: DataType, + ownerField: StructField, + fieldIdPath: Seq[String]): Seq[ParquetFieldId] = dataType match { + case StructType(fields) => + fields.map(buildField) + case ArrayType(elementType, _) => + Seq( + buildSyntheticField( + ownerField, + fieldIdPath :+ DeltaColumnMapping.PARQUET_LIST_ELEMENT_FIELD_NAME, + elementType)) + case MapType(keyType, valueType, _) => + Seq( + buildSyntheticField( + ownerField, + fieldIdPath :+ DeltaColumnMapping.PARQUET_MAP_KEY_FIELD_NAME, + keyType), + buildSyntheticField( + ownerField, + fieldIdPath :+ DeltaColumnMapping.PARQUET_MAP_VALUE_FIELD_NAME, + valueType) + ) + case _ => + Seq.empty + } + + private def columnFieldId(field: StructField): Int = { + val key = DeltaColumnMapping.PARQUET_FIELD_ID_METADATA_KEY + if (field.metadata.contains(key)) field.metadata.getLong(key).toInt else -1 + } + + private def nestedFieldId(field: StructField, fieldIdPath: Seq[String]): Int = { + val key = DeltaColumnMapping.PARQUET_FIELD_NESTED_IDS_METADATA_KEY + if (!field.metadata.contains(key)) { + -1 + } else { + val nestedIds = field.metadata.getMetadata(key) + val nestedKey = fieldIdPath.mkString(".") + if (nestedIds.contains(nestedKey)) nestedIds.getLong(nestedKey).toInt else -1 + } + } + + private def encode(fieldId: ParquetFieldId): String = { + if (fieldId.children.isEmpty) { + fieldId.fieldId.toString + } else { + fieldId.children.map(encode).mkString(s"${fieldId.fieldId}(", ",", ")") + } + } +} diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala index c975fbd97a8..7130fb8212d 100644 --- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala @@ -278,7 +278,8 @@ case class GlutenDeltaParquetFileFormat( job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - val factory = super.prepareWrite(sparkSession, job, options, dataSchema) + val writeOptions = GlutenDeltaParquetFieldId.withParquetFieldIds(options, dataSchema, metadata) + val factory = super.prepareWrite(sparkSession, job, writeOptions, dataSchema) val conf = ContextUtil.getConfiguration(job) // Always write timestamp as TIMESTAMP_MICROS for Iceberg compat based on Iceberg spec if (IcebergCompatV1.isEnabled(metadata) || IcebergCompatV2.isEnabled(metadata)) { diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala index e9ea3e47534..853d0064805 100644 --- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.connector.write.WriterCommitMessage -import org.apache.spark.sql.delta.DeltaOptions +import org.apache.spark.sql.delta.{DeltaOptions, GlutenDeltaParquetFieldId, GlutenDeltaParquetFileFormat} import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.stats.GlutenDeltaJobStatsTracker import org.apache.spark.sql.errors.QueryExecutionErrors @@ -116,18 +116,30 @@ object GlutenDeltaFileFormatWriter extends LoggingShims { val writerBucketSpec = V1WritesUtils.getWriterBucketSpec(bucketSpec, dataColumns, options) val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec, dataColumns) - val caseInsensitiveOptions = CaseInsensitiveMap(options) + val initialCaseInsensitiveOptions = CaseInsensitiveMap(options) val dataSchema = dataColumns.toStructType DataSourceUtils.verifySchema(fileFormat, dataSchema) DataSourceUtils.checkFieldNames(fileFormat, dataSchema) val isNativeWritable = true + val shouldWritePartitionColumns = + initialCaseInsensitiveOptions.get(DeltaOptions.WRITE_PARTITION_COLUMNS).contains("true") val outputDataColumns = - if (caseInsensitiveOptions.get(DeltaOptions.WRITE_PARTITION_COLUMNS).contains("true")) { + if (shouldWritePartitionColumns) { dataColumns ++ partitionColumns } else dataColumns + val writeOptions = fileFormat match { + case deltaFormat: GlutenDeltaParquetFileFormat => + GlutenDeltaParquetFieldId.withParquetFieldIds( + options, + outputDataColumns.toStructType, + deltaFormat.metadata) + case _ => options + } + val caseInsensitiveOptions = CaseInsensitiveMap(writeOptions) + // Note: prepareWrite has side effect. It sets "job". val outputWriterFactory = fileFormat.prepareWrite( diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaUniFormIcebergSuite.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaUniFormIcebergSuite.scala new file mode 100644 index 00000000000..89c74c5bde5 --- /dev/null +++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaUniFormIcebergSuite.scala @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.delta.uniform.ParquetIcebergCompatV2Utils +import org.apache.spark.sql.internal.StaticSQLConf +import org.apache.spark.tags.ExtendedSQLTest + +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS + +import java.io.File +import java.net.URI +import java.util.Locale + +import scala.io.Source +import scala.util.control.NonFatal + +@ExtendedSQLTest +class DeltaUniFormIcebergSuite + extends QueryTest + with DeltaSQLCommandTest + with DeletionVectorsTestUtils { + + private val icebergSparkExtension = + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" + private val icebergRequiredClasses = Seq( + "org.apache.iceberg.spark.source.IcebergSource", + icebergSparkExtension, + "org.apache.iceberg.hive.TestHiveMetastore") + + private var metastore: AnyRef = _ + + override protected def beforeAll(): Unit = { + if (hasIcebergTestClasspath) { + super.beforeAll() + } + } + + override protected def beforeEach(): Unit = { + if (hasIcebergTestClasspath) { + super.beforeEach() + } + } + + override protected def afterEach(): Unit = { + if (hasIcebergTestClasspath) { + super.afterEach() + } + } + + override protected def sparkConf: SparkConf = { + val conf = super.sparkConf + if (!hasIcebergTestClasspath) { + conf + } else { + val hiveConf = startMetastore() + val currentExtensions = conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key, "") + val extensions = + if (currentExtensions.isEmpty) icebergSparkExtension + else s"$currentExtensions,$icebergSparkExtension" + conf + .set(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key, extensions) + .set(StaticSQLConf.CATALOG_IMPLEMENTATION.key, "hive") + .set("spark.sql.catalog.uniform_hive", "org.apache.iceberg.spark.SparkCatalog") + .set("spark.sql.catalog.uniform_hive.type", "hive") + .set("spark.sql.catalog.uniform_hive.uri", hiveConf.get(METASTOREURIS.varname)) + .set(s"spark.hadoop.${METASTOREURIS.varname}", hiveConf.get(METASTOREURIS.varname)) + } + } + + override def afterAll(): Unit = { + try { + if (hasIcebergTestClasspath) { + super.afterAll() + } + } finally { + stopMetastore() + } + } + + private def stopMetastore(): Unit = { + if (metastore != null) { + metastore.getClass.getMethod("stop").invoke(metastore) + metastore = null + } + } + + private def startMetastore(): HiveConf = synchronized { + if (metastore == null) { + val metastoreClass = loadClass("org.apache.iceberg.hive.TestHiveMetastore") + metastore = metastoreClass.getConstructor().newInstance().asInstanceOf[AnyRef] + metastoreClass.getMethod("start").invoke(metastore) + } + metastore.getClass.getMethod("hiveConf").invoke(metastore).asInstanceOf[HiveConf] + } + + test("write UniForm Iceberg Delta table and read through Iceberg") { + requireIcebergTestClasspath() + + withTempDir { + tableDir => + val tablePath = tableDir.getCanonicalPath + val tableName = s"uniform_delta_${math.abs(System.nanoTime())}" + val icebergTableName = s"uniform_hive.default.$tableName" + withTable(tableName) { + sql(s""" + |CREATE TABLE $tableName ( + | id BIGINT, + | name STRING, + | ts TIMESTAMP, + | payload STRUCT< + | items: ARRAY>, + | attrs: MAP>, + | part INT) + |USING DELTA + |PARTITIONED BY (part) + |LOCATION '$tablePath' + |TBLPROPERTIES ( + | 'delta.columnMapping.mode' = 'name', + | 'delta.minReaderVersion' = '2', + | 'delta.minWriterVersion' = '7', + | 'delta.enableIcebergCompatV2' = 'true', + | 'delta.universalFormat.enabledFormats' = 'iceberg') + |""".stripMargin) + + sql(s""" + |INSERT INTO $tableName VALUES + | (1, 'alice', TIMESTAMP '2024-01-01 00:00:00', + | named_struct( + | 'items', array(named_struct('sku', 'a-1', 'qty', 2)), + | 'attrs', map('source', 'gluten')), + | 0), + | (2, 'bob', TIMESTAMP '2024-01-02 00:00:00', + | named_struct( + | 'items', array(named_struct('sku', 'b-1', 'qty', 3)), + | 'attrs', map('source', 'velox')), + | 1) + |""".stripMargin) + + val dataFile = listFiles(tableDir).find(_.getName.endsWith(".parquet")).getOrElse { + fail(s"No Parquet data file found under $tablePath") + } + val parquetFooter = + ParquetIcebergCompatV2Utils.getParquetFooter(dataFile.getCanonicalPath) + assert(ParquetIcebergCompatV2Utils.isParquetIcebergCompatV2(parquetFooter)) + + val latestDeltaVersion = sql(s"DESCRIBE HISTORY $tableName") + .select("version") + .collect() + .map(_.getLong(0)) + .max + val metadataFile = + waitForIcebergMetadata(tableDir, tableName, icebergTableName, latestDeltaVersion) + val metadata = readFile(metadataFile) + assert(metadata.contains(deltaVersionProperty(latestDeltaVersion))) + assert(metadata.contains("delta-timestamp")) + assert(!metadata.contains("\"current-snapshot-id\" : -1")) + + checkAnswer( + spark.read + .format("iceberg") + .load(icebergTableName) + .select("id", "name", "part") + .orderBy("id"), + Seq(Row(1L, "alice", 0), Row(2L, "bob", 1))) + } + } + } + + test("UniForm Iceberg rejects active deletion vectors") { + requireIcebergTestClasspath() + + withTempDir { + tableDir => + val tablePath = tableDir.getCanonicalPath + withDeletionVectorsEnabled() { + sql(s""" + |CREATE TABLE delta.`$tablePath` (id BIGINT) + |USING DELTA + |TBLPROPERTIES ('${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = 'true') + |""".stripMargin) + sql(s"INSERT INTO delta.`$tablePath` VALUES (1), (2)") + sql(s"DELETE FROM delta.`$tablePath` WHERE id = 1") + } + + val error = intercept[Throwable] { + sql(s""" + |ALTER TABLE delta.`$tablePath` SET TBLPROPERTIES ( + | 'delta.columnMapping.mode' = 'name', + | 'delta.minReaderVersion' = '2', + | 'delta.minWriterVersion' = '7', + | 'delta.enableIcebergCompatV2' = 'true', + | 'delta.universalFormat.enabledFormats' = 'iceberg') + |""".stripMargin) + } + val message = Option(error.getMessage).getOrElse("").toLowerCase(Locale.ROOT) + assert( + message.contains("deletion vector") || + message.contains("iceberg") || + message.contains("uniform")) + } + } + + private def requireIcebergTestClasspath(): Unit = { + assume( + hasIcebergTestClasspath, + s"requires ${icebergRequiredClasses.mkString(", ")} on the test classpath" + ) + } + + private def hasIcebergTestClasspath: Boolean = + icebergRequiredClasses.forall(hasClass) + + private def hasClass(className: String): Boolean = { + try { + loadClass(className) + true + } catch { + case _: ClassNotFoundException => false + } + } + + private def loadClass(className: String): Class[_] = + Class.forName(className, false, Thread.currentThread().getContextClassLoader) + + private def waitForIcebergMetadata( + tableDir: File, + deltaTableName: String, + icebergTableName: String, + expectedDeltaVersion: Long): File = { + val deadline = System.nanoTime() + 30L * 1000L * 1000L * 1000L + var files = listIcebergMetadataFiles(tableDir, deltaTableName) + var metadataLocation = getIcebergMetadataLocation(icebergTableName) + var convertedMetadata = findConvertedMetadata(files, metadataLocation, expectedDeltaVersion) + while (convertedMetadata.isEmpty && System.nanoTime() < deadline) { + spark.sparkContext.listenerBus.waitUntilEmpty(15000) + Thread.sleep(500) + files = listIcebergMetadataFiles(tableDir, deltaTableName) + metadataLocation = getIcebergMetadataLocation(icebergTableName) + convertedMetadata = findConvertedMetadata(files, metadataLocation, expectedDeltaVersion) + } + convertedMetadata.getOrElse { + fail( + s"No Iceberg metadata file for Delta version $expectedDeltaVersion generated under " + + s"${new File(tableDir, "metadata")}. " + + s"${icebergMetadataDebug(tableDir, deltaTableName, icebergTableName)}") + } + } + + private def findConvertedMetadata( + files: Seq[File], + metadataLocation: Option[String], + expectedDeltaVersion: Long): Option[File] = { + val candidates = files ++ metadataLocation.map(metadataLocationToFile) + candidates.reverse.find { + file => + file.isFile && { + val metadata = readFile(file) + metadata.contains(deltaVersionProperty(expectedDeltaVersion)) && + metadata.contains("delta-timestamp") && + !metadata.contains("\"current-snapshot-id\" : -1") + } + } + } + + private def deltaVersionProperty(version: Long): String = + "\"" + "delta-version" + "\" : \"" + version + "\"" + + private def getIcebergMetadataLocation(tableName: String): Option[String] = { + try { + spark + .sql(s"SHOW TBLPROPERTIES $tableName") + .collect() + .find(row => row.getString(0) == IcebergConstants.ICEBERG_TBLPROP_METADATA_LOCATION) + .map(_.getString(1)) + .filter(_.nonEmpty) + } catch { + case NonFatal(_) => None + } + } + + private def metadataLocationToFile(location: String): File = { + val uri = new URI(location) + if (uri.getScheme == null) new File(location) else new File(uri) + } + + private def icebergMetadataDebug( + tableDir: File, + deltaTableName: String, + icebergTableName: String): String = { + val deltaProperties = spark + .sql(s"SHOW TBLPROPERTIES $deltaTableName") + .collect() + .map(row => s"${row.getString(0)}=${row.getString(1)}") + .sorted + .mkString("[", ", ", "]") + val icebergProperties = + try { + spark + .sql(s"SHOW TBLPROPERTIES $icebergTableName") + .collect() + .map(row => s"${row.getString(0)}=${row.getString(1)}") + .sorted + .mkString("[", ", ", "]") + } catch { + case NonFatal(e) => s"unavailable: ${e.getMessage}" + } + val nearbyMetadata = listIcebergMetadataFiles(tableDir, deltaTableName) + .map(_.getCanonicalPath) + .sorted + .mkString("[", ", ", "]") + s"Delta table properties: $deltaProperties. " + + s"Iceberg table properties: $icebergProperties. Nearby metadata files: $nearbyMetadata" + } + + private def listIcebergMetadataFiles(tableDir: File, tableName: String): Seq[File] = { + val deltaMetadataDir = new File(tableDir, "metadata") + val metadataFiles = + Option(deltaMetadataDir.listFiles()) + .getOrElse(Array.empty) + .filter(file => file.isFile && file.getName.endsWith(".metadata.json")) ++ + Option(tableDir.getParentFile) + .map(listFiles) + .getOrElse(Seq.empty) + .filter( + file => + file.isFile && + file.getName.endsWith(".metadata.json") && + file.getCanonicalPath.contains(tableName)) + + metadataFiles + .groupBy(_.getCanonicalPath) + .values + .map(_.head) + .toSeq + .sortBy(_.getName) + } + + private def listFiles(root: File): Seq[File] = { + Option(root.listFiles()).getOrElse(Array.empty).toSeq.flatMap { + file => if (file.isDirectory) listFiles(file) else Seq(file) + } + } + + private def readFile(file: File): String = { + val source = Source.fromFile(file, "UTF-8") + try source.mkString + finally source.close() + } +} diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala index 14f1c6d63b9..d8599c14fd3 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala @@ -52,7 +52,8 @@ class VeloxParquetWriterInjects extends VeloxFormatWriterInjects { GlutenConfig.PARQUET_ZSTD_COMPRESSION_LEVEL, GlutenConfig.PARQUET_DATAPAGE_SIZE, GlutenConfig.PARQUET_ENABLE_DICTIONARY, - GlutenConfig.PARQUET_WRITER_VERSION + GlutenConfig.PARQUET_WRITER_VERSION, + GlutenConfig.PARQUET_FIELD_IDS ).foreach(key => options.get(key).foreach(sparkOptions.put(key, _))) sparkOptions.asJava } diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h index 2b8ba54595b..32ceadcb141 100644 --- a/cpp/core/config/GlutenConfig.h +++ b/cpp/core/config/GlutenConfig.h @@ -69,6 +69,8 @@ const std::string kParquetEnableDictionary = "parquet.enable.dictionary"; const std::string kParquetWriterVersion = "parquet.writer.version"; +const std::string kParquetFieldIds = "parquet.field.ids"; + const std::string kParquetCompressionCodec = "spark.sql.parquet.compression.codec"; const std::string kColumnarToRowMemoryThreshold = "spark.gluten.sql.columnarToRowMemoryThreshold"; diff --git a/cpp/velox/utils/VeloxWriterUtils.cc b/cpp/velox/utils/VeloxWriterUtils.cc index bd3cca68541..56c238d1f08 100644 --- a/cpp/velox/utils/VeloxWriterUtils.cc +++ b/cpp/velox/utils/VeloxWriterUtils.cc @@ -19,6 +19,9 @@ #include +#include +#include + #include "config/GlutenConfig.h" #include "utils/ConfigExtractor.h" #include "utils/Exception.h" @@ -36,6 +39,110 @@ using namespace facebook::velox::common; namespace { const int32_t kGzipWindowBits4k = 12; const int32_t kZSTDDefaultCompressionLevel = 3; + +class ParquetFieldIdParser { + public: + explicit ParquetFieldIdParser(std::string_view value) : value_(value) {} + + std::vector parse() { + skipWhitespace(); + if (atEnd()) { + return {}; + } + auto fieldIds = parseList('\0'); + skipWhitespace(); + if (!atEnd()) { + throw GlutenException("Invalid parquet field id tree: unexpected trailing input."); + } + return fieldIds; + } + + private: + std::vector parseList(char terminator) { + std::vector fieldIds; + while (true) { + skipWhitespace(); + if (atEnd()) { + if (terminator != '\0') { + throw GlutenException("Invalid parquet field id tree: missing closing delimiter."); + } + return fieldIds; + } + if (terminator != '\0' && value_[position_] == terminator) { + ++position_; + return fieldIds; + } + + fieldIds.push_back(parseNode()); + skipWhitespace(); + if (atEnd()) { + if (terminator != '\0') { + throw GlutenException("Invalid parquet field id tree: missing closing delimiter."); + } + return fieldIds; + } + if (value_[position_] == ',') { + ++position_; + } else if (terminator != '\0' && value_[position_] == terminator) { + ++position_; + return fieldIds; + } else { + throw GlutenException("Invalid parquet field id tree: expected ',' or closing delimiter."); + } + } + } + + ParquetFieldId parseNode() { + ParquetFieldId fieldId; + fieldId.fieldId = parseInteger(); + skipWhitespace(); + if (!atEnd() && value_[position_] == '(') { + ++position_; + fieldId.children = parseList(')'); + } + return fieldId; + } + + int32_t parseInteger() { + skipWhitespace(); + if (atEnd()) { + throw GlutenException("Invalid parquet field id tree: expected integer."); + } + + int sign = 1; + if (value_[position_] == '-') { + sign = -1; + ++position_; + } + if (atEnd() || !std::isdigit(static_cast(value_[position_]))) { + throw GlutenException("Invalid parquet field id tree: expected integer."); + } + + int64_t value = 0; + while (!atEnd() && std::isdigit(static_cast(value_[position_]))) { + value = value * 10 + value_[position_] - '0'; + ++position_; + } + return static_cast(sign * value); + } + + void skipWhitespace() { + while (!atEnd() && std::isspace(static_cast(value_[position_]))) { + ++position_; + } + } + + bool atEnd() const { + return position_ >= value_.size(); + } + + std::string_view value_; + size_t position_{0}; +}; + +std::vector parseParquetFieldIds(std::string_view value) { + return ParquetFieldIdParser(value).parse(); +} } // namespace std::unique_ptr makeParquetWriteOption(const std::unordered_map& sparkConfs) { @@ -112,6 +219,9 @@ std::unique_ptr makeParquetWriteOption(const std::unordered_mapenableDictionary = false; } } + if (auto it = sparkConfs.find(kParquetFieldIds); it != sparkConfs.end()) { + writeOption->parquetFieldIds = parseParquetFieldIds(it->second); + } return writeOption; } diff --git a/docs/get-started/VeloxDelta.md b/docs/get-started/VeloxDelta.md index 69964e78908..591e300a9ff 100644 --- a/docs/get-started/VeloxDelta.md +++ b/docs/get-started/VeloxDelta.md @@ -40,9 +40,14 @@ Native Delta write is controlled by: | Deletion vectors | 7 | 3 | 3 | Reader and writer | Partial | | TimestampNTZ | 7 | 3 | 1 | Reader and writer | No | | Liquid clustering | 7 | 3 | 1 | Reader and writer | Yes | -| Iceberg readers (UniForm) | 7 | 2 | N/A | Writer | Not tested | +| Iceberg readers (UniForm) | 7 | 2 | N/A | Writer | Partial | | Type widening | 7 | 3 | N/A | Reader and writer | Partial | | Variant | 7 | 3 | 3 | Reader and writer | Not tested | | Variant shredding | 7 | 3 | 3 | Reader and writer | Not tested | | Collations | 7 | 3 | N/A | Reader and writer | Not tested | | Protected checkpoints | 7 | 1 | N/A | Writer | Not tested | + +Notes: + +- UniForm Iceberg is covered for Velox native Delta write with IcebergCompatV2, async Iceberg metadata generation, and Iceberg readback through a Hive-backed Iceberg catalog. +- Native Velox Iceberg scans still fall back for UniForm column-mapping/name-mapping reads, so this is not full native Iceberg reader support yet. diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index 2f8155ce70e..9da64c3fd62 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -399,6 +399,7 @@ object GlutenConfig extends ConfigRegistry { val PARQUET_DATAPAGE_SIZE: String = "parquet.page.size" val PARQUET_ENABLE_DICTIONARY: String = "parquet.enable.dictionary" val PARQUET_WRITER_VERSION: String = "parquet.writer.version" + val PARQUET_FIELD_IDS: String = "parquet.field.ids" // Hadoop config val HADOOP_PREFIX = "spark.hadoop." From 434c553f1d6a3c27e98e8785da1a1043100783eb Mon Sep 17 00:00:00 2001 From: Mohammad Linjawi Date: Wed, 6 May 2026 15:18:48 +0300 Subject: [PATCH 2/2] [VL][Delta] Scope delta-iceberg to Spark 3.5 tests --- backends-velox/pom.xml | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/backends-velox/pom.xml b/backends-velox/pom.xml index d79a8a899d9..b4c3cde76e9 100644 --- a/backends-velox/pom.xml +++ b/backends-velox/pom.xml @@ -515,6 +515,17 @@ + + spark-3.5 + + + io.delta + delta-iceberg_${scala.binary.version} + ${delta.version} + test + + + delta @@ -535,12 +546,6 @@ ${delta.package.name}_${scala.binary.version} provided - - io.delta - delta-iceberg_${scala.binary.version} - ${delta.version} - provided - com.amazonaws aws-java-sdk-dynamodb