From b7d0b5f7e88a16d0c315a35735eddbe501373089 Mon Sep 17 00:00:00 2001 From: Anton Lykov Date: Wed, 22 Apr 2026 10:00:39 +0000 Subject: [PATCH 1/2] [SPARK-56385][SQL][FOLLOWUP] Fix FIELD_NOT_FOUND when remapping pushed filters after nested schema pruning MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? Wrap `projectionFunc` in `scala.util.Try` when remapping `pushedFilterExpressions` against the pruned scan output in `V2ScanRelationPushDown.pruneColumns`, and drop filters whose remap fails. The accompanying `.subsetOf(AttributeSet(output))` filter is retained for the top-level-column pruning case. ### Why are the changes needed? After SPARK-56385, `pushedFilterExpressions` are remapped through `ProjectionOverSchema` to match the post-pruning scan output. When a pushed filter references a nested struct field that nested schema pruning has dropped, `ProjectionOverSchema` calls `StructType.fieldIndex` on the narrowed struct and throws `SparkIllegalArgumentException: [FIELD_NOT_FOUND]`. Repro (exercised by the new test): ``` Schema: s: struct<a: int, b: int>, i: int Query: SELECT s.b FROM t WHERE s.a > 3 (s.a fully pushed) ``` Column pruning narrows `s` to `struct<b: int>`. The parent `s` is still in the output, so the existing `.subsetOf` guard passes, but remapping `GetStructField(s, "a")` through `ProjectionOverSchema` throws because field `a` is gone. This does not crash for top-level pruning — when the pruned column is entirely absent from the output, `ProjectionOverSchema.getProjection` returns `None` and `transformDown` leaves the expression unchanged, which `.subsetOf` then drops cleanly. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added a unit test in `DataSourceV2Suite` that reproduces the crash via a new `NestedSchemaDataSourceV2` + `SELECT s.b WHERE s.a > 3` pattern. 
--- .../v2/V2ScanRelationPushDown.scala | 9 ++++++--- .../sql/connector/DataSourceV2Suite.scala | 20 +++++++++++++++++++ 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index a0bc360b9aa3c..f1ed722f64a0b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -808,9 +808,12 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { } // Remap pushed filter attributes to the pruned output schema and drop filters - // whose references are no longer in the pruned output. - val remappedPushedFilters = sHolder.pushedFilterExpressions.map(projectionFunc) - .filter(_.references.subsetOf(AttributeSet(output))) + // whose references are no longer in the pruned output. Use Try because + // ProjectionOverSchema throws when a pushed filter references a nested struct + // field that was pruned from the schema. 
+ val remappedPushedFilters = sHolder.pushedFilterExpressions.flatMap { filter => + scala.util.Try(projectionFunc(filter)).toOption + }.filter(_.references.subsetOf(AttributeSet(output))) val scanRelation = DataSourceV2ScanRelation(sHolder.relation, wrappedScan, output, pushedFilters = remappedPushedFilters) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index 6ea1ea3faa0ec..6d3ad69994a62 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -1288,6 +1288,26 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS s"struct column in pushed filter should be pruned to struct but was $prunedStructType") } + test("pushedFilters drops filters referencing pruned nested struct fields") { + // Disable constraint propagation so IsNotNull(s.a) is not added as a post-scan + // filter (it would keep field a alive in the struct). + withSQLConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key -> "false") { + val df = spark.read.format(classOf[NestedSchemaDataSourceV2].getName).load() + // Filter on s.a but select only s.b. Column pruning narrows s to struct<b: int>, + // so the pushed filter on s.a can't be remapped and should be dropped. 
+ val q = df.filter($"s.a" > 3).select($"s.b") + checkAnswer(q, (4 until 10).map(i => Row(-i))) + + val scanRelation = getScanRelation(q) + val referencedStructFields = scanRelation.pushedFilters.flatMap { filter => + filter.collect { case a: AttributeReference if a.name == "s" => a } + .flatMap(_.dataType.asInstanceOf[StructType].fieldNames) + } + assert(!referencedStructFields.contains("a"), + "pushedFilters should not reference pruned nested field a") + } + } + test("scan canonicalization with pushedFilters") { // Use SimpleDataSourceV2 whose scan implements equals, so canonicalization comparison works val table = new SimpleDataSourceV2().getTable(CaseInsensitiveStringMap.empty()) From 92b01b33d7d83a358978b27ffeff30ef791db3d9 Mon Sep 17 00:00:00 2001 From: Anton Lykov Date: Thu, 23 Apr 2026 08:37:42 +0000 Subject: [PATCH 2/2] Narrow exception catch to FIELD_NOT_FOUND in pushed filter remap Address review feedback from Wenchen: catch only the specific `SparkIllegalArgumentException` with condition `FIELD_NOT_FOUND` thrown by `StructType.fieldIndex` when a pushed filter references a pruned nested field, instead of swallowing every `Throwable` via `scala.util.Try`. Other failure modes (e.g., `SparkException.internalError` from `ProjectionOverSchema`'s "unmatched child schema" branches) now surface instead of being silently dropped. 
--- .../datasources/v2/V2ScanRelationPushDown.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index f1ed722f64a0b..c0b72123065f7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -21,7 +21,7 @@ import java.util.Locale import scala.collection.mutable -import org.apache.spark.SparkException +import org.apache.spark.{SparkException, SparkIllegalArgumentException} import org.apache.spark.internal.LogKeys.{AGGREGATE_FUNCTIONS, COLUMN_NAMES, GROUP_BY_EXPRS, JOIN_CONDITION, JOIN_TYPE, POST_SCAN_FILTERS, PUSHED_FILTERS, RELATION_NAME, RELATION_OUTPUT} import org.apache.spark.sql.catalyst.expressions.{aggregate, Alias, And, Attribute, AttributeMap, AttributeReference, AttributeSet, Cast, Expression, ExpressionSet, ExprId, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SortOrder, SubqueryExpression} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression @@ -808,11 +808,15 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { } // Remap pushed filter attributes to the pruned output schema and drop filters - // whose references are no longer in the pruned output. Use Try because - // ProjectionOverSchema throws when a pushed filter references a nested struct - // field that was pruned from the schema. + // whose references are no longer in the pruned output. Catch FIELD_NOT_FOUND + // because ProjectionOverSchema throws when a pushed filter references a nested + // struct field that was pruned from the schema. 
val remappedPushedFilters = sHolder.pushedFilterExpressions.flatMap { filter => - scala.util.Try(projectionFunc(filter)).toOption + try Some(projectionFunc(filter)) + catch { + case e: SparkIllegalArgumentException if e.getCondition == "FIELD_NOT_FOUND" => + None + } }.filter(_.references.subsetOf(AttributeSet(output))) val scanRelation = DataSourceV2ScanRelation(sHolder.relation, wrappedScan, output, pushedFilters = remappedPushedFilters)