Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scala.collection.mutable

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Cast, Equality, Expression, ExprId}
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.classic.Dataset
import org.apache.spark.sql.errors.QueryCompilationErrors
Expand Down Expand Up @@ -153,9 +153,29 @@ object DetectAmbiguousSelfJoin extends Rule[LogicalPlan] {
}
condition.toSeq.flatMap(getAmbiguousAttrs)

case _ => ambiguousColRefs.toSeq.map { ref =>
colRefAttrs.find(attr => toColumnReference(attr) == ref).get
}
case _ =>
// SPARK-52498: For a Project on top of a self-join with a foldable join condition
// (e.g., df.join(df, df("col") === 0).select(df("col"))), the column references
// in the select are not ambiguous because the foldable condition means it doesn't
// matter which side the column comes from.
val isProjectOverFoldableSelfJoin = plan match {
case Project(_, Join(
LogicalPlanWithDatasetId(_, leftId),
LogicalPlanWithDatasetId(_, rightId),
_, Some(condition), _)) if leftId == rightId =>
condition.collectFirst {
case Equality(_, b) if b.foldable => true
case Equality(a, _) if a.foldable => true
}.isDefined
case _ => false
}
if (isProjectOverFoldableSelfJoin) {
Nil
} else {
ambiguousColRefs.toSeq.map { ref =>
colRefAttrs.find(attr => toColumnReference(attr) == ref).get
}
}
Comment on lines +157 to +178
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the premise of this exemption needs reconsidering — the case it allows is genuinely ambiguous.

The justification in the comment is inaccurate. "The foldable condition means it doesn't matter which side the column comes from" is not true. The condition df("col") === 0 resolves to one side only (LEFT, since df("col") carries LEFT's exprId after ResolveDeduplicateRelations). The join is effectively:

SELECT * FROM df L JOIN df R ON L.col = 0

That filters L.col = 0 but leaves R.col unconstrained. For df = [0, 1, 2]:

L.col R.col
0 0
0 1
0 2

select df("col")) → LEFT.col → [0, 0, 0]. Selecting the right side (hypothetically) would give [0, 1, 2]. The two sides do not agree. (Note JoinWith.resolveSelfJoinCondition only rewrites trivially-true EqualTo(a, b) if a.sameRef(b)df("col") === 0 is not rewritten.)

So the column reference is ambiguous in exactly the way this rule exists to flag: the user wrote df("col") and got LEFT, but a reasonable reader of df.join(df, df("col") === 0).select(df("col")) might expect the result to be "col from the join result" — which isn't a single well-defined thing.

Internal consistency with the existing tests. SPARK-28344: fail ambiguous self join - column ref in Project in DataFrameSelfJoinSuite (line 148) already establishes that self_join.select(df("col")) is ambiguous and must throw. The PR carves out an exemption when the join condition happens to contain a foldable equality — but the foldable equality does no semantic work here (it doesn't make L.col == R.col, doesn't make df("col") resolve any differently). It's just the discriminator that separates the new test case from the four existing tests that broke with the broader fix mentioned in the PR description. Without a principled reason, this looks like reverse-engineering the rule to fit one test.

Implication for the motivation. The PR description says the single-pass resolver handles this "correctly." If we agree the case is ambiguous, then single-pass is the one with the bug — the right direction is to align single-pass with DetectAmbiguousSelfJoin (preserve the error), not to weaken DetectAmbiguousSelfJoin.

Would you mind reconsidering the direction? Happy to discuss if there's a use case I'm missing — but as-is I don't think we should land this exemption.

}

if (ambiguousAttrs.nonEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,4 +553,18 @@ class DataFrameSelfJoinSuite extends SharedSparkSession {
.count() == 1)
}
}

test("SPARK-52498: self join with foldable condition and select should not be ambiguous") {
withSQLConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true") {
val df1 = spark.range(3).toDF("col1")

// Self-join with foldable condition + select from one side
val df2 = df1.join(df1, df1("col1") === 0).select(df1("col1"))
df2.queryExecution.analyzed

// Multi-layer self-join
val df3 = df2.join(df2, df2("col1") === 0).select(df2("col1"))
df3.queryExecution.analyzed
}
}
}