[SPARK-57022][SQL] Support nested column pruning for transform over arrays of structs#56070
[SPARK-57022][SQL] Support nested column pruning for transform over arrays of structs#56070sunchao wants to merge 3 commits into
Conversation
(cherry picked from commit 687be8dfe78d94c201abe25e391e5015db8ae51f)
(cherry picked from commit ec4f7f99bdea93f4783f3f1e30d025772142773e)
(cherry picked from commit a25702968f6b70fd6a0bf1504e25a20b42a0902e)
peter-toth
left a comment
There was a problem hiding this comment.
Summary
Adds nested column pruning for transform(array<struct<...>>, lambda) by teaching SchemaPruning to look through the lambda variable and ProjectionOverSchema to rewrite the body once the element schema has been narrowed.
Prior state and problem. Existing nested-column pruning matches SelectedField chains that bottom out at an Attribute. A field access through transform does not look like that — the chain bottoms out at a bound NamedLambdaVariable whose dataType was set during binding from the input array's element type. Neither SelectedField nor any case in getRootFields traces past a lambda variable, so the scan is asked for the full element struct even when the lambda body only reads a couple of nested fields.
Design approach. Two coupled additions, deliberately split because pruning the schema and rewriting the expression have to happen at different stages:
- In
SchemaPruning.getRootFields, recognizeArrayTransform, walk the lambda body forGetStructFieldchains rooted at the bound element variable (LambdaVariableField+collectLambdaVariableFields), build a projected element schema from those fields, and synthesize aRootFieldfor the array argument via the newSelectedField.withDataTypehelper. Bail (returnNone) the moment the lambda escapes the GetStructField pattern (barex,xpassed as a function arg, etc.) so the full element is requested in those cases. - In
ProjectionOverSchema, add a matchingArrayTransformcase that projects the argument, copies the bound element variable with the projecteddataType, and rewrites everyGetStructFieldchain rooted at the original variable to use the new ordinals — required because pruning shifts physical positions (e.g.,struct<a, b, c>→struct<a, c>movescfrom ord 2 to ord 1).
Key design decisions.
- The
if (nestedRootFields.nonEmpty)branch returnsnestedRootFields ++ getRootFields(lambda.function)and deliberately omitsgetRootFields(argument). This is load-bearing:pruneSchemamerges all root fields by name, so reintroducing aSelectedField-derived root field for the (full-element) argument would dissolve the projection. elementVar.copy(dataType = projectedElementSchema)keeps the originalvalue: AtomicReference, so the rewritten lambda body andArrayTransform.elementVar(the lazy val) share the same mutable cell at runtime — without that,nullSafeEval's.value.set(...)would not be visible to the body.LambdaVariableFieldonly walksGetStructFieldchains; the moment it sees aGetArrayItem/ElementAt/GetMapValue(i.e., chain typesSelectedFielddoes support), it gives up andcollectLambdaVariableFieldsfalls back through_, yielding the full element.- Pruning is gated on
argument.dataTypebeingArrayType(_: StructType, _); non-struct elements are early-out.
Implementation sketch. Catalyst-only: SchemaPruning.getRootFields (analysis), ProjectionOverSchema.getProjection (rewrite), plus a private[catalyst] SelectedField.withDataType helper. The data-source pruning entry points (SchemaPruning rule under sql/core and PushDownUtils.pruneColumns for v2) need no changes — they compose through identifyRootFields / pruneSchema and the projection extractor.
Behavioral changes worth calling out. Eligible transform-over-array-of-struct queries narrow the read schema; results are unchanged. No new conf — the existing spark.sql.optimizer.nestedSchemaPruning.enabled gates this. No public API or wire change.
Suggested improvements
- Generalize beyond
ArrayTransform. The same shape — analyzeGetStructFieldchains rooted at a bound element variable, then rewrite ordinals after pruning — applies cleanly to other higher-order functions whose element is consumed rather than passed through (ArrayExists,ArrayForAll,ArrayAggregate,MapFilter). Currently each such addition would duplicate two case branches. [sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala:143] LambdaVariableFieldonly handlesGetStructFieldchains.transform(arr, x => x.subArr[0].field),x => element_at(x.subArr, 1).field, andx => x.mapField['k'].fieldwill fall back to the full element, even thoughSelectedFieldalready supports those chain types for attribute-rooted cases. [sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala:230]- Throw
SparkException.internalErrorfor the projected-schema mismatch. Matches the surrounding twothrowsites in this file. [sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ProjectionOverSchema.scala:136]
| */ | ||
| private[catalyst] def getRootFields(expr: Expression): Seq[RootField] = { | ||
| expr match { | ||
| case ArrayTransform(argument, lambda: LambdaFunction) => |
There was a problem hiding this comment.
This case, and the matching ArrayTransform branch in ProjectionOverSchema, are the only two extension points for lambda-aware nested pruning, and they're hardcoded to ArrayTransform. The same shape applies to several other higher-order functions whose element is consumed rather than passed through to the output:
ArrayExists,ArrayForAll— predicate over the element; output isBoolean.ArrayAggregate— aggregation; output is the merge type.MapFilter— predicate over(key, value); output type is the original map.
For all of those, narrowing the input element struct based on what the lambda body accesses is sound (no downstream consumer sees the original element type). Each one currently requires duplicating both the getRootFields branch and the ProjectionOverSchema branch, including the lambda-variable rewrite mechanics.
A generalized refactor (could be a follow-up rather than blocking this PR):
- Lift
collectLambdaVariableFields,LambdaVariableField, and the per-element pruning into a helper that takes any expression with aLambdaFunctionchild whose first argument binds anarray<struct<...>>(ormap<k, struct<...>>) element. - In
ProjectionOverSchema, dispatch oncase h: HigherOrderFunction if eligible(h)instead of by class, and rewrite via the sameProjectionOverLambdaVariablelogic.
The counter-arguments are real but bounded:
ArrayFilter/ArraySort/ZipWithpass the element through to the output, so they can't reuse this without also tracking output consumers — leave them out of the generalized set.- The
ArraySortcase is bigger because the lambda has two element variables; that argues for limiting v1 of the generalization to one-element HOFs.
| * [[StructField]] shape needed by the input array schema. For example, | ||
| * `x.company.address` becomes `company: struct<address: ...>`. | ||
| */ | ||
| private object LambdaVariableField { |
There was a problem hiding this comment.
The chain walker here only handles GetStructField, but SelectedField's selectField (which this is parallel to) handles GetArrayStructFields, GetArrayItem, ElementAt, GetMapValue, MapKeys, and MapValues as well. As written, queries like
SELECT transform(arr, x -> x.subArr[0].field) FROM t
SELECT transform(arr, x -> element_at(x.subArr, 1).field) FROM t
SELECT transform(arr, x -> x.mapField['k'].field) FROM twill read the full inner type for subArr / mapField because the chain is broken at the first non-GetStructField node, and collectLambdaVariableFields then falls through to _ and returns the lambda-var leaf.
As an improvement, LambdaVariableField could mirror SelectedField's case set (with selectField taking the lambda-variable leaf instead of Attribute as terminator). The two extractors would then differ only in the leaf case and could share a parameterized helper. array<struct> columns nested under another array<struct> are common, so closing this gap covers a non-trivial set of real queries.
| case projectedSchema: StructType => | ||
| GetStructField(projection, projectedSchema.fieldIndex(field.name)) | ||
| case dataType => | ||
| throw new IllegalStateException( |
There was a problem hiding this comment.
The other two "shouldn't happen" branches in this file (lines 61, 76 in the post-PR file — GetArrayStructFields and GetStructFieldObject mismatches) throw SparkException.internalError. Suggest matching:
| throw new IllegalStateException( | |
| throw SparkException.internalError( | |
| s"unmatched lambda child schema for GetStructField: ${dataType.toString}") |
Why are the changes needed?
Spark can prune nested struct fields referenced directly by a query, but it does not currently prune nested fields read through the lambda variable of
transformover anarray<struct>column.For example:
If
rule_resultscontains additional fields, Spark currently retains the full element struct in the scan schema even though only two nested fields are required. This causes unnecessary Parquet and ORC input reads for wide array element schemas.This change addresses SPARK-57022.
What changes were proposed in this pull request?
ArrayTransform.GetStructFieldordinals against the projected element schema after pruning.The implementation intentionally has two stages.
SchemaPruningdiscovers which fields the lambda needs from the array element.ProjectionOverSchemathen rewrites the lambda against the narrower element type because pruning can change field ordinals. For example, pruningstruct<a, b, c>tostruct<a, c>movescfrom ordinal2to ordinal1.Does this PR introduce any user-facing change?
Yes. Eligible queries using
transformover arrays of structs can read a narrower input schema. Query results and SQL APIs are unchanged.How was this patch tested?
build/sbt "catalyst/testOnly org.apache.spark.sql.catalyst.expressions.SchemaPruningSuite"build/sbt "sql/testOnly org.apache.spark.sql.execution.datasources.parquet.ParquetV1SchemaPruningSuite org.apache.spark.sql.execution.datasources.parquet.ParquetV2SchemaPruningSuite org.apache.spark.sql.execution.datasources.orc.OrcV1SchemaPruningSuite org.apache.spark.sql.execution.datasources.orc.OrcV2SchemaPruningSuite -- -z ArrayTransform"git diff --check apache/master...HEADWas this patch authored or co-authored using generative AI tooling?
Generated-by: Codex (GPT-5)