Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-26353][SQL]Add typed aggregate functions(max/min) to the example module. #23304

Closed
wants to merge 1 commit into from

Conversation

10110346
Copy link
Contributor

@10110346 10110346 commented Dec 13, 2018

What changes were proposed in this pull request?

Add typed aggregate functions(max/min) to the example module.

How was this patch tested?

Manual testing:
running typed minimum:

+-----+----------------------+
|value|TypedMin(scala.Tuple2)|
+-----+----------------------+
|    0|                 [0.0]|
|    2|                 [2.0]|
|    1|                 [1.0]|
+-----+----------------------+

running typed maximum:

+-----+----------------------+
|value|TypedMax(scala.Tuple2)|
+-----+----------------------+
|    0|                  [18]|
|    2|                  [17]|
|    1|                  [19]|
+-----+----------------------+

@10110346 10110346 changed the title [SPARK-26353][SQL]Add typed aggregate functions:max&&min [SPARK-26353][SQL]Add typed aggregate functions: max&&min Dec 13, 2018
@SparkQA
Copy link

SparkQA commented Dec 13, 2018

Test build #100056 has finished for PR 23304 at commit defee03.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class TypedMaxDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double]
  • class TypedMaxLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long]
  • class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double]
  • class TypedMinLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long]

@10110346
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Dec 13, 2018

Test build #100071 has finished for PR 23304 at commit defee03.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class TypedMaxDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double]
  • class TypedMaxLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long]
  • class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double]
  • class TypedMinLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long]

@10110346
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Dec 13, 2018

Test build #100084 has finished for PR 23304 at commit defee03.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class TypedMaxDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double]
  • class TypedMaxLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long]
  • class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double]
  • class TypedMinLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long]

@10110346
Copy link
Contributor Author

cc @rxin @cloud-fan

@gatorsmile
Copy link
Member

See the PR #18113

@10110346
Copy link
Contributor Author

This PR #18113 has a better solution, thanks @gatorsmile
But it looks like it hasn't been maintained.

@10110346
Copy link
Contributor Author

This PR #18113 doesn't look like making progress for a long time so far, I take this PR over.

@SparkQA
Copy link

SparkQA commented Dec 14, 2018

Test build #100136 has finished for PR 23304 at commit defee03.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class TypedMaxDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double]
  • class TypedMaxLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long]
  • class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double]
  • class TypedMinLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long]

@SparkQA
Copy link

SparkQA commented Dec 14, 2018

Test build #100139 has finished for PR 23304 at commit 0b64ae7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait TypedMinDouble[IN, OUT] extends Aggregator[IN, MutableDouble, OUT]
  • class JavaTypedMinDouble[IN](override val f: IN => Double)
  • class ScalaTypedMinDouble[IN](override val f: IN => Double)
  • trait TypedMaxDouble[IN, OUT] extends Aggregator[IN, MutableDouble, OUT]
  • class JavaTypedMaxDouble[IN](override val f: IN => Double)
  • class ScalaTypedMaxDouble[IN](override val f: IN => Double)
  • trait TypedMinLong[IN, OUT] extends Aggregator[IN, MutableLong, OUT]
  • class JavaTypedMinLong[IN](override val f: IN => Long) extends TypedMinLong[IN, java.lang.Long]
  • class ScalaTypedMinLong[IN](override val f: IN => Long) extends TypedMinLong[IN, Option[Long]]
  • trait TypedMaxLong[IN, OUT] extends Aggregator[IN, MutableLong, OUT]
  • class JavaTypedMaxLong[IN](override val f: IN => Long) extends TypedMaxLong[IN, java.lang.Long]
  • class ScalaTypedMaxLong[IN](override val f: IN => Long) extends TypedMaxLong[IN, Option[Long]]
  • class MutableLong(var value: Long) extends Serializable
  • class MutableDouble(var value: Double) extends Serializable

@cloud-fan
Copy link
Contributor

do you resolve #18113 (comment) ?

@10110346
Copy link
Contributor Author

@cloud-fan No, I don't

@HyukjinKwon
Copy link
Member

