From 528e5e223f2ea8073c4ffe22cd316acf6969b0b6 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 24 Mar 2026 02:49:43 +0000 Subject: [PATCH 01/12] [SPARK] Add DataFrame.zip() API for merging column-projected DataFrames Add a new DataFrame.zip(other) API that combines columns from two DataFrames that derive from the same base plan through Project chains. The optimizer rewrites the Zip node into a single Project over the shared base plan, and analysis rejects plans that cannot be merged. Co-authored-by: Isaac --- .../resources/error/error-conditions.json | 6 +++ .../sql/catalyst/analysis/CheckAnalysis.scala | 13 +++++++ .../analysis/DeduplicateRelations.scala | 5 ++- .../sql/catalyst/optimizer/Optimizer.scala | 38 +++++++++++++++++++ .../plans/logical/basicLogicalOperators.scala | 23 +++++++++++ .../sql/catalyst/trees/TreePatterns.scala | 1 + .../apache/spark/sql/classic/Dataset.scala | 13 +++++++ 7 files changed, 98 insertions(+), 1 deletion(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index ee96d6d83f90e..4e464ddadbaa1 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -8556,6 +8556,12 @@ ], "sqlState" : "42KDF" }, + "ZIP_PLANS_NOT_MERGEABLE" : { + "message" : [ + "The two DataFrames in zip() cannot be merged because they do not derive from the same base plan through Project operations." + ], + "sqlState" : "42K03" + }, "_LEGACY_ERROR_TEMP_0001" : { "message" : [ "Invalid InsertIntoContext." 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 17fac640d4832..ab6fbd4f1ad14 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -637,6 +637,19 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString messageParameters = Map.empty) } + case z: Zip => + def stripProjects(plan: LogicalPlan): LogicalPlan = plan match { + case Project(_, child) => stripProjects(child) + case other => other + } + val leftBase = stripProjects(z.left) + val rightBase = stripProjects(z.right) + if (!leftBase.sameResult(rightBase)) { + z.failAnalysis( + errorClass = "ZIP_PLANS_NOT_MERGEABLE", + messageParameters = Map.empty) + } + case a: Aggregate => a.groupingExpressions.foreach( expression => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala index 2a2440117e401..216b05e11c424 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala @@ -36,7 +36,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] { def noMissingInput(p: LogicalPlan) = !p.exists(_.missingInput.nonEmpty) newPlan.resolveOperatorsUpWithPruning( - _.containsAnyPattern(JOIN, LATERAL_JOIN, AS_OF_JOIN, INTERSECT, EXCEPT, UNION, COMMAND), + _.containsAnyPattern(JOIN, LATERAL_JOIN, AS_OF_JOIN, INTERSECT, EXCEPT, UNION, ZIP, COMMAND), ruleId) { case p: LogicalPlan if !p.childrenResolved => p // To resolve duplicate expression IDs for Join. 
@@ -56,6 +56,9 @@ object DeduplicateRelations extends Rule[LogicalPlan] { i.copy(right = dedupRight(left, right)) case e @ Except(left, right, _) if !e.duplicateResolved && noMissingInput(right) => e.copy(right = dedupRight(left, right)) + // Resolve duplicate output for Zip. + case z @ Zip(left, right) if !z.duplicateResolved && noMissingInput(right) => + z.copy(right = dedupRight(left, right)) // Only after we finish by-name resolution for Union case u: Union if !u.byName && !u.duplicatesResolvedBetweenBranches => val unionWithChildOutputsDeduplicated = diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 2d6de2a11c6b8..97a920b154a2d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -116,6 +116,7 @@ abstract class Optimizer(catalogManager: CatalogManager) // Operator combine CollapseRepartition, CollapseProject, + CollapseZip, OptimizeWindowFunctions, CollapseWindow, EliminateOffsets, @@ -1602,6 +1603,43 @@ object CollapseRepartition extends Rule[LogicalPlan] { } } +/** + * Collapses a [[Zip]] node into a single [[Project]] over the shared base plan. + * + * Both children of Zip must derive from the same base plan through chains of Project nodes. + * After CollapseProject has flattened each side, this rule: + * 1. Extracts the project list and base plan from each side + * 2. Remaps the right side's attribute references to the left base plan's output + * 3. 
Produces a single Project that combines both sides' expressions + */ +object CollapseZip extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning( + _.containsPattern(ZIP), ruleId) { + case z: Zip => + val (leftExprs, leftBase) = extractProjectAndBase(z.left) + val (rightExprs, rightBase) = extractProjectAndBase(z.right) + if (leftBase.sameResult(rightBase)) { + // Build an attribute mapping from rightBase output to leftBase output (by position) + val attrMapping = AttributeMap(rightBase.output.zip(leftBase.output)) + // Remap right expressions to reference leftBase's attributes + val remappedRightExprs = rightExprs.map { expr => + expr.transform { + case a: Attribute => attrMapping.getOrElse(a, a) + }.asInstanceOf[NamedExpression] + } + Project(leftExprs ++ remappedRightExprs, leftBase) + } else { + z + } + } + + private def extractProjectAndBase( + plan: LogicalPlan): (Seq[NamedExpression], LogicalPlan) = plan match { + case Project(projectList, child) => (projectList, child) + case other => (other.output, other) + } +} + /** * Replace RepartitionByExpression numPartitions to 1 if all partition expressions are foldable * and user not specify. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 8e9f264698caf..c0b6a3b02e54d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -829,6 +829,29 @@ case class Join( newLeft: LogicalPlan, newRight: LogicalPlan): Join = copy(left = newLeft, right = newRight) } +/** + * A logical plan that combines the columns of two DataFrames that derive from the same + * base plan through chains of Project nodes. 
During optimization, this node is rewritten + * into a single Project over the shared base plan. If the two children do not share the + * same base plan (after stripping Project nodes), analysis will fail with an error. + */ +case class Zip(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { + override def output: Seq[Attribute] = left.output ++ right.output + + override def maxRows: Option[Long] = left.maxRows + + override def maxRowsPerPartition: Option[Long] = left.maxRowsPerPartition + + final override val nodePatterns: Seq[TreePattern] = Seq(ZIP) + + def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty + + override lazy val resolved: Boolean = childrenResolved && duplicateResolved + + override protected def withNewChildrenInternal( + newLeft: LogicalPlan, newRight: LogicalPlan): Zip = copy(left = newLeft, right = newRight) +} + /** * Insert query result into a directory. * diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala index 1e22c1ce86539..b3d96da1cb52a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala @@ -176,6 +176,7 @@ object TreePattern extends Enumeration { val TRANSPOSE: Value = Value val UNION: Value = Value val UNPIVOT: Value = Value + val ZIP: Value = Value val UPDATE_EVENT_TIME_WATERMARK_COLUMN: Value = Value val TYPED_FILTER: Value = Value val WINDOW: Value = Value diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala index 2070873f96579..7ba9d32808aa8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala @@ -707,6 +707,19 @@ class Dataset[T] private[sql]( 
Join(logicalPlan, right.logicalPlan, joinType = Cross, None, JoinHint.NONE) } + /** + * Combines the columns of this DataFrame with another DataFrame that derives from the same + * base plan through Project operations. The optimizer rewrites the resulting Zip node into a + * single Project over the shared base plan. + * + * @param other another DataFrame that shares the same base plan + * @return a new DataFrame with columns from both sides + * @throws AnalysisException if the two DataFrames do not derive from the same base plan + */ + def zip(other: sql.Dataset[_]): DataFrame = withPlan { + Zip(logicalPlan, other.logicalPlan) + } + /** @inheritdoc */ def joinWith[U](other: sql.Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] = { // Creates a Join node and resolve it first, to get join condition resolved, self-join resolved, From 3b0c8456b46329cba3bd5f159a6e80b3f0c4a7cd Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 24 Mar 2026 03:08:13 +0000 Subject: [PATCH 02/12] Move Zip resolution from optimizer to analyzer via ResolveZip rule Zip is now always unresolved (resolved=false). A new ResolveZip analyzer rule rewrites it into a Project when both children share the same base plan. Removes the CollapseZip optimizer rule. 
Co-authored-by: Isaac --- .../sql/catalyst/analysis/Analyzer.scala | 1 + .../sql/catalyst/analysis/ResolveZip.scala | 65 +++++++++++++++++++ .../sql/catalyst/optimizer/Optimizer.scala | 38 ----------- .../plans/logical/basicLogicalOperators.scala | 10 +-- 4 files changed, 72 insertions(+), 42 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveZip.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 3b4d725840935..333e1e88c3d0e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -498,6 +498,7 @@ class Analyzer( ResolveBinaryArithmetic :: new ResolveIdentifierClause(earlyBatches) :: ResolveUnion :: + ResolveZip :: FlattenSequentialStreamingUnion :: ValidateSequentialStreamingUnion :: ResolveRowLevelCommandAssignments :: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveZip.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveZip.scala new file mode 100644 index 0000000000000..c770c7df83f12 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveZip.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, NamedExpression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Zip} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.TreePattern.ZIP + +/** + * Resolves a [[Zip]] node by rewriting it into a single [[Project]] over the shared base plan. + * + * Both children of Zip must derive from the same base plan through chains of Project nodes. + * This rule: + * 1. Waits for both children to be resolved + * 2. Strips Project layers from each side to find the base plan + * 3. Verifies the base plans produce the same result (via `sameResult`) + * 4. Remaps the right side's attribute references to the left base plan's output + * 5. Produces a single Project that combines both sides' expressions + * + * If the base plans do not match, the Zip node remains unresolved and CheckAnalysis + * will report a [[ZIP_PLANS_NOT_MERGEABLE]] error. 
+ */ +object ResolveZip extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning( + _.containsPattern(ZIP), ruleId) { + case z: Zip if z.childrenResolved => + val (leftExprs, leftBase) = extractProjectAndBase(z.left) + val (rightExprs, rightBase) = extractProjectAndBase(z.right) + if (leftBase.sameResult(rightBase)) { + // Build an attribute mapping from rightBase output to leftBase output (by position) + val attrMapping = AttributeMap(rightBase.output.zip(leftBase.output)) + // Remap right expressions to reference leftBase's attributes + val remappedRightExprs = rightExprs.map { expr => + expr.transform { + case a: Attribute => attrMapping.getOrElse(a, a) + }.asInstanceOf[NamedExpression] + } + Project(leftExprs ++ remappedRightExprs, leftBase) + } else { + z + } + } + + private def extractProjectAndBase( + plan: LogicalPlan): (Seq[NamedExpression], LogicalPlan) = plan match { + case Project(projectList, child) => (projectList, child) + case other => (other.output, other) + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 97a920b154a2d..2d6de2a11c6b8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -116,7 +116,6 @@ abstract class Optimizer(catalogManager: CatalogManager) // Operator combine CollapseRepartition, CollapseProject, - CollapseZip, OptimizeWindowFunctions, CollapseWindow, EliminateOffsets, @@ -1603,43 +1602,6 @@ object CollapseRepartition extends Rule[LogicalPlan] { } } -/** - * Collapses a [[Zip]] node into a single [[Project]] over the shared base plan. - * - * Both children of Zip must derive from the same base plan through chains of Project nodes. - * After CollapseProject has flattened each side, this rule: - * 1. 
Extracts the project list and base plan from each side - * 2. Remaps the right side's attribute references to the left base plan's output - * 3. Produces a single Project that combines both sides' expressions - */ -object CollapseZip extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning( - _.containsPattern(ZIP), ruleId) { - case z: Zip => - val (leftExprs, leftBase) = extractProjectAndBase(z.left) - val (rightExprs, rightBase) = extractProjectAndBase(z.right) - if (leftBase.sameResult(rightBase)) { - // Build an attribute mapping from rightBase output to leftBase output (by position) - val attrMapping = AttributeMap(rightBase.output.zip(leftBase.output)) - // Remap right expressions to reference leftBase's attributes - val remappedRightExprs = rightExprs.map { expr => - expr.transform { - case a: Attribute => attrMapping.getOrElse(a, a) - }.asInstanceOf[NamedExpression] - } - Project(leftExprs ++ remappedRightExprs, leftBase) - } else { - z - } - } - - private def extractProjectAndBase( - plan: LogicalPlan): (Seq[NamedExpression], LogicalPlan) = plan match { - case Project(projectList, child) => (projectList, child) - case other => (other.output, other) - } -} - /** * Replace RepartitionByExpression numPartitions to 1 if all partition expressions are foldable * and user not specify. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index c0b6a3b02e54d..d0fa2cb3989bf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -831,9 +831,10 @@ case class Join( /** * A logical plan that combines the columns of two DataFrames that derive from the same - * base plan through chains of Project nodes. 
During optimization, this node is rewritten - * into a single Project over the shared base plan. If the two children do not share the - * same base plan (after stripping Project nodes), analysis will fail with an error. + * base plan through chains of Project nodes. This node is always unresolved and must be + * rewritten by [[ResolveZip]] into a single Project over the shared base plan during + * analysis. If the two children do not share the same base plan (after stripping Project + * nodes), analysis will fail with an error. */ case class Zip(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { override def output: Seq[Attribute] = left.output ++ right.output @@ -846,7 +847,8 @@ case class Zip(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty - override lazy val resolved: Boolean = childrenResolved && duplicateResolved + // Always unresolved -- must be rewritten by ResolveZip during analysis. + override lazy val resolved: Boolean = false override protected def withNewChildrenInternal( newLeft: LogicalPlan, newRight: LogicalPlan): Zip = copy(left = newLeft, right = newRight) From 47395173172aad6e53623e58ffdc8e2e6aaf4d14 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 24 Mar 2026 03:23:38 +0000 Subject: [PATCH 03/12] Remove unnecessary Zip handling from DeduplicateRelations Zip is always unresolved, so deduplication does not help it resolve. ResolveZip already handles attribute remapping from right base to left base via sameResult() and AttributeMap. 
Co-authored-by: Isaac --- .../spark/sql/catalyst/analysis/DeduplicateRelations.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala index 216b05e11c424..2a2440117e401 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala @@ -36,7 +36,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] { def noMissingInput(p: LogicalPlan) = !p.exists(_.missingInput.nonEmpty) newPlan.resolveOperatorsUpWithPruning( - _.containsAnyPattern(JOIN, LATERAL_JOIN, AS_OF_JOIN, INTERSECT, EXCEPT, UNION, ZIP, COMMAND), + _.containsAnyPattern(JOIN, LATERAL_JOIN, AS_OF_JOIN, INTERSECT, EXCEPT, UNION, COMMAND), ruleId) { case p: LogicalPlan if !p.childrenResolved => p // To resolve duplicate expression IDs for Join. @@ -56,9 +56,6 @@ object DeduplicateRelations extends Rule[LogicalPlan] { i.copy(right = dedupRight(left, right)) case e @ Except(left, right, _) if !e.duplicateResolved && noMissingInput(right) => e.copy(right = dedupRight(left, right)) - // Resolve duplicate output for Zip. 
- case z @ Zip(left, right) if !z.duplicateResolved && noMissingInput(right) => - z.copy(right = dedupRight(left, right)) // Only after we finish by-name resolution for Union case u: Union if !u.byName && !u.duplicatesResolvedBetweenBranches => val unionWithChildOutputsDeduplicated = From 29ca6b577e7ddfc1fad10e5fb3894115ef65a963 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 24 Mar 2026 03:37:11 +0000 Subject: [PATCH 04/12] Add tests for ResolveZip and register rule in RuleIdCollection Co-authored-by: Isaac --- .../sql/catalyst/rules/RuleIdCollection.scala | 1 + .../catalyst/analysis/ResolveZipSuite.scala | 96 +++++++++++++++++++ 2 files changed, 97 insertions(+) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveZipSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala index 4ed918328a16b..b9873b390aa4a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala @@ -104,6 +104,7 @@ object RuleIdCollection { "org.apache.spark.sql.catalyst.analysis.ResolveTableSpec" :: "org.apache.spark.sql.catalyst.analysis.ResolveTimeZone" :: "org.apache.spark.sql.catalyst.analysis.ResolveUnion" :: + "org.apache.spark.sql.catalyst.analysis.ResolveZip" :: "org.apache.spark.sql.catalyst.analysis.ResolveUnresolvedHaving" :: "org.apache.spark.sql.catalyst.analysis.ResolveUpdateEventTimeWatermarkColumn" :: "org.apache.spark.sql.catalyst.analysis.ResolveWindowTime" :: diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveZipSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveZipSuite.scala new file mode 100644 index 0000000000000..b831352fcb082 --- /dev/null +++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveZipSuite.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.RuleExecutor + +class ResolveZipSuite extends AnalysisTest { + + private val base = LocalRelation($"a".int, $"b".int, $"c".int) + + object Resolve extends RuleExecutor[LogicalPlan] { + override val batches: Seq[Batch] = Seq( + Batch("ResolveZip", Once, ResolveZip)) + } + + test("resolve Zip: both sides have Project over same base") { + val left = Project(Seq(base.output(0)), base) + val right = Project(Seq(base.output(1)), base) + val zip = Zip(left, right) + + val resolved = Resolve.execute(zip) + val expected = Project(Seq(base.output(0), base.output(1)), base) + comparePlans(resolved, expected) + } + + test("resolve Zip: left is bare plan, right has Project") { + val right = Project(Seq(base.output(0)), base) + val zip = Zip(base, right) + + val resolved = Resolve.execute(zip) + val expected = 
Project(base.output ++ Seq(base.output(0)), base) + comparePlans(resolved, expected) + } + + test("resolve Zip: both sides are bare same plan") { + val zip = Zip(base, base) + + val resolved = Resolve.execute(zip) + val expected = Project(base.output ++ base.output, base) + comparePlans(resolved, expected) + } + + test("resolve Zip: both sides have expressions over same base") { + val left = base.select(($"a" + 1).as("a_plus_1")) + val right = base.select(($"b" * 2).as("b_times_2")) + val zip = Zip(left.analyze, right.analyze) + + val resolved = Resolve.execute(zip) + assert(!resolved.isInstanceOf[Zip], "Zip should have been resolved to a Project") + assert(resolved.isInstanceOf[Project]) + assert(resolved.output.length == 2) + assert(resolved.output(0).name == "a_plus_1") + assert(resolved.output(1).name == "b_times_2") + } + + test("resolve Zip: different base plans - Zip remains unresolved") { + val base2 = LocalRelation($"x".int, $"y".int, $"z".int, $"w".int) + val left = Project(Seq(base.output(0)), base) + val right = Project(Seq(base2.output(0)), base2) + val zip = Zip(left, right) + + val resolved = Resolve.execute(zip) + // ResolveZip cannot merge, so Zip stays + assert(resolved.isInstanceOf[Zip]) + } + + test("CheckAnalysis: different base plans throws ZIP_PLANS_NOT_MERGEABLE") { + val base2 = LocalRelation($"x".int, $"y".int, $"z".int, $"w".int) + val left = Project(Seq(base.output(0)), base) + val right = Project(Seq(base2.output(0)), base2) + val zip = Zip(left, right) + + assertAnalysisErrorCondition( + zip, + expectedErrorCondition = "ZIP_PLANS_NOT_MERGEABLE", + expectedMessageParameters = Map.empty + ) + } +} From 06abe850e4b5859c7aabff18f75933139a684502 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 24 Mar 2026 03:40:59 +0000 Subject: [PATCH 05/12] Add test verifying ResolveZip skips unresolved children Co-authored-by: Isaac --- .../spark/sql/catalyst/analysis/ResolveZipSuite.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff 
--git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveZipSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveZipSuite.scala index b831352fcb082..5e1235cfec231 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveZipSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveZipSuite.scala @@ -81,6 +81,17 @@ class ResolveZipSuite extends AnalysisTest { assert(resolved.isInstanceOf[Zip]) } + test("resolve Zip: skipped when children are unresolved") { + val unresolvedChild = Project( + Seq(UnresolvedAttribute("a")), + UnresolvedRelation(Seq("t"))) + val zip = Zip(unresolvedChild, unresolvedChild) + + val result = Resolve.execute(zip) + // Zip should remain unchanged because children are not resolved + assert(result.isInstanceOf[Zip]) + } + test("CheckAnalysis: different base plans throws ZIP_PLANS_NOT_MERGEABLE") { val base2 = LocalRelation($"x".int, $"y".int, $"z".int, $"w".int) val left = Project(Seq(base.output(0)), base) From 5c2966a699c3d4bf286d595c27ed5e837c89e4d3 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 24 Mar 2026 03:42:25 +0000 Subject: [PATCH 06/12] Remove unused duplicateResolved from Zip No longer referenced after removing Zip from DeduplicateRelations and changing resolved to always false. 
Co-authored-by: Isaac --- .../sql/catalyst/plans/logical/basicLogicalOperators.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index d0fa2cb3989bf..f27cc4a9ead51 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -845,8 +845,6 @@ case class Zip(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { final override val nodePatterns: Seq[TreePattern] = Seq(ZIP) - def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty - // Always unresolved -- must be rewritten by ResolveZip during analysis. override lazy val resolved: Boolean = false From b34b0a1947bbed39d9f2c3bab1b742513fd31e96 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 24 Mar 2026 07:33:31 +0000 Subject: [PATCH 07/12] Add end-to-end tests for DataFrame.zip() Co-authored-by: Isaac --- .../apache/spark/sql/DataFrameZipSuite.scala | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/DataFrameZipSuite.scala diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameZipSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameZipSuite.scala new file mode 100644 index 0000000000000..bf5b2fdcf1eb0 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameZipSuite.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.test.SharedSparkSession + +class DataFrameZipSuite extends QueryTest with SharedSparkSession { + import testImplicits._ + + test("zip: select different columns from the same DataFrame") { + val df = Seq((1, 2, 3), (4, 5, 6), (7, 8, 9)).toDF("a", "b", "c") + val left = df.select("a") + val right = df.select("b") + + checkAnswer( + left.zip(right), + Row(1, 2) :: Row(4, 5) :: Row(7, 8) :: Nil) + } + + test("zip: select with expressions over the same DataFrame") { + val df = Seq((1, 10), (2, 20), (3, 30)).toDF("a", "b") + val left = df.select(($"a" + 1).as("a_plus_1")) + val right = df.select(($"b" * 2).as("b_times_2")) + + checkAnswer( + left.zip(right), + Row(2, 20) :: Row(3, 40) :: Row(4, 60) :: Nil) + } + + test("zip: one side selects all columns") { + val df = Seq((1, 2), (3, 4)).toDF("a", "b") + val right = df.select(($"a" + $"b").as("sum")) + + checkAnswer( + df.zip(right), + Row(1, 2, 3) :: Row(3, 4, 7) :: Nil) + } + + test("zip: resolved plan is a Project") { + val df = Seq((1, 2)).toDF("a", "b") + val left = df.select("a") + val right = df.select("b") + val zipped = left.zip(right) + + assert(zipped.queryExecution.analyzed.isInstanceOf[Project]) + } + + test("zip: different base plans throws AnalysisException") { + val df1 = Seq((1, 2)).toDF("a", "b") + val 
df2 = Seq((3, 4, 5)).toDF("x", "y", "z") + + checkError( + exception = intercept[AnalysisException] { + df1.select("a").zip(df2.select("x")).queryExecution.assertAnalyzed() + }, + condition = "ZIP_PLANS_NOT_MERGEABLE" + ) + } + + test("zip: different base plans from spark.range throws AnalysisException") { + val df1 = spark.range(10).toDF("id1") + val df2 = spark.range(20).toDF("id2") + + checkError( + exception = intercept[AnalysisException] { + df1.zip(df2).queryExecution.assertAnalyzed() + }, + condition = "ZIP_PLANS_NOT_MERGEABLE" + ) + } +} From 98ae4494377c3d4d6d8021e38772448a88c6c55c Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 31 Mar 2026 09:19:48 +0000 Subject: [PATCH 08/12] Add Project.isScalar and use it in ResolveZip to guard against Generators Add Project.isScalar which returns true when the project list contains only 1:1 row mapping expressions (no Generator, AggregateExpression, or WindowExpression). ResolveZip now uses this to reject non-scalar Projects rather than inline Generator checks. In practice, ExtractGenerator rewrites Projects with Generators before ResolveZip runs, so this is defense-in-depth. Co-authored-by: Isaac --- .../sql/catalyst/analysis/ResolveZip.scala | 25 +++++++++++-------- .../plans/logical/basicLogicalOperators.scala | 9 +++++++ 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveZip.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveZip.scala index c770c7df83f12..14968bc42a1cb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveZip.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveZip.scala @@ -25,24 +25,25 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.ZIP /** * Resolves a [[Zip]] node by rewriting it into a single [[Project]] over the shared base plan. 
* - * Both children of Zip must derive from the same base plan through chains of Project nodes. + * Both children of Zip must derive from the same base plan through chains of scalar Project + * nodes (1:1 row mapping -- no Generator, AggregateExpression, or WindowExpression). * This rule: * 1. Waits for both children to be resolved - * 2. Strips Project layers from each side to find the base plan + * 2. Strips scalar Project layers from each side to find the base plan * 3. Verifies the base plans produce the same result (via `sameResult`) * 4. Remaps the right side's attribute references to the left base plan's output * 5. Produces a single Project that combines both sides' expressions * - * If the base plans do not match, the Zip node remains unresolved and CheckAnalysis - * will report a [[ZIP_PLANS_NOT_MERGEABLE]] error. + * If the base plans do not match, or a Project is not scalar, the Zip node remains unresolved + * and CheckAnalysis will report a [[ZIP_PLANS_NOT_MERGEABLE]] error. */ object ResolveZip extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning( _.containsPattern(ZIP), ruleId) { case z: Zip if z.childrenResolved => - val (leftExprs, leftBase) = extractProjectAndBase(z.left) - val (rightExprs, rightBase) = extractProjectAndBase(z.right) - if (leftBase.sameResult(rightBase)) { + val (leftExprs, leftBase, leftScalar) = extractProjectAndBase(z.left) + val (rightExprs, rightBase, rightScalar) = extractProjectAndBase(z.right) + if (leftScalar && rightScalar && leftBase.sameResult(rightBase)) { // Build an attribute mapping from rightBase output to leftBase output (by position) val attrMapping = AttributeMap(rightBase.output.zip(leftBase.output)) // Remap right expressions to reference leftBase's attributes @@ -57,9 +58,13 @@ object ResolveZip extends Rule[LogicalPlan] { } } + /** + * Extracts the project expression list, the base plan, and whether the projection is scalar + * (1:1 row mapping). 
A bare plan (no Project wrapper) is always considered scalar. + */ private def extractProjectAndBase( - plan: LogicalPlan): (Seq[NamedExpression], LogicalPlan) = plan match { - case Project(projectList, child) => (projectList, child) - case other => (other.output, other) + plan: LogicalPlan): (Seq[NamedExpression], LogicalPlan, Boolean) = plan match { + case p @ Project(projectList, child) => (projectList, child, p.isScalar) + case other => (other.output, other, true) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index f27cc4a9ead51..f12686282b262 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -109,6 +109,15 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) override lazy val validConstraints: ExpressionSet = getAllValidConstraints(projectList) + /** + * Returns true if the project list contains only scalar (1:1 row mapping) expressions, + * i.e. no Generator, AggregateExpression, or WindowExpression. + */ + def isScalar: Boolean = !projectList.exists(_.exists { + case _: AggregateExpression | _: Generator | _: WindowExpression => true + case _ => false + }) + override def metadataOutput: Seq[Attribute] = getTagValue(Project.hiddenOutputTag).getOrElse(child.metadataOutput) From 480c073d0e82fcdb3ad4f25436c1772f57705a1b Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 31 Mar 2026 10:01:44 +0000 Subject: [PATCH 09/12] Remove redundant Project.isScalar check from ResolveZip The childrenResolved guard already ensures both children are resolved (each child may be a Project or a bare, non-Project plan).
Since Project.resolved rejects Generator, AggregateExpression, and WindowExpression, the scalar (1:1 mapping) property is guaranteed by the time ResolveZip fires. Co-authored-by: Isaac --- .../sql/catalyst/analysis/ResolveZip.scala | 29 +++++++++---------- .../plans/logical/basicLogicalOperators.scala | 9 ------ 2 files changed, 14 insertions(+), 24 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveZip.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveZip.scala index 14968bc42a1cb..40c2c357b3b62 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveZip.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveZip.scala @@ -25,25 +25,28 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.ZIP /** * Resolves a [[Zip]] node by rewriting it into a single [[Project]] over the shared base plan. * - * Both children of Zip must derive from the same base plan through chains of scalar Project - * nodes (1:1 row mapping -- no Generator, AggregateExpression, or WindowExpression). + * Both children of Zip must derive from the same base plan through chains of Project nodes. + * Since this rule requires `childrenResolved`, and `Project.resolved` already rejects + * non-scalar expressions (Generator, AggregateExpression, WindowExpression), the children + * are guaranteed to be scalar (1:1 row mapping) by the time this rule fires. + * * This rule: * 1. Waits for both children to be resolved - * 2. Strips scalar Project layers from each side to find the base plan + * 2. Strips Project layers from each side to find the base plan * 3. Verifies the base plans produce the same result (via `sameResult`) * 4. Remaps the right side's attribute references to the left base plan's output * 5. 
Produces a single Project that combines both sides' expressions * - * If the base plans do not match, or a Project is not scalar, the Zip node remains unresolved - * and CheckAnalysis will report a [[ZIP_PLANS_NOT_MERGEABLE]] error. + * If the base plans do not match, the Zip node remains unresolved and CheckAnalysis + * will report a [[ZIP_PLANS_NOT_MERGEABLE]] error. */ object ResolveZip extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning( _.containsPattern(ZIP), ruleId) { case z: Zip if z.childrenResolved => - val (leftExprs, leftBase, leftScalar) = extractProjectAndBase(z.left) - val (rightExprs, rightBase, rightScalar) = extractProjectAndBase(z.right) - if (leftScalar && rightScalar && leftBase.sameResult(rightBase)) { + val (leftExprs, leftBase) = extractProjectAndBase(z.left) + val (rightExprs, rightBase) = extractProjectAndBase(z.right) + if (leftBase.sameResult(rightBase)) { // Build an attribute mapping from rightBase output to leftBase output (by position) val attrMapping = AttributeMap(rightBase.output.zip(leftBase.output)) // Remap right expressions to reference leftBase's attributes @@ -58,13 +61,9 @@ object ResolveZip extends Rule[LogicalPlan] { } } - /** - * Extracts the project expression list, the base plan, and whether the projection is scalar - * (1:1 row mapping). A bare plan (no Project wrapper) is always considered scalar. 
- */ private def extractProjectAndBase( - plan: LogicalPlan): (Seq[NamedExpression], LogicalPlan, Boolean) = plan match { - case p @ Project(projectList, child) => (projectList, child, p.isScalar) - case other => (other.output, other, true) + plan: LogicalPlan): (Seq[NamedExpression], LogicalPlan) = plan match { + case Project(projectList, child) => (projectList, child) + case other => (other.output, other) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index f12686282b262..f27cc4a9ead51 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -109,15 +109,6 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) override lazy val validConstraints: ExpressionSet = getAllValidConstraints(projectList) - /** - * Returns true if the project list contains only scalar (1:1 row mapping) expressions, - * i.e. no Generator, AggregateExpression, or WindowExpression. - */ - def isScalar: Boolean = !projectList.exists(_.exists { - case _: AggregateExpression | _: Generator | _: WindowExpression => true - case _ => false - }) - override def metadataOutput: Seq[Attribute] = getTagValue(Project.hiddenOutputTag).getOrElse(child.metadataOutput) From f3456cadb4692cd366e5cd022f9c2f0d7c776563 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 31 Mar 2026 10:15:47 +0000 Subject: [PATCH 10/12] Reject non-scalar Python UDFs in ResolveZip Project.resolved catches Generator, AggregateExpression, and WindowExpression, but non-scalar Python UDFs (e.g. GROUPED_MAP) can slip through. Add an allScalar guard using PythonUDF.isScalarPythonUDF to reject them. 
Co-authored-by: Isaac --- .../sql/catalyst/analysis/ResolveZip.scala | 34 +++++++++++++------ 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveZip.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveZip.scala index 40c2c357b3b62..171269f6a3c42 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveZip.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveZip.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, NamedExpression, PythonUDF} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Zip} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern.ZIP @@ -25,20 +25,21 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.ZIP /** * Resolves a [[Zip]] node by rewriting it into a single [[Project]] over the shared base plan. * - * Both children of Zip must derive from the same base plan through chains of Project nodes. - * Since this rule requires `childrenResolved`, and `Project.resolved` already rejects - * non-scalar expressions (Generator, AggregateExpression, WindowExpression), the children - * are guaranteed to be scalar (1:1 row mapping) by the time this rule fires. + * Both children of Zip must derive from the same base plan through chains of scalar Project + * nodes (1:1 row mapping). `Project.resolved` already rejects Generator, AggregateExpression, + * and WindowExpression. This rule additionally rejects non-scalar Python UDFs (e.g. + * GROUPED_MAP), which are not caught by `Project.resolved`. * * This rule: * 1. Waits for both children to be resolved * 2. Strips Project layers from each side to find the base plan * 3. 
Verifies the base plans produce the same result (via `sameResult`) - * 4. Remaps the right side's attribute references to the left base plan's output - * 5. Produces a single Project that combines both sides' expressions + * 4. Verifies neither side contains a non-scalar Python UDF + * 5. Remaps the right side's attribute references to the left base plan's output + * 6. Produces a single Project that combines both sides' expressions * - * If the base plans do not match, the Zip node remains unresolved and CheckAnalysis - * will report a [[ZIP_PLANS_NOT_MERGEABLE]] error. + * If the base plans do not match, or a non-scalar Python UDF is present, the Zip node remains + * unresolved and CheckAnalysis will report a `ZIP_PLANS_NOT_MERGEABLE` error. */ object ResolveZip extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning( @@ -46,7 +47,7 @@ object ResolveZip extends Rule[LogicalPlan] { case z: Zip if z.childrenResolved => val (leftExprs, leftBase) = extractProjectAndBase(z.left) val (rightExprs, rightBase) = extractProjectAndBase(z.right) - if (leftBase.sameResult(rightBase)) { + if (leftBase.sameResult(rightBase) && allScalar(leftExprs ++ rightExprs)) { // Build an attribute mapping from rightBase output to leftBase output (by position) val attrMapping = AttributeMap(rightBase.output.zip(leftBase.output)) // Remap right expressions to reference leftBase's attributes @@ -66,4 +67,17 @@ object ResolveZip extends Rule[LogicalPlan] { case Project(projectList, child) => (projectList, child) case other => (other.output, other) } + + /** + * Returns true if all expressions are scalar (1:1 row mapping). + * `Project.resolved` already rejects Generator, AggregateExpression, and WindowExpression. + * This additionally rejects non-scalar Python UDFs (e.g. GROUPED_MAP) that can break + * the 1:1 row mapping.
+ */ + private def allScalar(exprs: Seq[NamedExpression]): Boolean = { + !exprs.exists(_.exists { + case udf: PythonUDF => !PythonUDF.isScalarPythonUDF(udf) + case _ => false + }) + } } From 8a2400cae0fe08cbe749e9f7416c2e0c5b130b51 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 7 Apr 2026 03:32:35 +0000 Subject: [PATCH 11/12] Add zip method declaration to sql.Dataset API Move the scaladoc to the abstract Dataset in sql/api and use @inheritdoc in the classic implementation. Co-authored-by: Isaac --- .../scala/org/apache/spark/sql/Dataset.scala | 16 ++++++++++++++++ .../org/apache/spark/sql/classic/Dataset.scala | 10 +--------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala index 0f1fe314c3500..1e2d79ea6b743 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -819,6 +819,22 @@ abstract class Dataset[T] extends Serializable { */ def crossJoin(right: Dataset[_]): DataFrame + /** + * Combines the columns of this DataFrame with another DataFrame that derives from the same + * base plan through Project operations. The analyzer rewrites the resulting Zip node into a + * single Project over the shared base plan. + * + * @param other + * Another DataFrame that shares the same base plan. + * @return + * A new DataFrame with columns from both sides. + * @throws AnalysisException + * if the two DataFrames do not derive from the same base plan. + * @group untypedrel + * @since 4.1.0 + */ + def zip(other: Dataset[_]): DataFrame + /** * Joins this Dataset returning a `Tuple2` for each pair where `condition` evaluates to true. 
* diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala index 7ba9d32808aa8..b4010f78eb527 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala @@ -707,15 +707,7 @@ class Dataset[T] private[sql]( Join(logicalPlan, right.logicalPlan, joinType = Cross, None, JoinHint.NONE) } - /** - * Combines the columns of this DataFrame with another DataFrame that derives from the same - * base plan through Project operations. The optimizer rewrites the resulting Zip node into a - * single Project over the shared base plan. - * - * @param other another DataFrame that shares the same base plan - * @return a new DataFrame with columns from both sides - * @throws AnalysisException if the two DataFrames do not derive from the same base plan - */ + /** @inheritdoc */ def zip(other: sql.Dataset[_]): DataFrame = withPlan { Zip(logicalPlan, other.logicalPlan) } From eec3de64a960cf1c5c6aff89a7823bcaf2351474 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 7 Apr 2026 03:45:57 +0000 Subject: [PATCH 12/12] Add zip stub to Connect Dataset with UnsupportedOperationException Spark Connect does not yet support the Zip logical plan. Add a placeholder implementation that throws UnsupportedOperationException. 
Co-authored-by: Isaac --- .../main/scala/org/apache/spark/sql/connect/Dataset.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Dataset.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Dataset.scala index e9595dc64e9f0..841f6c7b94cac 100644 --- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Dataset.scala +++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Dataset.scala @@ -345,6 +345,11 @@ class Dataset[T] private[sql] ( builder.setJoinType(proto.Join.JoinType.JOIN_TYPE_CROSS) } + /** @inheritdoc */ + def zip(other: sql.Dataset[_]): DataFrame = { + throw new UnsupportedOperationException("zip is not supported in Spark Connect") + } + /** @inheritdoc */ def joinWith[U](other: sql.Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] = { val joinTypeValue = toJoinType(joinType, skipSemiAnti = true)