[SPARK-25048][SQL] Pivoting by multiple columns in Scala/Java #22316
Conversation
   *
   * {{{
   * df
   *   .groupBy($"year")
I would make this line up with the one above.
Yup, I prefer it this way.
Test build #95590 has finished for PR 22316 at commit
Test build #95592 has finished for PR 22316 at commit
@@ -406,6 +407,14 @@ class RelationalGroupedDataset protected[sql](
   * df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings")
   * }}}
   *
   * For pivoting by multiple columns, use the `struct` function to combine the columns and values:
Since the documentation states this is an overloaded version of the `pivot` method with `pivotColumn` of the `String` type, shall we move this content to that method?
Also, I would document this, for instance:

> From Spark 2.4.0, values can be literal columns, for instance, `struct`. For pivoting by multiple columns, use the `struct` function to combine the columns and values.
  .groupBy($"sales.year")
  .pivot(struct(lower($"sales.course"), $"training"))
  .agg(sum($"sales.earnings"))
  .collect()
Don't we need this `.collect()` to catch the `RuntimeException`? btw, IMHO `AnalysisException` is better than `RuntimeException` in this case. Can't we use it?
My changes don't throw the exception. It is thrown in the `collect()`:

spark/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala, line 385 in 41c2227:
.collect()
@maropu Do you propose to catch `RuntimeException` and replace it with `AnalysisException`?
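For illustration, such rewrapping could look roughly like the sketch below. This is a hypothetical stand-in, not Spark's actual code: `AnalysisException` here is a local stub, and `toLiteral` only mimics the shape of `Literal.apply` failing on an unsupported type.

```scala
import scala.util.control.NonFatal

// Local stand-in for Spark's AnalysisException (illustration only).
class AnalysisException(message: String, cause: Throwable)
  extends Exception(message, cause)

// Hypothetical literal builder: handles a couple of simple types and
// throws RuntimeException for anything else, mimicking Literal.apply.
def toLiteral(v: Any): String = v match {
  case i: Int    => s"lit($i)"
  case s: String => s"""lit("$s")"""
  case other     =>
    throw new RuntimeException(s"Unsupported literal type ${other.getClass}")
}

// The rewrapping under discussion: catch the RuntimeException and
// rethrow it as an AnalysisException with a clearer message.
def toLiteralChecked(v: Any): String =
  try toLiteral(v)
  catch {
    case NonFatal(e) =>
      throw new AnalysisException(s"Invalid pivot value '$v'", e)
  }
```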
I tried this in your branch:
scala> df.show
+--------+--------------------+
|training| sales|
+--------+--------------------+
| Experts|[dotNET, 2012, 10...|
| Experts|[JAVA, 2012, 2000...|
| Dummies|[dotNet, 2012, 50...|
| Experts|[dotNET, 2013, 48...|
| Dummies|[Java, 2013, 3000...|
+--------+--------------------+
scala> df.groupBy($"sales.year").pivot(struct(lower($"sales.course"), $"training")).agg(sum($"sales.earnings"))
java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema [dotnet,Dummies]
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:78)
at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$create$2.apply(literals.scala:164)
at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$create$2.apply(literals.scala:164)
at scala.util.Try.getOrElse(Try.scala:79)
at org.apache.spark.sql.catalyst.expressions.Literal$.create(literals.scala:163)
at org.apache.spark.sql.functions$.typedLit(functions.scala:127)
Am I missing something?
> Am I missing something?

No, you aren't. The exception is certainly thrown inside of `lit`, because `collect()` returns a complex value which cannot be "wrapped" by `lit`. This is exactly what is checked in the test I added to show the existing behavior.
> btw, IMHO AnalysisException is better than RuntimeException in this case?

@maropu Could you explain, please, why you think `AnalysisException` is better for an error that occurs at run time?
Just in case: in this PR, I don't aim to change the behavior of the existing method `def pivot(pivotColumn: Column): RelationalGroupedDataset`. I believe changing user-visible behavior should be discussed separately. The PR aims to improve `def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset` to allow users to specify `struct` literals in particular. Please see the description.
I think invalid queries basically throw `AnalysisException`. But, yeah, indeed, we'd better keep the current behaviour. Thanks!
Test build #95631 has finished for PR 22316 at commit
@@ -416,7 +426,7 @@ class RelationalGroupedDataset protected[sql](
     new RelationalGroupedDataset(
       df,
       groupingExprs,
-      RelationalGroupedDataset.PivotType(pivotColumn.expr, values.map(Literal.apply)))
+      RelationalGroupedDataset.PivotType(pivotColumn.expr, values.map(lit(_).expr)))
What do you think about `map(lit).map(_.expr)` instead?
I don't see any advantage to this. It is longer and slower.
@HyukjinKwon May I ask you to look at the PR? Is there anything that blocks it for now?
Looks good, but I wonder whether everyone involved in the previous PR is happy with this.
At least @gatorsmile and @cloud-fan, WDYT?
The branch is cut. Let's target 3.0.0.
@@ -330,6 +331,15 @@ class RelationalGroupedDataset protected[sql](
   * df.groupBy("year").pivot("course").sum("earnings")
   * }}}
   *
   * From Spark 2.4.0, values can be literal columns, for instance, struct. For pivoting by
Let's target 3.0.0 @MaxGekk.
Test build #95829 has finished for PR 22316 at commit
Seems fine to me.
@gatorsmile Do you have any objections to this approach?
@@ -416,7 +426,7 @@ class RelationalGroupedDataset protected[sql](
     new RelationalGroupedDataset(
       df,
       groupingExprs,
-      RelationalGroupedDataset.PivotType(pivotColumn.expr, values.map(Literal.apply)))
+      RelationalGroupedDataset.PivotType(pivotColumn.expr, values.map(lit(_).expr)))
@MaxGekk, just to be doubly sure, shall we use `Try(...).getOrElse(lit(...).expr)`? It looks like there's at least one case of a potential behaviour change around scale and precision.
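The shape of the suggested fallback can be sketched with plain stand-in functions (hypothetical; Spark's real `lit`/`Literal` machinery is not used here — `newPath` artificially rejects `BigDecimal` to simulate a case where the new conversion fails):

```scala
import scala.util.Try

// Hypothetical stand-ins for the two code paths being discussed:
// the new path (lit, backed by Literal.create) and the old one
// (Literal.apply). newPath rejects BigDecimal to simulate a case
// where the new conversion throws.
def newPath(v: Any): String = v match {
  case _: java.math.BigDecimal => throw new RuntimeException("differs")
  case other                   => s"create($other)"
}
def oldPath(v: Any): String = s"apply($v)"

// Try the new conversion first, falling back to the old one on failure,
// which is the shape of Try(...).getOrElse(lit(...).expr).
def toExpr(v: Any): String = Try(newPath(v)).getOrElse(oldPath(v))
```

With this shape, values the new path can handle keep the new behavior, while anything it rejects silently falls back to the old conversion.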
> Looks at least there's one case of a potential behaviour change about scale and precision.

Could you explain, please? Why do you expect a behavior change?
Now we eventually call `Literal.create` instead of `Literal.apply`. I'm not sure if there is a behavior change though.
From a quick look, it seems `Literal.create` is more powerful and should not cause regressions.
That's true in general, but specifically, is the decimal precision more correct?
LGTM otherwise.
@HyukjinKwon @maropu @jaceklaskowski Please take a look at this PR one more time.
From a cursory look, I checked that the decimal precision and scale could be different. For instance:

spark/sql/core/src/main/scala/org/apache/spark/sql/functions.scala, lines 100 to 128 in d749d03:
/**
 * Creates a [[Column]] of literal value.
 *
 * The passed in object is returned directly if it is already a [[Column]].
 * If the object is a Scala Symbol, it is converted into a [[Column]] also.
 * Otherwise, a new [[Column]] is created to represent the literal value.
 *
 * @group normal_funcs
 * @since 1.3.0
 */
def lit(literal: Any): Column = typedLit(literal)

/**
 * Creates a [[Column]] of literal value.
 *
 * The passed in object is returned directly if it is already a [[Column]].
 * If the object is a Scala Symbol, it is converted into a [[Column]] also.
 * Otherwise, a new [[Column]] is created to represent the literal value.
 * The difference between this function and [[lit]] is that this function
 * can handle parameterized scala types e.g.: List, Seq and Map.
 *
 * @group normal_funcs
 * @since 2.2.0
 */
def typedLit[T : TypeTag](literal: T): Column = literal match {
  case c: Column => c
  case s: Symbol => new ColumnName(s.name)
  case _ => Column(Literal.create(literal))
}
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala, lines 165 to 171 in f96a8bf:

def create[T : TypeTag](v: T): Literal = Try {
  val ScalaReflection.Schema(dataType, _) = ScalaReflection.schemaFor[T]
  val convert = CatalystTypeConverters.createToCatalystConverter(dataType)
  Literal(convert(v), dataType)
}.getOrElse {
  Literal(v)
}
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala, lines 756 to 763 in 1fd59c1:

case t if t <:< localTypeOf[BigDecimal] => Schema(DecimalType.SYSTEM_DEFAULT, nullable = true)
case t if t <:< localTypeOf[java.math.BigDecimal] =>
  Schema(DecimalType.SYSTEM_DEFAULT, nullable = true)
case t if t <:< localTypeOf[java.math.BigInteger] =>
  Schema(DecimalType.BigIntDecimal, nullable = true)
case t if t <:< localTypeOf[scala.math.BigInt] =>
  Schema(DecimalType.BigIntDecimal, nullable = true)
case t if t <:< localTypeOf[Decimal] => Schema(DecimalType.SYSTEM_DEFAULT, nullable = true)
vs
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala, lines 62 to 65 in f96a8bf:

case d: BigDecimal => Literal(Decimal(d), DecimalType.fromBigDecimal(d))
case d: JavaBigDecimal =>
  Literal(Decimal(d), DecimalType(Math.max(d.precision, d.scale), d.scale()))
case d: Decimal => Literal(d, DecimalType(Math.max(d.precision, d.scale), d.scale))
Would you mind double-checking this one, please?
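The difference the snippets above point at can be illustrated with plain `java.math.BigDecimal`, no Spark required. The `(38, 18)` pair below is assumed from `DecimalType.SYSTEM_DEFAULT` as quoted above; the second pair applies `Literal.apply`'s own rule, `DecimalType(max(precision, scale), scale)`.

```scala
import java.math.{BigDecimal => JBigDecimal}

val d = new JBigDecimal("123.45")

// ScalaReflection.schemaFor maps BigDecimal to DecimalType.SYSTEM_DEFAULT,
// i.e. a fixed (precision, scale) of (38, 18) regardless of the value.
val viaCreate = (38, 18)

// Literal.apply derives the type from the value itself:
// DecimalType(max(precision, scale), scale).
val viaApply = (math.max(d.precision, d.scale), d.scale)

println(viaCreate) // (38,18)
println(viaApply)  // (5,2)
```

So the same value `123.45` gets type `(38, 18)` on one path and `(5, 2)` on the other, which is exactly the kind of precision/scale divergence being discussed.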
@@ -330,6 +331,15 @@ class RelationalGroupedDataset protected[sql](
   * df.groupBy("year").pivot("course").sum("earnings")
   * }}}
   *
   * From Spark 3.0.0, values can be literal columns, for instance, struct. For pivoting by
3.0.0 => 2.5.0
Test build #96404 has finished for PR 22316 at commit
retest this please
Test build #96409 has finished for PR 22316 at commit
jenkins, retest this, please
Test build #96420 has finished for PR 22316 at commit
LGTM if the decimal precision concern from @HyukjinKwon is addressed.
@HyukjinKwon Do you expect special tests for decimals?
Can you just investigate whether there's a behaviour change in decimal precision? If there is, and it's a better behaviour, can you add a simple test? If it's not a better behaviour, let's try-catch for now.
Test build #96520 has finished for PR 22316 at commit
One safe change is to not use the
@cloud-fan Thank you for the suggestion. I did it that way.
Test build #96759 has finished for PR 22316 at commit
   * multiple columns, use the `struct` function to combine the columns and values:
   *
   * {{{
   * df.groupBy($"year")
nit: `$"year"` -> `"year"`
Why can't we group by the `Column` type?
We can. It's just to match the examples above, apart from the difference being shown. Really not a big deal at all.
LGTM except one nit.
I'm merging this. The last change is a comment change, and the lint / unidoc checks passed.
Merged to master.
## What changes were proposed in this pull request?

In the PR, I propose to extend the implementation of the existing method:

```
def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset
```

to support values of the struct type. This allows pivoting by multiple columns combined by `struct`:

```
trainingSales
  .groupBy($"sales.year")
  .pivot(
    pivotColumn = struct(lower($"sales.course"), $"training"),
    values = Seq(
      struct(lit("dotnet"), lit("Experts")),
      struct(lit("java"), lit("Dummies"))))
  .agg(sum($"sales.earnings"))
```

## How was this patch tested?

Added a test for values specified via `struct` in Java and Scala.

Closes apache#22316 from MaxGekk/pivoting-by-multiple-columns2.

Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
Test build #96800 has finished for PR 22316 at commit
Hi, is there a way to pivot multiple columns using PySpark as well?
You can try `df.groupby(...).pivot(..., values=[F.struct(F.lit("..."))._jc])` for now.