[SPARK-19691][SQL] Fix ClassCastException when calculating percentile of decimal column #17028
Conversation
```diff
@@ -138,7 +138,8 @@ case class Percentile(
   override def update(
       buffer: OpenHashMap[Number, Long],
       input: InternalRow): OpenHashMap[Number, Long] = {
-    val key = child.eval(input).asInstanceOf[Number]
+    val scalaValue = CatalystTypeConverters.convertToScala(child.eval(input), child.dataType)
```
I think it is better to open up the signature of the `OpenHashMap` and use `Ordered` or `AnyRef` as its key type.
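For illustration, a sketch of what that signature change might look like (not necessarily the exact merged patch; the body follows the hunks quoted elsewhere in this thread):

```scala
// Widen the buffer's key type from Number to AnyRef so that any non-null
// Catalyst value (boxed numerics, Decimal, ...) can serve as a key without
// a ClassCastException.
override def update(
    buffer: OpenHashMap[AnyRef, Long],
    input: InternalRow): OpenHashMap[AnyRef, Long] = {
  val key = child.eval(input).asInstanceOf[AnyRef]
  val frqValue = frequencyExpression.eval(input)
  // Null values are ignored in counts map.
  if (key != null && frqValue != null) {
    val frqLong = frqValue.asInstanceOf[Number].longValue()
    buffer.changeValue(key, frqLong, _ + frqLong)
  }
  buffer
}
```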
okay, I'll fix that way. Thanks!
Should we create a converter and re-use it with `createToScalaConverter(...)` rather than type-dispatching every time, maybe?
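A rough sketch of that idea (hypothetical; `toScalaValue` is an invented name, imagined as a member of `Percentile`):

```scala
// Build the Catalyst-to-Scala converter once for the child's data type,
// instead of re-dispatching on the type for every input row.
private lazy val toScalaValue: Any => Any =
  CatalystTypeConverters.createToScalaConverter(child.dataType)

// ...then per row, in update():
// val scalaValue = toScalaValue(child.eval(input))
```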
I'm not 100% sure, but it seems the cost of converting `Decimal` to `BigDecimal` every time is somewhat higher than that of using Catalyst values as-is.
Test build #73280 has finished for PR 17028 at commit
Just a sec, I'll apply @hvanhovell's suggestion...
Test build #73290 has finished for PR 17028 at commit
Test build #73320 has finished for PR 17028 at commit
@HyukjinKwon @hvanhovell How about the latest fix?
```diff
     val frqValue = frequencyExpression.eval(input)

     // Null values are ignored in counts map.
     if (key != null && frqValue != null) {
-      val frqLong = frqValue.asInstanceOf[Number].longValue()
+      val frqLong = toLongValue(frqValue)
```
`frqValue` is guaranteed to return an integral value, so this is not needed. We could also force it to be a `Long`; that would make this even simpler.
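A sketch of the two simplifications the comment suggests (hypothetical; assumes `frqValue` comes from `frequencyExpression.eval(input)` as in the hunk above):

```scala
// Option 1: frequencyExpression is type-checked to an integral type, so the
// original Number cast is already safe and no helper is needed.
val frqLong = frqValue.asInstanceOf[Number].longValue()

// Option 2: if the frequency were forced to evaluate to a Long, the cast
// would collapse further:
// val frqLong = frqValue.asInstanceOf[Long]
```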
I'll revert this part.
```diff
@@ -274,7 +283,8 @@ case class Percentile(
       val row = new UnsafeRow(2)
       row.pointTo(bs, sizeOfNextRow)
       // Insert the pairs into counts map.
-      val key = row.get(0, child.dataType).asInstanceOf[Number]
+      val catalystValue = row.get(0, child.dataType)
```
NIT: Just change the cast in the old code.
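Presumably this means keeping the original one-liner and changing only the cast, in line with the earlier `AnyRef` suggestion (a guess at the intent, not a quote of the merged code):

```scala
// Same line shape as before; only the cast target changes.
val key = row.get(0, child.dataType).asInstanceOf[AnyRef]
```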
oh..., I'll fix it
```diff
     assert(compareEquals(agg.deserialize(agg.serialize(buffer)), buffer))

     // Check non-empty buffer serialize and deserialize.
     data.foreach { key =>
-      buffer.changeValue(key, 1L, _ + 1L)
+      buffer.changeValue(new Integer(key), 1L, _ + 1L)
```
Do we need to explicitly type this? I thought Scala boxed automatically.
Without this explicit boxing, compilation fails with the error below:

```
[error] /Users/maropu/IdeaProjects/spark/spark-master/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala:46: the result type of an implicit conversion must be more specific than AnyRef
```
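For context, a minimal sketch of why the explicit boxing is needed once the map's key type is `AnyRef` (assumed setup; `Int.box` shown as an idiomatic alternative to `new Integer`):

```scala
import org.apache.spark.util.collection.OpenHashMap

val buffer = new OpenHashMap[AnyRef, Long]()
val key: Int = 1

// buffer.changeValue(key, 1L, _ + 1L)
// ^ does not compile: scalac refuses to implicitly convert a primitive Int
//   to AnyRef ("the result type of an implicit conversion must be more
//   specific than AnyRef").

buffer.changeValue(new Integer(key), 1L, _ + 1L) // explicit boxing compiles
buffer.changeValue(Int.box(key), 1L, _ + 1L)     // idiomatic alternative
```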
```diff
     }
     assert(compareEquals(agg.deserialize(agg.serialize(buffer)), buffer))
   }

   test("class Percentile, high level interface, update, merge, eval...") {
     val count = 10000
     val percentages = Seq(0, 0.25, 0.5, 0.75, 1)
-    val expectedPercentiles = Seq(1, 2500.75, 5000.5, 7500.25, 10000)
+    val expectedPercentiles = Seq[Double](1, 2500.75, 5000.5, 7500.25, 10000)
```
Do we need to type this?
Since my IntelliJ raises an alert on this part, I added it. But you're right, this is not necessary; I reverted it.
@maropu This looks pretty good. I left a few minor comments/questions.
Thanks for your review! I'm fixing now.
Done. I'll wait for the tests to finish.
Test build #73342 has finished for PR 17028 at commit
LGTM - merging to master.
Thanks!
@maropu can you open a backport if you feel we should also put this in 2.1?
@hvanhovell okay, I'll open soon. |
What changes were proposed in this pull request?

This pr fixed a class-cast exception below;

```
scala> spark.range(10).selectExpr("cast (id as decimal) as x").selectExpr("percentile(x, 0.5)").collect()
java.lang.ClassCastException: org.apache.spark.sql.types.Decimal cannot be cast to java.lang.Number
  at org.apache.spark.sql.catalyst.expressions.aggregate.Percentile.update(Percentile.scala:141)
  at org.apache.spark.sql.catalyst.expressions.aggregate.Percentile.update(Percentile.scala:58)
  at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.update(interfaces.scala:514)
  at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$1.apply(AggregationIterator.scala:171)
  at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$1.apply(AggregationIterator.scala:171)
  at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:187)
  at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:181)
  at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:151)
  at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:78)
  at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
  at
```

This fix simply converts catalyst values (i.e., `Decimal`) into Scala ones by using `CatalystTypeConverters`.

How was this patch tested?

Added a test in `DataFrameSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes apache#17028 from maropu/SPARK-19691.
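For illustration, a minimal sketch of the conversion the fix relies on (hypothetical snippet; the object name and the `DecimalType(10, 0)` choice are for the example, not from the patch):

```scala
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.types.{Decimal, DecimalType}

object DecimalConversionSketch {
  def main(args: Array[String]): Unit = {
    // A Catalyst-internal Decimal, like what child.eval(input) returns for a
    // decimal column.
    val catalystValue = Decimal("5")

    // convertToScala maps it to the external java.math.BigDecimal, which is a
    // java.lang.Number and therefore usable where the old Number cast failed.
    val scalaValue =
      CatalystTypeConverters.convertToScala(catalystValue, DecimalType(10, 0))
    println(scalaValue.getClass) // expected: class java.math.BigDecimal
  }
}
```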