?? then are you going to resolve that comment?

@10110346
Copy link
Contributor Author

10110346 commented Jan 7, 2019

?? then are you going to resolve that comment?

I can't solve this problem at present, someone can help me?

@cloud-fan
Copy link
Contributor

How did you fix the tests? #18113 couldn't pass tests because of that problem.

@10110346
Copy link
Contributor Author

10110346 commented Jan 7, 2019

How did you fix the tests? #18113 couldn't pass tests because of that problem.

I just duplicated his code, I didn't do anything else.

/**
* Min aggregate function for floating point (double) type.
*
* @since 2.4.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's 3.0.0 now

class JavaTypedMinDouble[IN](override val f: IN => Double)
extends TypedMinDouble[IN, java.lang.Double] {
override def outputEncoder: Encoder[java.lang.Double] = ExpressionEncoder[java.lang.Double]()
override def finish(reduction: MutableDouble): java.lang.Double = reduction.value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if reduction is null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, I will update

class JavaTypedMaxDouble[IN](override val f: IN => Double)
extends TypedMaxDouble[IN, java.lang.Double] {
override def outputEncoder: Encoder[java.lang.Double] = ExpressionEncoder[java.lang.Double]()
override def finish(reduction: MutableDouble): java.lang.Double = reduction.value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


class JavaTypedMinLong[IN](override val f: IN => Long) extends TypedMinLong[IN, java.lang.Long] {
override def outputEncoder: Encoder[java.lang.Long] = ExpressionEncoder[java.lang.Long]()
override def finish(reduction: MutableLong): java.lang.Long = reduction.value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


class JavaTypedMaxLong[IN](override val f: IN => Long) extends TypedMaxLong[IN, java.lang.Long] {
override def outputEncoder: Encoder[java.lang.Long] = ExpressionEncoder[java.lang.Long]()
override def finish(reduction: MutableLong): java.lang.Long = reduction.value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.min(v -> (double)v._2()));
Assert.assertEquals(
Arrays.asList(new Tuple2<>("a", 1.0), new Tuple2<>("b", 3.0)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4 space indentation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, I will update

("b", Some(-4.0), Some(-4L), Some(4.0), Some(4L)))
}

test("typed aggregate: empty") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we test empty input with global aggregate in the java test as well?

@SparkQA
Copy link

SparkQA commented Jan 14, 2019

Test build #101163 has finished for PR 23304 at commit ae2152b.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait TypedMinDouble[IN, OUT] extends Aggregator[IN, MutableDouble, OUT]
  • class JavaTypedMinDouble[IN](override val f: IN => Double)
  • class ScalaTypedMinDouble[IN](override val f: IN => Double)
  • trait TypedMaxDouble[IN, OUT] extends Aggregator[IN, MutableDouble, OUT]
  • class JavaTypedMaxDouble[IN](override val f: IN => Double)
  • class ScalaTypedMaxDouble[IN](override val f: IN => Double)
  • trait TypedMinLong[IN, OUT] extends Aggregator[IN, MutableLong, OUT]
  • class JavaTypedMinLong[IN](override val f: IN => Long) extends TypedMinLong[IN, java.lang.Long]
  • class ScalaTypedMinLong[IN](override val f: IN => Long) extends TypedMinLong[IN, Option[Long]]
  • trait TypedMaxLong[IN, OUT] extends Aggregator[IN, MutableLong, OUT]
  • class JavaTypedMaxLong[IN](override val f: IN => Long) extends TypedMaxLong[IN, java.lang.Long]
  • class ScalaTypedMaxLong[IN](override val f: IN => Long) extends TypedMaxLong[IN, Option[Long]]
  • class MutableLong(var value: Long) extends Serializable
  • class MutableDouble(var value: Double) extends Serializable

@10110346
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Jan 14, 2019

Test build #101170 has finished for PR 23304 at commit ae2152b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait TypedMinDouble[IN, OUT] extends Aggregator[IN, MutableDouble, OUT]
  • class JavaTypedMinDouble[IN](override val f: IN => Double)
  • class ScalaTypedMinDouble[IN](override val f: IN => Double)
  • trait TypedMaxDouble[IN, OUT] extends Aggregator[IN, MutableDouble, OUT]
  • class JavaTypedMaxDouble[IN](override val f: IN => Double)
  • class ScalaTypedMaxDouble[IN](override val f: IN => Double)
  • trait TypedMinLong[IN, OUT] extends Aggregator[IN, MutableLong, OUT]
  • class JavaTypedMinLong[IN](override val f: IN => Long) extends TypedMinLong[IN, java.lang.Long]
  • class ScalaTypedMinLong[IN](override val f: IN => Long) extends TypedMinLong[IN, Option[Long]]
  • trait TypedMaxLong[IN, OUT] extends Aggregator[IN, MutableLong, OUT]
  • class JavaTypedMaxLong[IN](override val f: IN => Long) extends TypedMaxLong[IN, java.lang.Long]
  • class ScalaTypedMaxLong[IN](override val f: IN => Long) extends TypedMaxLong[IN, Option[Long]]
  • class MutableLong(var value: Long) extends Serializable
  • class MutableDouble(var value: Double) extends Serializable

}

/**
* Min aggregate function for floating point (double) type.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Min -> Max. Actually I will elaborate it more. Please avoid abbreviation like min or max in doc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok,thanks

@HyukjinKwon
Copy link
Member

Looks fine to me too.

@SparkQA
Copy link

SparkQA commented Jan 15, 2019

Test build #101212 has finished for PR 23304 at commit 04c0a22.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait TypedMinDouble[IN, OUT] extends Aggregator[IN, MutableDouble, OUT]
  • class JavaTypedMinDouble[IN](override val f: IN => Double)
  • class ScalaTypedMinDouble[IN](override val f: IN => Double)
  • trait TypedMaxDouble[IN, OUT] extends Aggregator[IN, MutableDouble, OUT]
  • class JavaTypedMaxDouble[IN](override val f: IN => Double)
  • class ScalaTypedMaxDouble[IN](override val f: IN => Double)
  • trait TypedMinLong[IN, OUT] extends Aggregator[IN, MutableLong, OUT]
  • class JavaTypedMinLong[IN](override val f: IN => Long) extends TypedMinLong[IN, java.lang.Long]
  • class ScalaTypedMinLong[IN](override val f: IN => Long) extends TypedMinLong[IN, Option[Long]]
  • trait TypedMaxLong[IN, OUT] extends Aggregator[IN, MutableLong, OUT]
  • class JavaTypedMaxLong[IN](override val f: IN => Long) extends TypedMaxLong[IN, java.lang.Long]
  • class ScalaTypedMaxLong[IN](override val f: IN => Long) extends TypedMaxLong[IN, Option[Long]]
  • class MutableLong(var value: Long) extends Serializable
  • class MutableDouble(var value: Double) extends Serializable

@HyukjinKwon
Copy link
Member

retest this please

@SparkQA
Copy link

SparkQA commented Jan 15, 2019

Test build #101219 has finished for PR 23304 at commit 04c0a22.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait TypedMinDouble[IN, OUT] extends Aggregator[IN, MutableDouble, OUT]
  • class JavaTypedMinDouble[IN](override val f: IN => Double)
  • class ScalaTypedMinDouble[IN](override val f: IN => Double)
  • trait TypedMaxDouble[IN, OUT] extends Aggregator[IN, MutableDouble, OUT]
  • class JavaTypedMaxDouble[IN](override val f: IN => Double)
  • class ScalaTypedMaxDouble[IN](override val f: IN => Double)
  • trait TypedMinLong[IN, OUT] extends Aggregator[IN, MutableLong, OUT]
  • class JavaTypedMinLong[IN](override val f: IN => Long) extends TypedMinLong[IN, java.lang.Long]
  • class ScalaTypedMinLong[IN](override val f: IN => Long) extends TypedMinLong[IN, Option[Long]]
  • trait TypedMaxLong[IN, OUT] extends Aggregator[IN, MutableLong, OUT]
  • class JavaTypedMaxLong[IN](override val f: IN => Long) extends TypedMaxLong[IN, java.lang.Long]
  • class ScalaTypedMaxLong[IN](override val f: IN => Long) extends TypedMaxLong[IN, Option[Long]]
  • class MutableLong(var value: Long) extends Serializable
  • class MutableDouble(var value: Double) extends Serializable

@rxin
Copy link
Contributor

rxin commented Jan 15, 2019

I'm kind of wondering whether it'd make sense to add these. It adds a lot of code which would incur some maintenance cost, and users can trivially implement these themselves, or just use the untyped version, and we don't need to spend time discussing whether these should follow SQL semantics or Scala semantics.

@HyukjinKwon
Copy link
Member

Actually, WDYT about ..

  1. Deprecating or removing them? Looks they are experimental.
  2. Stopping adding this from now on (but get this in as the last case considering min/max/count are pretty common).

Was having the kind of similar impression.

@HeartSaVioR
Copy link
Contributor

IMHO it would be ideal to have common aggregate functions for typed version as well (agree to restrict them to like sum/avg/count/min/max), otherwise end users using typed API have to implement such basic thing on their own.

Btw, TODOs in source code are tend to being considered as things to contribute. Might be ideal to remove these false alarm once we don't see its needs.

@cloud-fan
Copy link
Contributor

Hi @10110346 , can you move these new functions to the "example" package?

@10110346
Copy link
Contributor Author

Hi @10110346 , can you move these new functions to the "example" package?

Ok, I will do it, thanks

@10110346 10110346 changed the title [SPARK-26353][SQL]Add typed aggregate functions: max&&min [SPARK-26353][SQL]Add typed aggregate functions(max/min) to the example module. Feb 15, 2019
@SparkQA
Copy link

SparkQA commented Feb 15, 2019

Test build #102382 has finished for PR 23304 at commit bbb820b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class TypedMin[IN](val f: IN => Double) extends Aggregator[IN, MutableDouble, Option[Double]]
  • class TypedMax[IN](val f: IN => Long) extends Aggregator[IN, MutableLong, Option[Long]]
  • class MutableLong(var value: Long) extends Serializable
  • class MutableDouble(var value: Double) extends Serializable

@@ -44,6 +45,12 @@ object SimpleTypedAggregator {
println("running typed average:")
ds.groupByKey(_._1).agg(new TypedAverage[(Long, Long)](_._2.toDouble).toColumn).show()

println("running typed Minimum:")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minimum -> minimum to be consistent

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, thanks

@SparkQA
Copy link

SparkQA commented Feb 15, 2019

Test build #102385 has finished for PR 23304 at commit 20aef3e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class TypedMin[IN](val f: IN => Double) extends Aggregator[IN, MutableDouble, Option[Double]]
  • class TypedMax[IN](val f: IN => Long) extends Aggregator[IN, MutableLong, Option[Long]]
  • class MutableLong(var value: Long) extends Serializable
  • class MutableDouble(var value: Double) extends Serializable

}

override def bufferEncoder: Encoder[MutableDouble] = Encoders.kryo[MutableDouble]
override def outputEncoder: Encoder[Option[Double]] = ExpressionEncoder[Option[Double]]()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use Encoders.product[Option[Double]]? Or we can use Encoders.DOUBLE and use null to represent None.

The thing is, ExpressionEncoder is an internal class and we should avoid exposing it in the example module.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, thanks

@SparkQA
Copy link

SparkQA commented Feb 18, 2019

Test build #102440 has finished for PR 23304 at commit 26a448d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class TypedMin[IN](val f: IN => Double) extends Aggregator[IN, MutableDouble, Option[Double]]
  • class TypedMax[IN](val f: IN => Long) extends Aggregator[IN, MutableLong, Option[Long]]
  • class MutableLong(var value: Long) extends Serializable
  • class MutableDouble(var value: Double) extends Serializable

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@10110346
Copy link
Contributor Author

@cloud-fan It seems merging has not succeeded, thanks

@HyukjinKwon
Copy link
Member

Merged to master

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
7 participants