New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-42481][CONNECT] Implement agg.{max,min,mean,count,avg,sum} #40070
Conversation
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
Outdated
Show resolved
Hide resolved
I need to update golden files in this PR. |
.setFunctionName("avg") | ||
.addArguments(inputExpr) | ||
.setIsDistinct(false) | ||
functions.avg(columnName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You currently don't return this function, but the result of builder.build(). If you do, it should be functions.avg(columnName).expr
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I did right replacement and hit a proto -> plan test generation failure.
I am planing look into that separately. I am gonna need some time to learn how to debug org.apache.spark.sql.connect.ProtoToParsedPlanTestSuite
case name => | ||
builder.getUnresolvedFunctionBuilder | ||
.setFunctionName(name) | ||
.addArguments(inputExpr) | ||
.addArguments(df(columnName).expr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use Column.fn
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hold on that I will revert this part. It seems hit an issue somewhere if I switch to use functions API. I need to understand more on the functions implementation.
I will debug this separately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See my earlier comment, and also the tests are broken.
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
Show resolved
Hide resolved
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
Outdated
Show resolved
Hide resolved
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please update the tests and fix strToExpr
. Looks good otherwise.
* @since 3.4.0 | ||
*/ | ||
def count(): Long = { | ||
groupBy().count().collect().head.getLong(0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn't I implement that?
lol...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my local branch that I rebased today, there is no this API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well you are right... I thought I added it.
*/ | ||
@scala.annotation.varargs | ||
def mean(colNames: String*): DataFrame = { | ||
toDF(colNames.map(colName => functions.mean(colName)).toSeq) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need toSeq
here? I though scala varags are always a Seq...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmmm I see. Removing those toSeq
.
@@ -109,7 +109,7 @@ class RelationalGroupedDataset protected[sql] ( | |||
agg(exprs.asScala.toMap) | |||
} | |||
|
|||
private[this] def strToExpr(expr: String, inputExpr: proto.Expression): proto.Expression = { | |||
private[this] def strToColumn(expr: String, inputExpr: proto.Expression): Column = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about:
private[this] def strToColumn(expr: String, inputExpr: Column): Column = {
expr.toLowerCase(Locale.ROOT) match {
case "avg" | "average" | "mean" => functions.avg(inputExpr)
case "stddev" | "std" => functions.avg(inputExpr)
case "count" | "size" => functions.count(inputExpr) // Analyzer will take care of * expansion
case name => Column.fn(name, inputExpr)
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see what you are suggesting now. Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. One small comment.
Merging. |
### What changes were proposed in this pull request? Adding more API to `agg` including max,min,mean,count,avg,sum. ### Why are the changes needed? API coverage ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? UT Closes #40070 from amaliujia/rw-agg2. Authored-by: Rui Wang <rui.wang@databricks.com> Signed-off-by: Herman van Hovell <herman@databricks.com> (cherry picked from commit 74f53b8) Signed-off-by: Herman van Hovell <herman@databricks.com>
### What changes were proposed in this pull request? Adding more API to `agg` including max,min,mean,count,avg,sum. ### Why are the changes needed? API coverage ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? UT Closes apache#40070 from amaliujia/rw-agg2. Authored-by: Rui Wang <rui.wang@databricks.com> Signed-off-by: Herman van Hovell <herman@databricks.com> (cherry picked from commit 74f53b8) Signed-off-by: Herman van Hovell <herman@databricks.com>
What changes were proposed in this pull request?
Adding more API to
agg
including max,min,mean,count,avg,sum.Why are the changes needed?
API coverage
Does this PR introduce any user-facing change?
NO
How was this patch tested?
UT