From 282d50454ac56c9c1dd3e8a9d81f4aeb02dd1584 Mon Sep 17 00:00:00 2001 From: weimingdiit Date: Mon, 27 Apr 2026 20:28:17 +0800 Subject: [PATCH 1/2] [AURON #2217] Support Iceberg _spec_id metadata in native scan Signed-off-by: weimingdiit --- .../sql/auron/iceberg/IcebergScanSupport.scala | 9 ++++++--- .../auron/plan/NativeIcebergTableScanExec.scala | 16 +++++++++++++++- .../iceberg/AuronIcebergIntegrationSuite.scala | 14 ++++++++++++++ 3 files changed, 35 insertions(+), 4 deletions(-) diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala index 1d9efbc3b..e11c2655b 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala @@ -32,7 +32,8 @@ import org.apache.spark.sql.types.{BinaryType, DataType, DecimalType, StringType import org.apache.auron.{protobuf => pb} // fileSchema is read from the data files. partitionSchema carries supported metadata columns -// (for example _file) that are materialized as per-file constant values in the native scan. +// (for example _file and _spec_id) that are materialized as per-file constant values in +// the native scan. final case class IcebergScanPlan( fileTasks: Seq[FileScanTask], fileFormat: FileFormat, @@ -58,7 +59,8 @@ object IcebergScanSupport extends Logging { val readSchema = scan.readSchema val unsupportedMetadataColumns = collectUnsupportedMetadataColumns(readSchema) - // Native scan can project file-level metadata columns such as _file via partition values. + // Native scan can project file-level metadata columns such as _file and _spec_id + // via partition values. // Metadata columns that require per-row materialization (for example _pos) still fallback. if (unsupportedMetadataColumns.nonEmpty) { return None @@ -136,7 +138,8 @@ object IcebergScanSupport extends Logging { } private def isSupportedMetadataColumn(field: org.apache.spark.sql.types.StructField): Boolean = - field.name == MetadataColumns.FILE_PATH.name() + field.name == MetadataColumns.FILE_PATH.name() || + field.name == MetadataColumns.SPEC_ID.name() private def inputPartitions(exec: BatchScanExec): Seq[InputPartition] = { // Prefer DataSource V2 batch API; if not available, fallback to exec methods via reflection. diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala index 386977925..2e79c0f31 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFil import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{StringType, StructType} +import org.apache.spark.sql.types.{IntegerType, StringType, StructType} import org.apache.spark.util.SerializableConfiguration import org.apache.auron.{protobuf => pb} @@ -67,6 +67,7 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca private lazy val partitions: Array[FilePartition] = buildFilePartitions() private lazy val fileSizes: Map[String, Long] = buildFileSizes() + private lazy val fileSpecIds: Map[String, Int] = buildFileSpecIds() private lazy val nativeFileSchema: pb.Schema = NativeConverters.convertSchema(fileSchema) private lazy val nativePartitionSchema: pb.Schema = @@ -125,6 +126,10 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca field.name match { case name if name == MetadataColumns.FILE_PATH.name() => NativeConverters.convertExpr(Literal.create(filePath, StringType)).getLiteral + case name if name == MetadataColumns.SPEC_ID.name() => + NativeConverters + .convertExpr(Literal.create(fileSpecIds(filePath), IntegerType)) + .getLiteral case name => throw new IllegalStateException( s"unsupported Iceberg metadata column in native scan: $name") @@ -221,6 +226,15 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca .toMap } + private def buildFileSpecIds(): Map[String, Int] = { + // Map file path to Iceberg partition spec id; tasks may split a file into multiple ranges. + fileTasks + .map(task => task.file().location() -> task.file().specId()) + .groupBy(_._1) + .mapValues(_.head._2) + .toMap + } + private def buildFilePartitions(): Array[FilePartition] = { // Convert Iceberg file tasks into Spark FilePartition groups for execution. if (fileTasks.isEmpty) { diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala index 0c5b752a8..fbef60f4b 100644 --- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala +++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala @@ -214,6 +214,20 @@ class AuronIcebergIntegrationSuite } } + test("iceberg native scan supports _spec_id metadata column") { + withTable("local.db.t4_spec_id") { + sql("create table local.db.t4_spec_id using iceberg as select 1 as id, 'a' as v") + checkSparkAnswerAndOperator("select _spec_id from local.db.t4_spec_id") + } + } + + test("iceberg native scan supports data columns with _file and _spec_id metadata columns") { + withTable("local.db.t4_metadata_mixed") { + sql("create table local.db.t4_metadata_mixed using iceberg as select 1 as id, 'a' as v") + checkSparkAnswerAndOperator("select id, _file, _spec_id from local.db.t4_metadata_mixed") + } + } + test("iceberg native scan supports data columns with _file metadata column") { withTable("local.db.t4_mixed") { sql("create table local.db.t4_mixed using iceberg as select 1 as id, 'a' as v") From 8d5c070c7fd26bceb33a21eb5f902af616a0ab76 Mon Sep 17 00:00:00 2001 From: weimingdiit Date: Tue, 5 May 2026 10:21:32 +0800 Subject: [PATCH 2/2] Support Iceberg spec id metadata in native scan Signed-off-by: weimingdiit --- .../plan/NativeIcebergTableScanExec.scala | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala index 2e79c0f31..312f3c570 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFil import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{IntegerType, StringType, StructType} +import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.util.SerializableConfiguration import org.apache.auron.{protobuf => pb} @@ -128,7 +128,7 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca NativeConverters.convertExpr(Literal.create(filePath, StringType)).getLiteral case name if name == MetadataColumns.SPEC_ID.name() => NativeConverters - .convertExpr(Literal.create(fileSpecIds(filePath), IntegerType)) + .convertExpr(Literal.create(fileSpecIds(filePath), field.dataType)) .getLiteral case name => throw new IllegalStateException( @@ -228,11 +228,21 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca private def buildFileSpecIds(): Map[String, Int] = { // Map file path to Iceberg partition spec id; tasks may split a file into multiple ranges. - fileTasks - .map(task => task.file().location() -> task.file().specId()) - .groupBy(_._1) - .mapValues(_.head._2) - .toMap + val specIds = scala.collection.mutable.HashMap.empty[String, Int] + fileTasks.foreach { task => + val filePath = task.file().location() + val specId = task.file().specId() + specIds.get(filePath) match { + case Some(existingSpecId) if existingSpecId != specId => + throw new IllegalStateException( + s"Inconsistent Iceberg partition spec id for file $filePath: " + + s"$existingSpecId != $specId") + case Some(_) => + case None => + specIds.put(filePath, specId) + } + } + specIds.toMap } private def buildFilePartitions(): Array[FilePartition] = {