From 8d8408c5398d14b17f8469b39cdd202b19b4afd8 Mon Sep 17 00:00:00 2001
From: weimingdiit
Date: Mon, 27 Apr 2026 20:55:04 +0800
Subject: [PATCH] [AURON #2219] Expose numFiles and numPartitions metrics for native Iceberg scan

Signed-off-by: weimingdiit
---
Reviewer notes (this section is not part of the commit message):

* NativeIcebergTableScanExec gains two extra SQL metrics, "numPartitions"
  and "numFiles", appended to NativeHelper.getNativeFileScanMetrics.
* Both values are computed inside the initializer of the lazy `partitions`
  val: numPartitions is the length of the built FilePartition array and
  numFiles is the sum of `files.length` over those partitions. They are
  then posted with SQLMetrics.postDriverMetricUpdates, using the execution
  id read from the SparkContext local property
  SQLExecution.EXECUTION_ID_KEY.
* BuildInfoInSparkUISuite now calls listenerBus.waitUntilEmpty() before
  asserting on the listener's build info.
* AuronIcebergIntegrationSuite adds a test that builds a
  NativeIcebergTableScanExec from the query's BatchScanExec, runs
  doExecuteNative() and asserts both new metrics are present and > 0.
* NOTE(review): this copy was rebuilt from a whitespace-collapsed paste.
  Tokens are unchanged, but blank context lines and leading indentation
  were inferred from the hunk line counts - confirm against the upstream
  commit before running `git am`.

 .../execution/BuildInfoInSparkUISuite.scala   |  1 +
 .../plan/NativeIcebergTableScanExec.scala     | 27 +++++++++++++++----
 .../AuronIcebergIntegrationSuite.scala        | 26 ++++++++++++++++++
 3 files changed, 49 insertions(+), 5 deletions(-)

diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala
index 864879f12..08a4474bf 100644
--- a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala
+++ b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala
@@ -46,6 +46,7 @@ class BuildInfoInSparkUISuite extends AuronQueryTest with BaseAuronSQLSuite {
     val listeners = spark.sparkContext.listenerBus.findListenersByClass[AuronSQLAppStatusListener]
     assert(listeners.size === 1)
     val listener = listeners(0)
+    spark.sparkContext.listenerBus.waitUntilEmpty()
     assert(listener.getAuronBuildInfo() == 1)
   }
 
diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala
index 386977925..6fa5f1a99 100644
--- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala
+++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala
@@ -34,11 +34,10 @@ import org.apache.spark.sql.auron.{EmptyNativeRDD, NativeConverters, NativeHelpe
 import org.apache.spark.sql.auron.iceberg.IcebergScanPlan
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.execution.LeafExecNode
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
-import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
@@ -53,7 +52,9 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca
     with Logging {
 
   override lazy val metrics: Map[String, SQLMetric] =
-    NativeHelper.getNativeFileScanMetrics(sparkContext)
+    NativeHelper.getNativeFileScanMetrics(sparkContext) ++ Seq(
+      "numPartitions" -> SQLMetrics.createMetric(sparkContext, "Native.partitions_read"),
+      "numFiles" -> SQLMetrics.createMetric(sparkContext, "Native.files_read"))
 
   override val output = basedScan.output
   override val outputPartitioning = basedScan.outputPartitioning
@@ -65,7 +66,11 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca
 
   private lazy val fileTasks: Seq[FileScanTask] = plan.fileTasks
   private lazy val pruningPredicates: Seq[pb.PhysicalExprNode] = plan.pruningPredicates
-  private lazy val partitions: Array[FilePartition] = buildFilePartitions()
+  private lazy val partitions: Array[FilePartition] = {
+    val filePartitions = buildFilePartitions()
+    postDriverMetrics(filePartitions)
+    filePartitions
+  }
   private lazy val fileSizes: Map[String, Long] = buildFileSizes()
 
   private lazy val nativeFileSchema: pb.Schema = NativeConverters.convertSchema(fileSchema)
@@ -221,6 +226,18 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca
       .toMap
   }
 
+  private def postDriverMetrics(filePartitions: Array[FilePartition]): Unit = {
+    val numPartitions = filePartitions.length
+    metrics("numPartitions").add(numPartitions)
+    val numFiles = filePartitions.foldLeft(0L)(_ + _.files.length)
+    metrics("numFiles").add(numFiles)
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(
+      sparkContext,
+      executionId,
+      Seq(metrics("numPartitions"), metrics("numFiles")))
+  }
+
   private def buildFilePartitions(): Array[FilePartition] = {
     // Convert Iceberg file tasks into Spark FilePartition groups for execution.
     if (fileTasks.isEmpty) {
diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala
index 0c5b752a8..8a45cfb19 100644
--- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala
+++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala
@@ -26,6 +26,7 @@ import org.apache.iceberg.deletes.PositionDelete
 import org.apache.iceberg.spark.Spark3Util
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.auron.iceberg.IcebergScanSupport
+import org.apache.spark.sql.execution.auron.plan.NativeIcebergTableScanExec
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 
 class AuronIcebergIntegrationSuite
@@ -51,6 +52,21 @@ class AuronIcebergIntegrationSuite
     }
   }
 
+  test("iceberg native scan exposes file scan driver metrics") {
+    withTable("local.db.t_metrics") {
+      sql("create table local.db.t_metrics using iceberg as select 1 as id, 'a' as v")
+      val df = sql("select * from local.db.t_metrics")
+      checkAnswer(df, Seq(Row(1, "a")))
+      val nativeScan = nativeIcebergTableScanExec(df)
+      nativeScan.doExecuteNative()
+      val metrics = nativeScan.metrics.map { case (name, metric) => name -> metric.value }
+      assert(metrics.contains("numPartitions"))
+      assert(metrics.contains("numFiles"))
+      assert(metrics("numPartitions") > 0)
+      assert(metrics("numFiles") > 0)
+    }
+  }
+
   test("iceberg native scan is applied for empty COW table") {
     withTable("local.db.t_empty") {
       sql("""
@@ -345,4 +361,14 @@ class AuronIcebergIntegrationSuite
     df.queryExecution.sparkPlan.collectFirst { case scan: BatchScanExec =>
       IcebergScanSupport.plan(scan)
     }.flatten
+
+  private def nativeIcebergTableScanExec(df: DataFrame): NativeIcebergTableScanExec = {
+    val batchScan = df.queryExecution.sparkPlan.collectFirst { case scan: BatchScanExec =>
+      scan
+    }
+    assert(batchScan.nonEmpty)
+    val scanPlan = IcebergScanSupport.plan(batchScan.get)
+    assert(scanPlan.nonEmpty)
+    NativeIcebergTableScanExec(batchScan.get, scanPlan.get)
+  }
 }