
[SPARK-19727][SQL] Fix for round function that modifies original column #17075

Closed
wants to merge 7 commits into from

Conversation

@wojtek-szymanski (Contributor) commented Feb 27, 2017

What changes were proposed in this pull request?

Fix for the SQL round function, which modifies the original column when the underlying DataFrame is created from a local Product.

import org.apache.spark.sql.functions._

case class NumericRow(value: BigDecimal)

val df = spark.createDataFrame(Seq(NumericRow(BigDecimal("1.23456789"))))

df.show()
+--------------------+
|               value|
+--------------------+
|1.234567890000000000|
+--------------------+

df.withColumn("value_rounded", round('value)).show()

// before
+--------------------+-------------+
|               value|value_rounded|
+--------------------+-------------+
|1.000000000000000000|            1|
+--------------------+-------------+

// after
+--------------------+-------------+
|               value|value_rounded|
+--------------------+-------------+
|1.234567890000000000|            1|
+--------------------+-------------+

How was this patch tested?

New unit test added to existing suite org.apache.spark.sql.MathFunctionsSuite

@srowen (Member) commented Feb 27, 2017

I don't know the code well enough to really evaluate this, but I see that .clone() is called in a similar context in decimalExpressions. There are also similar usages of changePrecision in UnsafeArrayWriter and UnsafeRowWriter; I wonder if they are affected too?

CC maybe @cloud-fan or @yjshen ?

@cloud-fan (Contributor)

I think we should fix changePrecision to return a new instance instead of updating itself.

@wojtek-szymanski (Contributor, Author)

Good idea @cloud-fan. I will look for usages of changePrecision then.

@wojtek-szymanski (Contributor, Author)

I have just started refactoring changePrecision in order to make it immutable.
My idea was to change the signature from:
def changePrecision(precision: Int, scale: Int, mode: Int): Boolean
into
def changePrecision(precision: Int, scale: Int, mode: Int): Option[Decimal]

Here are my first thoughts:

  • org.apache.spark.sql.types.Decimal is mutable by definition, so making one method immutable makes its contract very inconsistent

  • I am afraid of performance degradation in micro-benchmarks since in some use cases, an instance needs to be created twice

  • changePrecision is called 10 times in Scala, 10 times in code-gen functions, and 3 times in Java unsafe writers (UnsafeArrayWriter, UnsafeRowWriter)

I would be grateful if you could confirm if it's the right way to go.

@cloud-fan (Contributor)

How about we add a new method toPrecision that returns Option[Decimal]? Most of the time we should call toPrecision, but for some performance-critical paths we should call changePrecision.
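The contract being proposed could be sketched as follows. This is a self-contained toy model, not Spark's actual Decimal class: changePrecision mutates the receiver in place, while toPrecision clones first and returns an Option, leaving the receiver untouched.

```scala
import java.math.{BigDecimal => JBigDecimal, RoundingMode}

// Toy stand-in for Spark's Decimal, just enough to show the two contracts.
final class Dec(private var value: JBigDecimal) {
  // Mutates the receiver; returns false if the target precision would overflow.
  def changePrecision(precision: Int, scale: Int, mode: RoundingMode): Boolean = {
    val rounded = value.setScale(scale, mode)
    if (rounded.precision > precision) false
    else { value = rounded; true }
  }

  // Immutable variant: copy first, mutate the copy, leave `this` unchanged.
  def toPrecision(precision: Int, scale: Int, mode: RoundingMode): Option[Dec] = {
    val copy = new Dec(value)
    if (copy.changePrecision(precision, scale, mode)) Some(copy) else None
  }

  override def toString: String = value.toPlainString
}

val d = new Dec(new JBigDecimal("1.23456789"))
val rounded = d.toPrecision(10, 0, RoundingMode.HALF_UP)
println(d)        // original is unchanged: 1.23456789
println(rounded)  // Some(1)
```

The key design point is that the expensive mutating path stays available for hot loops, while callers who hold a shared reference get a safe, copying variant.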

@wojtek-szymanski (Contributor, Author)

I think it makes more sense now; please have a look.


private[this] def castToDecimal(from: DataType, target: DecimalType): Any => Any = from match {
  case StringType =>
    buildCast[UTF8String](_, s => try {
-     changePrecision(Decimal(new JavaBigDecimal(s.toString)), target)
+     toPrecision(Decimal(new JavaBigDecimal(s.toString)), target)
Contributor:

Looks like here we don't need to create a new instance?

Contributor Author:

agree

case dt: DecimalType =>
b => changePrecision(b.asInstanceOf[Decimal].clone(), target)
Contributor:

I think this is the only case where we need toPrecision

Contributor Author:

Nope, there is one more here:

case BooleanType =>
  buildCast[Boolean](_, b => toPrecision(if (b) Decimal.ONE else Decimal.ZERO, target))

Both ONE and ZERO are singletons, so changing precision on them in place is not a good idea.
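Why mutating a shared singleton is dangerous can be shown with a minimal sketch. The MutableDec class and names below are illustrative, not Spark's API: every later reader of the singleton observes the "local" change.

```scala
// Hypothetical mutable decimal; illustrative only, not Spark's Decimal.
final class MutableDec(var scale: Int) {
  def changeScale(s: Int): Unit = { scale = s }  // mutates in place
}

object Consts { val ONE = new MutableDec(0) }    // shared singleton

val x = Consts.ONE         // `x` aliases the singleton, it is not a copy
x.changeScale(18)          // intended as a local adjustment...
println(Consts.ONE.scale)  // ...but prints 18: the singleton itself changed
```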

*
* @return `Some(decimal)` if successful or `None` if overflow would occur
*/
private[sql] def toPrecision(precision: Int, scale: Int,
@cloud-fan (Contributor) Mar 1, 2017:

nit: the style should be

def xxx(
    para1: xxx,
    para2: xxx): T = {

Contributor Author:

Fixed, thanks

- value.changePrecision(
-   DecimalType.bounded(precision - scale + 1, 0).precision, 0, ROUND_FLOOR)
- value
+ toPrecision(DecimalType.bounded(precision - scale + 1, 0).precision, 0, ROUND_FLOOR)
Contributor:

shall we assume toPrecision will always return Some here?

Contributor Author:

Theoretically, it should be Some. On the other hand, if something goes wrong when setting the new precision in floor or ceil, I would raise an internal error:

def floor: Decimal = if (scale == 0) this else {
  val newPrecision = DecimalType.bounded(precision - scale + 1, 0).precision
  toPrecision(newPrecision, 0, ROUND_FLOOR).getOrElse(
    throw new AnalysisException(s"Overflow when setting precision to $newPrecision"))
}

@@ -233,6 +233,18 @@ class MathFunctionsSuite extends QueryTest with SharedSQLContext {
)
}

test("round/bround with data frame from a local Seq of Product") {
val df = spark.createDataFrame(Seq(NumericRow(BigDecimal("5.9"))))
Contributor:

we don't need to create NumericRow, try Seq(BigDecimal("5.9")).toDF("value")

Contributor Author:

Actually, the problem occurs only when creating a DataFrame from a Product. I was unable to reproduce the issue with Seq(BigDecimal("5.9")).toDF("value").

Contributor:

this is weird, can you look into it? spark.createDataset(Seq(BigDecimal("5.9"))) should produce the same result.

Contributor Author:

Sure, I will try to investigate where the difference is.

Contributor Author:

The fundamental difference is in the underlying row type assigned to the DataFrame/Dataset. A DataFrame is based on GenericInternalRow, while a Dataset uses UnsafeRow. During evaluation of the round expression, the method getDecimal is called on a row, see BoundAttribute.scala#L52. As a result, GenericInternalRow returns just an element of an array, which points to the reference of the original column, see rows.scala#L200. The strategy used in UnsafeRow is completely different: a new decimal instance is created, see UnsafeRow.java#L399.
I hope this helps explain why only the DataFrame is affected.
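The aliasing described above can be sketched with a toy row (illustrative names, not Spark's classes): a GenericInternalRow-style accessor hands out the stored reference itself, while an UnsafeRow-style accessor reconstructs a fresh object on every read.

```scala
// Toy row backed by a plain array of mutable values.
final class ToyRow(values: Array[StringBuilder]) {
  // GenericInternalRow-style: returns the stored reference (aliased).
  def get(i: Int): StringBuilder = values(i)
  // UnsafeRow-style: materializes a fresh instance on every read.
  def getCopy(i: Int): StringBuilder = new StringBuilder(values(i).toString)
}

val row = new ToyRow(Array(new StringBuilder("1.23456789")))

row.getCopy(0).setLength(1)  // mutating a fresh copy is harmless
println(row.get(0))          // still 1.23456789

row.get(0).setLength(1)      // "rounding" the aliased reference...
println(row.get(0))          // ...corrupts the row itself: 1
```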

-   DecimalType.bounded(precision - scale + 1, 0).precision, 0, ROUND_FLOOR)
- value
+ toPrecision(DecimalType.bounded(precision - scale + 1, 0).precision, 0, ROUND_FLOOR)
+   .getOrElse(clone())
Contributor:

This might end up creating two copies of the object in the worst case:

  • once in toPrecision
  • second time on this line

Old logic would guarantee a single copy.

Contributor Author:

You're right, thanks. My suggestion is to raise an internal error if setting new precision in floor or ceil would fail.

-   DecimalType.bounded(precision - scale + 1, 0).precision, 0, ROUND_CEILING)
- value
+ toPrecision(DecimalType.bounded(precision - scale + 1, 0).precision, 0, ROUND_CEILING)
+   .getOrElse(clone())
Contributor:

same as above

Contributor Author:

See my comment above

@@ -193,7 +193,7 @@ class DecimalSuite extends SparkFunSuite with PrivateMethodTester {
assert(Decimal(Long.MaxValue, 100, 0).toUnscaledLong === Long.MaxValue)
}

- test("changePrecision() on compact decimal should respect rounding mode") {
+ test("changePrecision/toPrecission on compact decimal should respect rounding mode") {
Contributor:

nit: typo in toPrecission

Contributor Author:

Thanks, fixed

* @return `Some(decimal)` if successful or `None` if overflow would occur
*/
private[sql] def toPrecision(
precision: Int, scale: Int,
Contributor:

code style...

Contributor Author:

Fixed, thanks

)
checkAnswer(
df.withColumn("value_rounded", bround('value)),
Seq(Row(BigDecimal("5.9"), BigDecimal("6")))
Contributor:

why test it twice?

Contributor Author:

The bround function is also affected. Column value_rounded was renamed to value_brounded.

*
* @return `Some(decimal)` if successful or `None` if overflow would occur
*/
private[sql] def toPrecision(
Contributor:

style:

def xxx(
    para1: xxx,
    para2: xxx): XXX

Contributor Author:

fixed

@@ -422,3 +434,4 @@ class MathFunctionsSuite extends QueryTest with SharedSQLContext {
checkAnswer(df.selectExpr("positive(b)"), Row(-1))
}
}
case class NumericRow(value : BigDecimal)
Contributor:

let's just use Tuple1 instead of creating this class

Contributor Author:

replaced with Tuple1

* @return `Some(decimal)` if successful or `None` if overflow would occur
*/
private[sql] def toPrecision(
precision: Int,
Contributor:

4-space indentation here. Please take a look at other methods in Spark and follow the code style.

Contributor Author:

Actually I did, but I saw so many different styles that I had no idea which one was correct. Thanks again for your patience.

@cloud-fan (Contributor)

LGTM, pending tests

@wojtek-szymanski (Contributor, Author)

@cloud-fan could you please give the green light to tests?

@cloud-fan (Contributor)

ok to test

@cloud-fan (Contributor)

Sorry, I forgot to trigger the test...

@SparkQA commented Mar 7, 2017

Test build #74135 has finished for PR 17075 at commit fc0f2d1.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

retest this please

@SparkQA commented Mar 8, 2017

Test build #74152 has finished for PR 17075 at commit fc0f2d1.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

retest this please

@SparkQA commented Mar 8, 2017

Test build #74181 has finished for PR 17075 at commit fc0f2d1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

retest this please

@SparkQA commented Mar 8, 2017

Test build #74186 has started for PR 17075 at commit fc0f2d1.

@cloud-fan (Contributor)

retest this please

@SparkQA commented Mar 8, 2017

Test build #74195 has finished for PR 17075 at commit fc0f2d1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

thanks, merging to master!

@asfgit asfgit closed this in e9e2c61 Mar 8, 2017
asfgit pushed a commit that referenced this pull request Oct 29, 2017
…ginal column

## What changes were proposed in this pull request?

This is a followup of #17075 , to fix the bug in codegen path.

## How was this patch tested?

new regression test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19576 from cloud-fan/bug.

(cherry picked from commit 7fdacbc)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
MatthewRBruce pushed a commit to Shopify/spark that referenced this pull request Jul 31, 2018