[SPARK-37802][SQL] Composite field name should work with Aggregate push down #35108

Closed

huaxingao

What changes were proposed in this pull request?

Currently, a composite field name (one containing a space, such as dept id) doesn't work with aggregate push down:

sql("SELECT COUNT(`dept id`) FROM h2.test.dept")

org.apache.spark.sql.catalyst.parser.ParseException: 
extraneous input 'id' expecting <EOF>(line 1, pos 5)

== SQL ==
dept id
-----^^^

	at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:271)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:132)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseMultipartIdentifier(ParseDriver.scala:63)
	at org.apache.spark.sql.connector.expressions.LogicalExpressions$.parseReference(expressions.scala:39)
	at org.apache.spark.sql.connector.expressions.FieldReference$.apply(expressions.scala:365)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.translateAggregate(DataSourceStrategy.scala:717)
	at org.apache.spark.sql.execution.datasources.v2.PushDownUtils$.$anonfun$pushAggregates$1(PushDownUtils.scala:125)
	at scala.collection.immutable.List.flatMap(List.scala:366)
	at org.apache.spark.sql.execution.datasources.v2.PushDownUtils$.pushAggregates(PushDownUtils.scala:125)

This PR fixes the problem.

Why are the changes needed?

Bug fix: aggregate push down throws a ParseException when the column name contains a space.

Does this PR introduce any user-facing change?

No

How was this patch tested?

New test

@huaxingao

@cloud-fan Could you please take a look? Thanks!

@Dintion

Dintion commented Jan 6, 2022

A Chinese field name has the same problem:

== SQL ==
缺陷编号
^^^
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:265)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:126)

@huaxingao

Should be good now @Dintion

@@ -349,7 +349,7 @@ private[sql] final case class FieldReference(parts: Seq[String]) extends NamedRe

 private[sql] object FieldReference {
   def apply(column: String): NamedReference = {
-    LogicalExpressions.parseReference(column)
+    LogicalExpressions.parseReference("`" + column + "`")

I think we need to fix the caller side. We shouldn't call FieldReference.apply(String) which parses the given string. We should call FieldReference(Seq(col_name)).
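
The difference between the two construction paths can be illustrated with a small self-contained sketch. It does not use Spark's classes: `Ref`, `parseMultipart`, and `column` are hypothetical stand-ins, and the toy parser only approximates how a multipart identifier parser treats dots, whitespace, and backtick quoting.

```scala
// Toy model, not Spark's implementation: every name here is hypothetical.
final case class Ref(parts: Seq[String])

object Ref {
  // Parse path: splits on dots outside backticks and rejects
  // whitespace that appears outside a backtick-quoted part.
  def parseMultipart(s: String): Either[String, Ref] = {
    val parts = scala.collection.mutable.ArrayBuffer.empty[String]
    val cur = new StringBuilder
    var quoted = false
    var i = 0
    while (i < s.length) {
      val c = s.charAt(i)
      if (c == '`') quoted = !quoted
      else if (c == '.' && !quoted) { parts += cur.toString; cur.clear() }
      else if (c.isWhitespace && !quoted) return Left(s"extraneous input at pos $i")
      else cur += c
      i += 1
    }
    if (quoted) Left("unterminated quoted identifier")
    else { parts += cur.toString; Right(Ref(parts.toSeq)) }
  }

  // Direct path: the caller already knows this is a single top-level
  // column, so the name is stored as-is and never parsed.
  def column(name: String): Ref = Ref(Seq(name))
}
```

In this sketch, `Ref.parseMultipart("dept id")` returns a `Left`, while `Ref.column("dept id")` holds the name unchanged as a single part, which is the behavior the suggestion above is after.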

@@ -706,21 +706,21 @@ object DataSourceStrategy
       if (agg.filter.isEmpty) {
         agg.aggregateFunction match {
           case aggregate.Min(PushableColumnWithoutNestedColumn(name)) =>
-            Some(new Min(FieldReference(name)))
+            Some(new Min(FieldReference(s"`$name`")))

We know it's a top-level column and it's a waste to parse it again. The column name may contain backtick as well and we need to escape it.

A simpler solution is to skip the parsing: FieldReference(Seq(name)). We can even create an util method for it: FieldReference.column(name)
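
To make the escaping concern concrete, here is a minimal sketch of what quoting would have to do if the name were still sent through the parser. `QuoteSketch` and both of its methods are hypothetical helpers written for this illustration; the sketch assumes the Spark SQL convention that a backtick inside a backtick-quoted identifier is written as two backticks.

```scala
// Hypothetical helpers, shown only to illustrate the escaping concern.
object QuoteSketch {
  // Wrap a name in backticks, doubling any backtick it already contains.
  def quoteIdentifier(name: String): String =
    "`" + name.replace("`", "``") + "`"

  // Naive wrapping with no escaping, as in the diff line above.
  def naiveQuote(name: String): String = s"`$name`"
}
```

For a column named a`b, `naiveQuote` yields a string with three backticks, so the quoting is unbalanced, whereas `quoteIdentifier` doubles the inner backtick. Building the reference from `Seq(name)` avoids the question altogether, since nothing is parsed.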

@beliefer

beliefer commented Jan 6, 2022

@huaxingao Could you wait until #35101 is merged and then update this PR to use FieldReference.column(name)?

@Dintion

Dintion commented Jan 6, 2022

@huaxingao I think columnAsString in org.apache.spark.sql.execution.datasources.v2.PushDownUtils#pushAggregates has the same problem:

    def columnAsString(e: Expression): Option[FieldReference] = e match {
      case PushableColumnWithoutNestedColumn(name) =>
        Some(FieldReference(name).asInstanceOf[FieldReference])
      case _ => None
    }

@@ -351,6 +351,10 @@ private[sql] object FieldReference {
   def apply(column: String): NamedReference = {
     LogicalExpressions.parseReference(column)
   }
+
+  def column(name: String) : NamedReference = {
+    FieldReference(Seq(name))

Thank you for your work

@cloud-fan

thanks, merging to master!

@cloud-fan cloud-fan closed this in cf193b9 Jan 7, 2022
@huaxingao

Thank you all!

@huaxingao huaxingao deleted the composite_name branch January 7, 2022 05:47
dchvn pushed a commit to dchvn/spark that referenced this pull request Jan 19, 2022
…sh down

### What changes were proposed in this pull request?
Currently, a composite field name such as dept id doesn't work with aggregate push down

sql("SELECT COUNT(\`dept id\`) FROM h2.test.dept")
```
org.apache.spark.sql.catalyst.parser.ParseException:
extraneous input 'id' expecting <EOF>(line 1, pos 5)

== SQL ==
dept id
-----^^^

	at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:271)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:132)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseMultipartIdentifier(ParseDriver.scala:63)
	at org.apache.spark.sql.connector.expressions.LogicalExpressions$.parseReference(expressions.scala:39)
	at org.apache.spark.sql.connector.expressions.FieldReference$.apply(expressions.scala:365)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.translateAggregate(DataSourceStrategy.scala:717)
	at org.apache.spark.sql.execution.datasources.v2.PushDownUtils$.$anonfun$pushAggregates$1(PushDownUtils.scala:125)
	at scala.collection.immutable.List.flatMap(List.scala:366)
	at org.apache.spark.sql.execution.datasources.v2.PushDownUtils$.pushAggregates(PushDownUtils.scala:125)
```
This PR fixes the problem.

### Why are the changes needed?
bug fixing

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
New test

Closes apache#35108 from huaxingao/composite_name.

Authored-by: Huaxin Gao <huaxin_gao@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>