Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-19118] [SQL] Percentile support for frequency distribution table #16497

Closed
wants to merge 3 commits into from

Conversation

tanejagagan
Copy link

What changes were proposed in this pull request?

I have a frequency distribution table with following entries
Age, No of person
21, 10
22, 15
23, 18
..
..
30, 14
Moreover it is common to have data in frequency distribution format to further calculate Percentile, Median. With current implementation
It would be very difficult and complex to find the percentile.
Therefore i am proposing enhancement to current Percentile and Approx Percentile implementation to take frequency distribution column into consideration

How was this patch tested?

  1. Enhanced /sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala to cover the additional functionality
  2. Run some performance benchmark test with 20 million row in local environment and did not see any performance degradation

Please review http://spark.apache.org/contributing.html before opening a pull request.

@tanejagagan
Copy link
Author

@hvanhovell
Can you please review the changes

@hvanhovell
Copy link
Contributor

ok to test

@SparkQA
Copy link

SparkQA commented Jan 8, 2017

Test build #71033 has finished for PR 16497 at commit 28cdb66.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

@hvanhovell hvanhovell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a good start. I left a few comments.

case _ => Seq(NumericType, DoubleType)
}
} else {
percentageExpression.dataType match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not needed to make a difference between freq = 1 and the freq > 1 cases. Just use the more complete case.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like I would have to do it based on what were the argument in the SQL and which constructor was invoked
for sql without Frequency like percentile( a, 0.5 ) children need to be child:: percentageExpr::Nil
for sql with Frequency like percentile( a, frq, 0.5 ) children need to be child::frequency::percentageExpr::Nil
As well as input type should be reflected on how many arguments were passed

Both InputDataType Check and generated sql tests fails

However i have made changes in the logic to determine which constructor was invoked and decide inputType and children based on it

@@ -81,7 +97,11 @@ case class Percentile(
case arrayData: ArrayData => arrayData.toDoubleArray().toSeq
}

override def children: Seq[Expression] = child :: percentageExpression :: Nil
override def children: Seq[Expression] = if( frequency != unit){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my other comment.

override def prettyName: String = "percentile"

override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): Percentile =
copy(mutableAggBufferOffset = newMutableAggBufferOffset)

override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): Percentile =
copy(inputAggBufferOffset = newInputAggBufferOffset)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this. This will probably fail style checks.

Copy link
Author

@tanejagagan tanejagagan Jan 8, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did not realize that I had forgotten to do the mvn build which does the style check as well

