Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions python/pyspark/sql/tests/test_column.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,26 @@ def test_select_join_keys(self):
self.assertTrue(df1.join(df2, "id", how).select(df1["id"]).count() >= 0, how)
self.assertTrue(df1.join(df2, "id", how).select(df2["id"]).count() >= 0, how)

def test_select_regular_column_with_reused_dataframe_hidden_in_natural_join(self):
# Regression test: selecting `dim["dim_id"]` must resolve when `dim` is reused
# in two places of the same plan.
#
# A DataFrame appears both as a direct join side and inside a natural/USING
# join that hides one of its columns into `metadataOutput`. When resolving
# `dim["dim_id"]`, two candidates match the plan id: one from `p.output`
# (the direct join side) and one only visible via `p.metadataOutput` (the
# reused `dim` nested under the USING-join wrapper). We should prefer the
# regular candidate and not throw AMBIGUOUS_COLUMN_REFERENCE.
fact = self.spark.createDataFrame([(1, 10, "T1"), (2, 20, "T2")], ["id", "fk", "txn_id"])
dim = self.spark.createDataFrame([(10, "X"), (20, "Y"), (30, "Z")], ["dim_id", "dim_name"])
events = self.spark.createDataFrame(
[(10, "T1", 100), (20, "T2", 200)], ["dim_id", "txn_id", "amount"]
)
# First use of `dim`: nested under a USING join on "dim_id".
enriched = events.join(dim, "dim_id", "left")
result = (
# Second use of `dim`: direct right side of an expression-based join.
fact.join(dim, fact["fk"] == dim["dim_id"], "left")
# USING join on "txn_id" that wraps the nested reuse of `dim`.
.join(enriched, "txn_id", "full_outer")
.select(dim["dim_id"])
)
# Each txn_id ("T1", "T2") occurs exactly once on both sides of the full
# outer join, so two rows are expected.
self.assertEqual(result.count(), 2)

def test_drop_notexistent_col(self):
df1 = self.spark.createDataFrame(
[("a", "b", "c")],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,10 +527,25 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
val planId = planIdOpt.get
logDebug(s"Extract plan_id $planId from $u")

val isMetadataAccess = u.containsTag(LogicalPlan.IS_METADATA_COL)

val (resolved, matched) = resolveDataFrameColumnByPlanId(
u, planId, isMetadataAccess, q, 0)
val (resolved, matched) = if (u.containsTag(LogicalPlan.IS_METADATA_COL)) {
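// Metadata access (e.g. `df["_metadata"]`): filter ancestors by
// `p.metadataOutput` only.
// NOTE(review): an already-referenced metadata column may appear in `p.output`
// instead of `p.metadataOutput`, so this filter is narrower than the previous
// `p.output ++ p.metadataOutput` check -- confirm the narrowing is intentional.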
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment claims "the resolved attribute lives in p.metadataOutput," but getMetadataAttributeByNameOpt (LogicalPlan.scala:56) explicitly looks in (metadataOutput ++ output).collectFirst with the note "An already-referenced column might appear in output instead of metadataOutput." The resolved attribute can be in p.output, not p.metadataOutput.

This matters because the filter is now narrower than pre-SPARK-55070 (p.output ++ p.metadataOutput). At any ancestor that clears metadataOutput to Nil (Aggregate, Limit, Sort, Window, set ops — basicLogicalOperators.scala:404, 448, 510, 853, 885, 1228, 1513, 1573, 1684) but carries the metadata attribute through output, the strict-metadata filter would reject a candidate the previous code accepted. Is the narrowing intentional? If so, can you spell that out in the comment and add a test for the metadata-access path? The PR description doesn't mention this behavior change.

resolveDataFrameColumnByPlanId(
u, planId, true, q, 0, plan => AttributeSet(plan.metadataOutput))
} else {
// Regular access: try the strict `p.outputSet` filter first.
// That drops candidates hidden at an ancestor, e.g. the right side's join
// key after a natural/USING join. Fall back to `p.output ++ p.metadataOutput`
// only when strict resolves nothing, handling the SPARK-55070
// `rhs["join_key"]` case. Mirrors `outputAttributes.resolve orElse
// outputMetadataAttributes.resolve` in `LogicalPlan.resolve`.
resolveDataFrameColumnByPlanId(
u, planId, false, q, 0, plan => plan.outputSet) match {
case (Some(r), m) => (Some(r), m)
case _ => resolveDataFrameColumnByPlanId(u, planId, false, q, 0,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fallback re-walks the tree from scratch — re-descending, re-running p.resolve(u.nameParts, conf.resolver) at every matched node, and re-merging — only to swap the filter set. Resolution at matched nodes and the descent are identical between the two passes; only getAllowed(p) differs.

Consider collapsing to a single walk by exposing the two filter components per level (e.g. (p.outputSet, AttributeSet(p.metadataOutput))) and tracking pass-states on each candidate as it flows up. Concretely: drop getAllowed, return candidates as (NamedExpression, depth, passesStrict); at every ancestor, use r.references.subsetOf(AttributeSet(p.output ++ p.metadataOutput)) as the survival gate (matches today's broad filter) and AND-in r.references.subsetOf(p.outputSet) to update passesStrict. At the top of resolveDataFrameColumn, prefer the passesStrict subset and fall back to all survivors. That preserves the foldLeft merge and the strict-then-broad precedence, but pays one walk instead of two.

Not a blocker — just feels like the two passes are doing the same descent twice when the only difference is the filter.

plan => AttributeSet(plan.output ++ plan.metadataOutput))
}
}
if (!matched) {
// Cannot find the target plan node with plan id, e.g.
// df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)])
Expand All @@ -546,9 +561,11 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
id: Long,
isMetadataAccess: Boolean,
q: Seq[LogicalPlan],
currentDepth: Int): (Option[(NamedExpression, Int)], Boolean) = {
currentDepth: Int,
getAllowed: LogicalPlan => AttributeSet
): (Option[(NamedExpression, Int)], Boolean) = {
val resolved = q.map(resolveDataFrameColumnRecursively(
u, id, isMetadataAccess, _, currentDepth))
u, id, isMetadataAccess, _, currentDepth, getAllowed))
val merged = resolved
.flatMap(_._1)
.sortBy(_._2) // sort by depth
Expand All @@ -566,7 +583,9 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
id: Long,
isMetadataAccess: Boolean,
p: LogicalPlan,
currentDepth: Int): (Option[(NamedExpression, Int)], Boolean) = {
currentDepth: Int,
getAllowed: LogicalPlan => AttributeSet
): (Option[(NamedExpression, Int)], Boolean) = {
val (resolved, matched) = if (p.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(id)) {
val resolved = if (!isMetadataAccess) {
p.resolve(u.nameParts, conf.resolver)
Expand All @@ -585,7 +604,8 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
case _: Union => Seq.empty[LogicalPlan]
case _ => p.children
}
resolveDataFrameColumnByPlanId(u, id, isMetadataAccess, children, currentDepth + 1)
resolveDataFrameColumnByPlanId(
u, id, isMetadataAccess, children, currentDepth + 1, getAllowed)
}

// In self join case like:
Expand Down Expand Up @@ -619,10 +639,7 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
// In this case, resolveDataFrameColumnByPlanId returns None,
// the dataframe column 'df.id' will remain unresolved, and the analyzer
// will try to resolve 'id' without plan id later.
val filtered = resolved.filter { r =>
// A DataFrame column can be resolved as a metadata column, we should keep it.
r._1.references.subsetOf(AttributeSet(p.output ++ p.metadataOutput))
}
val filtered = resolved.filter { case (r, _) => r.references.subsetOf(getAllowed(p)) }
(filtered, matched)
}

Expand Down