From 96c01ae4c52e0b51c5c90e3f89d833f43f107503 Mon Sep 17 00:00:00 2001
From: "xiyu.zk"
Date: Fri, 17 Apr 2026 19:33:29 +0800
Subject: [PATCH] [spark] Fix ArrayIndexOutOfBoundsException reading empty partitioned format table

When the partition spec inferred by the parent file index has no
partition columns but the table declares a non-empty partition schema,
rebuild the spec with the declared partition schema (keeping the
inferred partitions). In every other case the inferred spec is returned
unchanged. The alignment is applied by overriding partitionSpec() in
both PartitionedMetadataLogFileIndex and PartitionedInMemoryFileIndex.

Add a test that selects and filters on partition columns of an empty
table created with 'format-table.implementation'='engine' and expects
empty results.

---
 .../sql/execution/SparkFormatTable.scala      | 24 +++++++++++++++++--
 .../spark/table/PaimonFormatTableTest.scala   | 21 ++++++++++++++++
 2 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala
index 9d0983ed0baa..2da5ea72c59f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala
@@ -92,6 +92,16 @@ object SparkFormatTable {
     }
   }
 
+  private def alignPartitionSpec(
+      inferred: PartitionSpec,
+      partitionSchema: StructType): PartitionSpec = {
+    if (inferred.partitionColumns.isEmpty && partitionSchema.nonEmpty) {
+      PartitionSpec(partitionSchema, inferred.partitions)
+    } else {
+      inferred
+    }
+  }
+
   // Extend from MetadataLogFileIndex to override partitionSchema
   private class PartitionedMetadataLogFileIndex(
       sparkSession: SparkSession,
@@ -99,7 +109,12 @@ object SparkFormatTable {
       parameters: Map[String, String],
       userSpecifiedSchema: Option[StructType],
       override val partitionSchema: StructType)
-    extends MetadataLogFileIndex(sparkSession, path, parameters, userSpecifiedSchema)
+    extends MetadataLogFileIndex(sparkSession, path, parameters, userSpecifiedSchema) {
+
+    override def partitionSpec(): PartitionSpec = {
+      alignPartitionSpec(super.partitionSpec(), partitionSchema)
+    }
+  }
 
   // Extend from InMemoryFileIndex to override partitionSchema
   private class PartitionedInMemoryFileIndex(
@@ -118,7 +133,12 @@ object SparkFormatTable {
       userSpecifiedSchema,
       fileStatusCache,
       userSpecifiedPartitionSpec,
-      metadataOpsTimeNs)
+      metadataOpsTimeNs) {
+
+    override def partitionSpec(): PartitionSpec = {
+      alignPartitionSpec(super.partitionSpec(), partitionSchema)
+    }
+  }
 }
 
 trait PartitionedFormatTable extends SupportsPartitionManagement {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
index 6fa31f5e2cc1..453d7bf78199 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
@@ -405,6 +405,27 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
     }
   }
 
+  test(
+    "PartitionedFormatTable: engine impl should return empty result when selecting a partition column on an empty table") {
+    val tableName = "engine_empty_partition_select"
+    withTable(tableName) {
+      val location = s"${tempDBDir.getCanonicalPath}/engine_empty_partition_select"
+      sql(s"""CREATE TABLE $tableName (clickid STRING, geo STRING, dt STRING, hour STRING)
+             |USING parquet PARTITIONED BY (dt, hour) LOCATION '$location'
+             |TBLPROPERTIES ('format-table.implementation'='engine')
+             |""".stripMargin)
+      fileIO.mkdirs(new Path(location))
+      checkAnswer(sql(s"SELECT clickid, hour FROM $tableName WHERE dt='20260418'"), Nil)
+      checkAnswer(sql(s"SELECT clickid, dt FROM $tableName WHERE hour='00'"), Nil)
+      checkAnswer(
+        sql(s"""SELECT clickid, max(hour) AS max_hour FROM $tableName
+               |WHERE dt='20260418' AND hour <= '11' AND geo IS NOT NULL
+               |GROUP BY clickid""".stripMargin),
+        Nil
+      )
+    }
+  }
+
   test("Paimon format table: show partitions") {
     withTable("t") {
       sql("""