if (key != null && frqValue != null) {
val frqLong = frqValue.asInstanceOf[Number].longValue()
//add only when frequency is positive
if(frqLong > 0 ){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit add a space between if and its clause

@hvanhovell
Copy link
Contributor

cc @jiangxb1987

@SparkQA
Copy link

SparkQA commented Jan 9, 2017

Test build #71047 has finished for PR 16497 at commit b1d421b.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 9, 2017

Test build #71051 has finished for PR 16497 at commit 947c8cf.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 9, 2017

Test build #71052 has finished for PR 16497 at commit 6be7c9a.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987
Copy link
Contributor

@tanejagagan Would you please revert the unnecessary code changes?

BTW: You can test the scala style on local environment by running ./dev/lint-scala from console.

@SparkQA
Copy link

SparkQA commented Jan 10, 2017

Test build #71094 has finished for PR 16497 at commit 12a6559.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

@jiangxb1987 jiangxb1987 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is promising, I left a few comments, will review this again after they are addressed.


_FUNC_(col, frequency, array(percentage1 [, percentage2]...)) - Returns the exact percentile
value array of numeric column `col` with frequency column `frequency` at the given
percentage(s).Each value of the percentage array must be between 0.0 and 1.0.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please merge the comments with the exsiting one.

percentageExpression: Expression,
withFrqExpr : Boolean,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need the flag withFrqExpr?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make the type of frequency to be Option[Expression], and remove the withFrqExpr flag.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove withFrqExpr. The frequency must be provided, and should default to 1L.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see my comment below why we need to make a distinction either using flag or Option

I am inclined towards using a flag because switching to option would change the code in update
from
val frqValue = frequency.eval(input)
to
val frqValue = frequency.getOrElse( unit).eval(input)

But i think Option[Expression] would be better logically
Once we have an agreement if we need to have a distinction or not i will make the changes accordingly

case class Percentile(
child: Expression,
frequency : Expression,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename this to frequencyExpression, and place this after percentageExpression.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think frequenctExpression would be correct name but i have sequence them as they would appear in the SQL
select percentile( col, frq, percentage ) from table
where frq is Optional

child :: frequency :: percentageExpression :: Nil
} else {
child :: percentageExpression :: Nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about:

override def children: Seq[Expression] = {
    val frequency = if (frequencyExpression.isDefined) {
        frequencyExpression :: Nil
    } else {
        Nil
    }
    child :: percentageExpression :: frequency
}

case _ => DoubleType
}
if (withFrqExpr) {
Seq(NumericType, IntegralType, percentageExpType)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also change this according to the comments above.

@@ -126,10 +152,15 @@ case class Percentile(
buffer: OpenHashMap[Number, Long],
input: InternalRow): OpenHashMap[Number, Long] = {
val key = child.eval(input).asInstanceOf[Number]
val frqValue = frequency.eval(input)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should check the value of the frequencyExpression beforehand.


// Null values are ignored in counts map.
if (key != null) {
buffer.changeValue(key, 1L, _ + 1L)
if (key != null && frqValue != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is dangerous, we should use a default value (e.g. frqValue = 1) when frqValue is invalid.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should default to 1L as a value. That seems more wrong than either failing or skipping.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this would be wrong to use the default value of 1
Let take a data set of
Age, Count
20, 1
15, 1
10, 0

If we take the default value of 1L when the frq is 0 is then .5 percentile would become 15 . This is incorrect. I agree with other suggestion of either failing or disregard those values

Copy link
Contributor

@hvanhovell hvanhovell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please remove withFrqExpr and all related code, just assume that frequency is always there.

percentageExpression: Expression,
withFrqExpr : Boolean,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove withFrqExpr. The frequency must be provided, and should default to 1L.

@@ -81,7 +96,11 @@ case class Percentile(
case arrayData: ArrayData => arrayData.toDoubleArray().toSeq
}

override def children: Seq[Expression] = child :: percentageExpression :: Nil
override def children: Seq[Expression] = if (withFrqExpr) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to make a difference here?

Just do:

override def children: Seq[Expression] = {
  child :: frequency :: percentageExpression :: Nil
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have given lot of thought to it and if we need to make a difference here.
Lets take a data set of
age_string, age, count
"20", 20, 1
"15", 15, 1
"10", 10, 1

For Sql "select percentile( age, count , 0.5 ) from table"
logically correct values should be
children = age::count ::0.5 :: Nil
and
inputType = IntegerType :: IntegerType::DoubleType::Nil

For sql "select pecentile( age, 0.5 ) from table"
logically correct values should be
children = age::0.5 :: Nil
and
inputType = IntegerType ::DoubleType::Nil

Here is one example where keeping it logically correct would help
For following incorrect SQL "select percentile( age, '10') from table"

With children = age::'10'::Nil and inputType = IntergerType::StringType:: Nil
Since both children and inputType is used for dataType validation, the error message would be correct as below.
"argument 2 requires Double type, however, 10 is of String type."

However With children = age::Literal(1)::'10'::Nil and inputType = IntergerType::IntegerType::StringType:: Nil
The error message would be NOT correct and confusing as below
"argument 3 requires Double type, however, 10 is of String type."

Since both children and dataType are public method i was inclined to keep them explicitly correct and therefore i decided to make a difference.
Please let me know your thoughts

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make the param sequence to: child, percentageExpression, frequencyExpression, and give a default value to frequencyExpression.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hvanhovell
Can you provide your final thoughts on this and i will make the changes accordingly
i did try to find an expression where middle argument is Optional but could not. Seems like only the last argument(s) are optional.
Which argument sequence would you recommend
Column [, frequency], percentages
OR
Column, percentages [, frequency]


// Null values are ignored in counts map.
if (key != null) {
buffer.changeValue(key, 1L, _ + 1L)
if (key != null && frqValue != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should default to 1L as a value. That seems more wrong than either failing or skipping.

if (key != null && frqValue != null) {
val frqLong = frqValue.asInstanceOf[Number].longValue()
// add only when frequency is positive
if (frqLong > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets make this a requirement and fail when the value < 0.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the option was between either fail or disregard those values. We can certainly make this a requirement, document and fail when the values are negatives
I think for the cases where values are either null or 0 we should not be adding them to Map to unnecessary
bloat the map.
The logic would look like
if ( frqLong < 0 ) {
throw new SomeException
}else if( frqLong > 0 ) {
// process to add them to map
}

Let me know if above look good and i will make the changes accordingly

case _: ArrayType => ArrayType(DoubleType)
case _ => DoubleType
}
if (withFrqExpr) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again remove withFrqExpr.

@holdenk
Copy link
Contributor

holdenk commented Jan 26, 2017

So the latest changes seems to be in a bad state (544 files changed), can you maybe update your branch?

@SparkQA
Copy link

SparkQA commented Jan 26, 2017

Test build #72032 has finished for PR 16497 at commit 130b724.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

gagan taneja added 2 commits January 26, 2017 22:02
Now optional frequency column is moved to the end of the function
Fixing scala style errors
Removing a duplicate test
@SparkQA
Copy link

SparkQA commented Jan 27, 2017

Test build #72079 has finished for PR 16497 at commit 5397beb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Author

@tanejagagan tanejagagan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hvanhovell
Can you review the changes. I have incorporated all the suggested changes

Copy link
Contributor

@hvanhovell hvanhovell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR is in pretty good shape. I have left some comments on the tests. Fix those and we should be ready to merge this.

// add only when frequency is positive
if (frqLong > 0) {
buffer.changeValue(key, frqLong, _ + frqLong)
} else if ( frqLong < 0 ) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: no leading and trailing spaces in the if clause

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -26,6 +27,7 @@ import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.types._
import org.apache.spark.util.collection.OpenHashMap


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this line

@@ -84,6 +86,59 @@ class PercentileSuite extends SparkFunSuite {
}
}

test("class Percentile with frequency, high level interface, update, merge, eval...") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is almost a duplicate of the previous test. Lets just merge the two, and have a case without a frequency. I do like the parameterization.

@@ -119,35 +174,73 @@ class PercentileSuite extends SparkFunSuite {
test("call from sql query") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you remove this entire test. We don't have to check the sql method. This feels like over testing.

@@ -232,6 +358,41 @@ class PercentileSuite extends SparkFunSuite {
assert(agg.eval(buffer) != null)
}

test("null and invalid values( 0 and negatives ) handling of frequency column") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just make this test about testing the negative frequency case & move null & 0 testing to the earlier test. This should be enough:

val perc = new Percentile(Literal(1), Literal(0.5), Literal(-1))
val buffer = new GenericInternalRow(2)
agg.initialize(buffer)
val e = intercept[SparkException](agg.update(buffer, InternalRow.empty))
assert(e.getMessage.contains("Negative values found in "))

Removed the test for Sql Query
Merged 2 tests as they were similar to each other
Regrouped null and invalid values related test
@SparkQA
Copy link

SparkQA commented Feb 7, 2017

Test build #72491 has started for PR 16497 at commit 58e7a63.

@hvanhovell
Copy link
Contributor

retest this please

@SparkQA
Copy link

SparkQA commented Feb 7, 2017

Test build #72493 has finished for PR 16497 at commit 58e7a63.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Copy link
Contributor

LGTM - merging to master. Thanks!

@asfgit asfgit closed this in e99e34d Feb 7, 2017
@tanejagagan
Copy link
Author

Thanks to you for including the changes

cmonkey pushed a commit to cmonkey/spark that referenced this pull request Feb 15, 2017
## What changes were proposed in this pull request?

I have a frequency distribution table with following entries
Age,    No of person
21, 10
22, 15
23, 18
..
..
30, 14
Moreover it is common to have data in frequency distribution format to further calculate Percentile, Median. With current implementation
It would be very difficult and complex to find the percentile.
Therefore i am proposing enhancement to current Percentile and Approx Percentile implementation to take frequency distribution column into consideration

## How was this patch tested?
1) Enhanced /sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala to cover the additional functionality
2) Run some performance benchmark test with 20 million row in local environment and did not see any performance degradation

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: gagan taneja <tanejagagan@gagans-MacBook-Pro.local>

Closes apache#16497 from tanejagagan/branch-18940.
given percentage. The value of percentage must be between 0.0 and 1.0.
_FUNC_(col, percentage [, frequency]) - Returns the exact percentile value of numeric column
`col` at the given percentage. The value of percentage must be between 0.0 and 1.0. The
value of frequency should be positive integral

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstring should mention what the frequency parameter is, how it's used, etc. Just stating that it should be a positive integer is not useful.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants