
[SPARK-45509][SQL] Fix df column reference behavior for Spark Connect #43465

Closed. Wants to merge 5 commits.

Conversation

cloud-fan (Contributor)

What changes were proposed in this pull request?

This PR fixes several problems with column resolution for Spark Connect, to make the behavior closer to classic Spark SQL (unfortunately, some behavior differences remain in corner cases).

  1. Resolve DataFrame column references in both `resolveExpressionByPlanChildren` and `resolveExpressionByPlanOutput`. Previously they were resolved only in `resolveExpressionByPlanChildren`.
  2. When the plan ID has multiple matches, fail with `AMBIGUOUS_COLUMN_REFERENCE`.
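The zero/one/many-matches rule described above can be sketched as a toy model in plain Python (no Spark involved; `PlanNode`, `find_plan_by_id`, and `resolve` are illustrative names, not the actual Spark internals):

```python
# Toy model of plan-ID based column resolution (illustrative only; the real
# logic operates on Catalyst LogicalPlan nodes inside ColumnResolutionHelper).

class PlanNode:
    def __init__(self, plan_id, children=()):
        self.plan_id = plan_id          # the id attached when the DataFrame was created
        self.children = list(children)  # child plan nodes

def find_plan_by_id(plan, target_id):
    """Return every node in the sub-plan whose plan_id matches target_id."""
    matches = [plan] if plan.plan_id == target_id else []
    for child in plan.children:
        matches.extend(find_plan_by_id(child, target_id))
    return matches

def resolve(plan, target_id):
    """Mirror the zero/one/many-matches behavior described above."""
    matches = find_plan_by_id(plan, target_id)
    if not matches:
        raise ValueError("illegal column reference: no matching plan")
    if len(matches) > 1:
        raise ValueError("AMBIGUOUS_COLUMN_REFERENCE")
    return matches[0]

# df1.join(df1, ...): the same plan id appears on both join sides -> ambiguous.
self_join = PlanNode(99, [PlanNode(1), PlanNode(1)])
```

The real implementation traverses Catalyst plans and raises proper `AnalysisException`s; the toy only mirrors the match-counting logic.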

Why are the changes needed?

Fix behavior differences between Spark Connect and classic Spark SQL.

Does this PR introduce any user-facing change?

Yes, for the Spark Connect Scala client.

How was this patch tested?

New tests.

Was this patch authored or co-authored using generative AI tooling?

No.


val e2 = intercept[AnalysisException] {
  // df1("i") is ambiguous because df1 appears on both join sides.
  df1.join(df1, df1("i") === 1).collect()
}
cloud-fan (Contributor Author):

Classic Spark SQL thinks this is not ambiguous. That's probably a bug; I'll fix it later.

checkSameResult(
  Seq(Row("a")),
  // df1_filter("i") is not ambiguous because df1_filter does not appear on the join's left side.
  df1.join(df1_filter, df1_filter("i") === 1).select(df1_filter("j")))
cloud-fan (Contributor Author):

Classic Spark SQL thinks this is ambiguous, as it uses `AttributeReference` directly and we cannot re-resolve it. Spark Connect uses `UnresolvedAttribute`, which binds lazily and works fine in this case.

cloud-fan (Contributor Author):

cc @zhengruifeng @hvanhovell @HyukjinKwon

I think the Spark Connect behavior is very reasonable now. We should move Classic Spark SQL to this behavior as well in the future.

@@ -539,4 +533,28 @@ trait ColumnResolutionHelper extends Logging {
None
}
}

private def findPlanById(
Member:

can we do tailrec?

cloud-fan (Contributor Author):

I can't find a way to make it tail-recursive: I need to inspect the return values of `plan.children.flatMap(findPlanById(u, id, _))`.
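For illustration, the same kind of tree traversal can be flattened with an explicit worklist, which avoids deep call stacks without needing tail-call support (a Python sketch under the assumption of a simple node type; `Node` and the function name are invented here, not Spark code):

```python
from collections import namedtuple

# Illustrative node type: a plan id plus child nodes (not Spark's LogicalPlan).
Node = namedtuple("Node", ["plan_id", "children"])

def find_plan_by_id_iterative(root, target_id):
    """Collect every node whose plan_id matches target_id.

    An explicit stack replaces the call stack, so no tail recursion is
    required even though the naive recursive version is not tail-recursive
    (it must combine results from all children after the recursive calls).
    """
    matches, stack = [], [root]
    while stack:
        node = stack.pop()
        if node.plan_id == target_id:
            matches.append(node)
        stack.extend(node.children)
    return matches
```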

// 1. extract the attached plan id from UnresolvedAttribute;
// 2. traverse the query plan top-down to find the plan node that matches the plan id;
// 3. if no matching node can be found, fail the analysis due to an illegal reference;
// 4. if more than one matching node is found, fail due to an ambiguous column reference;
Member:

Just for my own understanding: does it mean that it fails when more than one matching node is found among children at the same level?

cloud-fan (Contributor Author):

It fails when more than one matching node is found within the sub-plan under the plan node that holds the column reference.

@@ -31,6 +31,15 @@
],
"sqlState" : "42702"
},
"AMBIGUOUS_COLUMN_REFERENCE" : {
"message" : [
"Column <name> is ambiguous. It's because you joined several DataFrames together, and some of these DataFrames are the same.",
Contributor:

Can we reuse AMBIGUOUS_REFERENCE (42704)?

And I guess another cause may be DataFrame creation with duplicated column names:

scala> val df = Seq((1,2), (3,4)).toDF("a","a")
val df: org.apache.spark.sql.DataFrame = [a: int, a: int]

cloud-fan (Contributor Author):

It's different. This error is for finding more than one matching DataFrame, while AMBIGUOUS_REFERENCE is for finding more than one matching attribute among the candidates.

Contributor:

sounds good.

@github-actions github-actions bot added the DOCS label Oct 23, 2023
- self._func = function._build_common_inline_user_defined_function(*cols)
+ # The function takes the entire DataFrame as input, no need to do
+ # column binding (no input columns).
+ self._func = function._build_common_inline_user_defined_function()

Contributor:

It seems this field is not used in Connect.

private def transformCoGroupMap(rel: proto.CoGroupMap): LogicalPlan = {
  val commonUdf = rel.getFunc
  commonUdf.getFunctionCase match {
    case proto.CommonInlineUserDefinedFunction.FunctionCase.SCALAR_SCALA_UDF =>
      transformTypedCoGroupMap(rel, commonUdf)
    case proto.CommonInlineUserDefinedFunction.FunctionCase.PYTHON_UDF =>
      val pythonUdf = transformPythonUDF(commonUdf)
      val inputCols =
        rel.getInputGroupingExpressionsList.asScala.toSeq.map(expr =>
          Column(transformExpression(expr)))
      val otherCols =
        rel.getOtherGroupingExpressionsList.asScala.toSeq.map(expr =>
          Column(transformExpression(expr)))
      val input = Dataset
        .ofRows(session, transformRelation(rel.getInput))
        .groupBy(inputCols: _*)
      val other = Dataset
        .ofRows(session, transformRelation(rel.getOther))
        .groupBy(otherCols: _*)
      input.flatMapCoGroupsInPandas(other, pythonUdf).logicalPlan
    case _ =>
      throw InvalidPlanInput(
        s"Function with ID: ${commonUdf.getFunctionCase.getNumber} is not supported")
  }
}
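The cogroup semantics being wired up here can be illustrated with a small plain-Python toy (no Spark or pandas; `cogroup_apply` and its parameters are invented for illustration, not the actual API):

```python
from collections import defaultdict

def cogroup_apply(left, right, key_fn, func):
    """Group both datasets by key, then apply func(key, left_rows, right_rows)
    for every key present on either side; a toy analogue of cogroup-map,
    where keys missing on one side get an empty group.
    """
    groups_l, groups_r = defaultdict(list), defaultdict(list)
    for row in left:
        groups_l[key_fn(row)].append(row)
    for row in right:
        groups_r[key_fn(row)].append(row)
    out = []
    # Iterate keys from both sides so one-sided groups are not dropped.
    for key in sorted(set(groups_l) | set(groups_r)):
        out.extend(func(key, groups_l[key], groups_r[key]))
    return out
```

In the real API, the grouping expressions come from the two `groupBy` calls above, and `func` is the user's Python UDF applied to each pair of cogrouped pandas DataFrames.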

cc @xinrong-meng

cloud-fan (Contributor Author):

Yeah, that's why this bug was hidden. We simply picked the first matching DataFrame in the query plan to resolve columns, which is wrong; now we throw AMBIGUOUS_COLUMN_REFERENCE.

dongjoon-hyun (Member):

Could you re-trigger the failed catalyst test pipeline?

cloud-fan (Contributor Author) commented Nov 7, 2023:

thanks for the reviews, merging to master!

@cloud-fan cloud-fan closed this in 8c41629 Nov 7, 2023
cloud-fan added a commit to cloud-fan/spark that referenced this pull request Nov 7, 2023

Closes apache#43465 from cloud-fan/column.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan (Contributor Author):

The 3.5 backport PR: #43699

zhengruifeng pushed a commit that referenced this pull request Nov 8, 2023
backport #43465 to 3.5


Closes #43699 from cloud-fan/backport.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>