[SPARK-4233] [SQL] WIP:Simplify the UDAF API (Interface) #3247

Closed
wants to merge 4 commits into from

chenghao-intel
Contributor

Simplifying the UDAF API is the first step toward optimizing aggregation (see https://issues.apache.org/jira/browse/SPARK-4366).

Currently, UDAFs do not scale as data volume grows, particularly for distinct aggregation expressions. This PR does not aim to fix the performance of distinct aggregation; instead, it provides:

  • A more straightforward API for UDAF implementation
    UDAF developers no longer need to write a separate distinct expression. For example, DistinctAverage is unnecessary once Average is provided, because the framework handles DISTINCT internally.
  • Row-based aggregation buffer
    The aggregation buffer is stored as a MutableRow with a schema, so UDAF developers can use the Catalyst expression evaluation framework when writing UDAFs.
  • Shuffling of aggregation buffers across machine boundaries is transparent to UDAF developers, which gives us a chance to switch or tune the aggregation algorithms without changing the UDAF interface.
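
To make the first two points concrete, here is a minimal plain-Scala sketch. It is not the interface from this PR: `SimpleAggregate`, `Buffer`, `Avg` and `run` are hypothetical names, and an `Array[Any]` stands in for MutableRow. It only illustrates the idea that the UDAF author writes a single `Avg`, while the framework de-duplicates inputs when DISTINCT is requested.

```scala
// Hypothetical names throughout; this is NOT the interface from this PR.
object UdafSketch {
  // A mutable, positionally indexed buffer standing in for MutableRow.
  type Buffer = Array[Any]

  // What a UDAF author would implement: no DISTINCT-specific logic.
  trait SimpleAggregate {
    def bufferSize: Int
    def init(buf: Buffer): Unit
    def update(buf: Buffer, input: Long): Unit
    def eval(buf: Buffer): Any
  }

  object Avg extends SimpleAggregate {
    val bufferSize = 2 // slot 0: running sum, slot 1: running count
    def init(buf: Buffer): Unit = { buf(0) = 0L; buf(1) = 0L }
    def update(buf: Buffer, input: Long): Unit = {
      buf(0) = buf(0).asInstanceOf[Long] + input
      buf(1) = buf(1).asInstanceOf[Long] + 1L
    }
    def eval(buf: Buffer): Any = {
      val count = buf(1).asInstanceOf[Long]
      if (count == 0L) null else buf(0).asInstanceOf[Long].toDouble / count
    }
  }

  // The "framework" side: it drops repeated inputs itself when distinct = true,
  // so Avg never needs a DistinctAvg counterpart.
  def run(agg: SimpleAggregate, inputs: Seq[Long], distinct: Boolean): Any = {
    val buf: Buffer = new Array[Any](agg.bufferSize)
    agg.init(buf)
    val seen = scala.collection.mutable.HashSet.empty[Long]
    for (v <- inputs if !distinct || seen.add(v)) agg.update(buf, v)
    agg.eval(buf)
  }
}
```

Calling `UdafSketch.run(UdafSketch.Avg, Seq(1L, 1L, 4L), distinct = true)` averages only 1 and 4, while `distinct = false` averages all three inputs; `Avg` itself is the same in both cases.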

The following subtasks are done:

  • Integrated with Hive UDAF (Generic UDAF)
  • Re-implemented the existing UDAFs (e.g. max, first, last)
  • Fixed code style issues

Some open issues remain for follow-up PRs:

  • Approximate aggregations
  • Partial aggregation for DISTINCT (before the data shuffling)
  • CodeGen for aggregation functions (this PR removes all of the aggregation codegen)
  • Sort-based aggregation (to reduce the memory footprint)
  • Aggregation optimization under data skew

Data Gen Code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

case class Source(key: String, value: Long)

val sc = new SparkContext("local[6]", "TestSQLContext", new SparkConf().set("spark.shuffle.spill", "true"))
val hc = new HiveContext(sc)

// Rows per partition (the iterator below actually yields constant + 1 rows,
// because hasNext stays true until count drops below zero).
val constant = 500000
// 75 partitions, each producing random (key, value) pairs.
val data = sc.parallelize(1 to 75, 75).mapPartitions { _ =>
  val rand = new java.util.Random(System.currentTimeMillis())
  new Iterator[Source] {
    var count = constant
    def hasNext = count >= 0
    def next(): Source = {
      count -= 1
      Source("Key" + rand.nextLong(), rand.nextLong())
    }
  }
}
hc.createSchemaRDD(data).saveAsParquetFile("/tmp/source")

And the benchmark code and result:

import org.apache.spark.sql.hive.HiveContext

case class Source(key: String, value: Long)
val hc = new HiveContext(sc)
hc.parquetFile("/tmp/source").registerTempTable("source")
hc.sql("set spark.sql.shuffle.partitions=120")
hc.sql("set spark.sql.codegen=true")
hc.sql("create table IF NOT EXISTS result (c1 double, c2 double, c3 double, c4 double, c5 double)")

val q1 = "insert overwrite table result select count(value), max(value), min(value), sum(value), avg(value) from source"
val q2 = "insert overwrite table result select count(value), max(value), min(value), sum(value), avg(value) from source group by key"
val q3 = "insert overwrite table result select count(distinct value), max(value), min(value), sum(distinct value), avg(value) from source"
val q4 = "insert overwrite table result select count(distinct value), max(value), min(value), sum(distinct value), avg(value) from source group by key"

// Holds one query's timings (in milliseconds) and its physical plan.
case class QueryResult(query: String, durations: Seq[Long], physicalPlan: Seq[String]) {
  override def toString() = s"""Query:$query \n Duration: ($durations)\n ${physicalPlan.mkString("\n")}"""
}
val results = new collection.mutable.ArrayBuffer[QueryResult]()
try {
  (q1 :: q2 :: q3 :: q4 :: Nil).foreach { q =>
    val physicalPlan = hc.sql(s"explain $q").collect().map(_.getString(0))
    // Run each query three times and record the wall-clock time of each run.
    val durations = (0 to 2).map { _ =>
      val a = System.currentTimeMillis()
      hc.sql(q)
      val b = System.currentTimeMillis()
      b - a
    }
    val qr = QueryResult(q, durations, physicalPlan)
    results.append(qr)
    println(qr)
  }
} catch {
  case t: Throwable => t.printStackTrace()
} finally {
  // Print whatever was collected, even if a later query failed.
  results.foreach(println)
}

CMD: bin/spark-shell --master local[6] --jars conf/hive-site.xml --driver-memory 10g

SQL | New Implementation (ms) | Old Implementation (ms) | Old Implementation with CodeGen (ms)
Q1 | (7412, 5548, 5432) | (6571, 4188, 3848) | (7821, 8637, 3894)
Q2 | (287442, 205844, 283307) | (430723, 399582, 450138) | (409049, 382874, 380012)
Q3 | (253209, 267187, 283325) | (81634, 109788, 117197) | (111527, 108582, 119733)
Q4 | (363017, 550359, 319450) | (1909499, 1428532, 1654106) | (1336548, 1405936, 1253961)

CMD: bin/spark-shell --master local[6] --jars conf/hive-site.xml --driver-memory 5g

SQL | New Implementation (ms) | Old Implementation (ms) | Old Implementation with CodeGen (ms)
Q1 | (8419, 5608, 5400) | (6632, 4339, 4166) | (7324, 4068, 3990)
Q2 | (308409, 263482, 306832) | OOM | OOM
Q3 | (350264, 346999, 371698) | OOM | OOM
Q4 | (331502, 397524, 572660) | OOM | OOM

In general, the new implementation uses less memory and is faster when group-by keys are specified. There is still room to optimize the cases without group-by keys in the new implementation, and that optimization should not affect the UDAF interface.
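
A rough way to compare the 10g numbers is to take the median of the three runs per query, which damps the effect of a slow first run. The sketch below does that for Q2, Q3 and Q4 (new implementation versus old implementation without CodeGen). The timings are copied from the table above; the choice of the median as a summary and the names `BenchSummary`, `median` and `speedup` are assumptions made for illustration, not part of the original benchmark.

```scala
// Summarises the 10g benchmark rows above; durations are in milliseconds.
object BenchSummary {
  // Median of an odd-sized sample: the middle element after sorting.
  def median(xs: Seq[Long]): Long = xs.sorted.apply(xs.size / 2)

  val q2New = Seq(287442L, 205844L, 283307L)
  val q2Old = Seq(430723L, 399582L, 450138L)
  val q3New = Seq(253209L, 267187L, 283325L)
  val q3Old = Seq(81634L, 109788L, 117197L)
  val q4New = Seq(363017L, 550359L, 319450L)
  val q4Old = Seq(1909499L, 1428532L, 1654106L)

  // A value above 1.0 means the new implementation is faster.
  def speedup(oldRuns: Seq[Long], newRuns: Seq[Long]): Double =
    median(oldRuns).toDouble / median(newRuns)

  def main(args: Array[String]): Unit = {
    println(f"Q2 speedup: ${speedup(q2Old, q2New)}%.2f")
    println(f"Q3 speedup: ${speedup(q3Old, q3New)}%.2f")
    println(f"Q4 speedup: ${speedup(q4Old, q4New)}%.2f")
  }
}
```

By this measure the grouped queries (Q2, Q4) come out above 1.0 and the ungrouped distinct query (Q3) comes out below 1.0, which matches the summary above.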

PS: the old implementation does not seem to support queries like select avg(distinct value) from xxx.

@SparkQA

SparkQA commented Nov 13, 2014

Test build #23312 has finished for PR 3247 at commit bb79ea8.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class GenericMutableRow(v: Array[Any]) extends GenericRow(v) with MutableRow
    • abstract class AggregateFunction
    • trait AggregateExpression extends Expression
    • case class MinFunction(aggr: BoundReference, base: Min) extends AggregateFunction
    • case class Min(child: Expression, distinct: Boolean = false, override val distinctLike: Boolean = true) extends UnaryExpression with AggregateExpression
    • case class AverageFunction(count: BoundReference, sum: BoundReference, base: Average) extends AggregateFunction
    • case class Average(child: Expression, distinct: Boolean = false) extends UnaryExpression with AggregateExpression
    • case class Max(child: Expression) extends UnaryExpression with AggregateExpression
    • case class MaxFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction
    • case class Count(child: Expression) extends UnaryExpression with AggregateExpression
    • case class CountDistinct(expressions: Seq[Expression]) extends UnaryExpression with AggregateExpression
    • case class CollectHashSet(expressions: Seq[Expression]) extends UnaryExpression with AggregateExpression
    • case class CombineSetsAndCount(inputSet: Expression) extends UnaryExpression with AggregateExpression
    • case class ApproxCountDistinctPartition(child: Expression, relativeSD: Double) extends UnaryExpression with AggregateExpression
    • case class ApproxCountDistinctMerge(child: Expression, relativeSD: Double) extends UnaryExpression with AggregateExpression
    • case class ApproxCountDistinct(child: Expression, relativeSD: Double = 0.05) extends UnaryExpression with AggregateExpression
    • case class Sum(child: Expression) extends UnaryExpression with AggregateExpression
    • case class SumDistinct(child: Expression) extends UnaryExpression with AggregateExpression
    • case class First(child: Expression) extends UnaryExpression with AggregateExpression
    • case class Last(child: Expression) extends UnaryExpression with AggregateExpression
    • sealed case class AggregateFunctionBind(
    • sealed trait Aggregate
    • sealed trait PreShuffle extends Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(

@chenghao-intel
Contributor Author

@rxin, @marmbrus, most of the ideas come from Hive/Shark. Could you please go through the code and give some feedback? Sorry, it's a BIG commit, but I really want to merge all of the aggregation improvements (3 to 4 BIG PRs in total) in the next release.

(Most of the tricks happen in Aggregate.scala and aggregates.scala.)

@marmbrus
Contributor

@chenghao-intel, I glanced at this really quickly and will take a closer look once we cut an RC for 1.2. Overall this is probably a good direction to go in.

@SparkQA

SparkQA commented Nov 25, 2014

Test build #23833 has finished for PR 3247 at commit bb1eb2d.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class RandomForestModel(JavaModelWrapper):
    • class RandomForest(object):
    • case class UnresolvedFunction(
    • abstract class AggregateFunction
    • trait AggregateExpression extends Expression
    • case class MinFunction(aggr: BoundReference, base: Min) extends AggregateFunction
    • case class Min(child: Expression, distinct: Boolean = false, override val distinctLike: Boolean = true) extends UnaryExpression with AggregateExpression
    • case class AverageFunction(count: BoundReference, sum: BoundReference, base: Average) extends AggregateFunction
    • case class Average(child: Expression, distinct: Boolean = false) extends UnaryExpression with AggregateExpression
    • case class Max(child: Expression) extends UnaryExpression with AggregateExpression
    • case class MaxFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction
    • case class Count(child: Expression) extends UnaryExpression with AggregateExpression
    • case class CountDistinct(expressions: Seq[Expression]) extends UnaryExpression with AggregateExpression
    • case class CollectHashSet(expressions: Seq[Expression]) extends UnaryExpression with AggregateExpression
    • case class CombineSetsAndCount(inputSet: Expression) extends UnaryExpression with AggregateExpression
    • case class ApproxCountDistinctPartition(child: Expression, relativeSD: Double) extends UnaryExpression with AggregateExpression
    • case class ApproxCountDistinctMerge(child: Expression, relativeSD: Double) extends UnaryExpression with AggregateExpression
    • case class ApproxCountDistinct(child: Expression, relativeSD: Double = 0.05) extends UnaryExpression with AggregateExpression
    • case class Sum(child: Expression) extends UnaryExpression with AggregateExpression
    • case class SumDistinct(child: Expression) extends UnaryExpression with AggregateExpression
    • case class First(child: Expression) extends UnaryExpression with AggregateExpression
    • case class Last(child: Expression) extends UnaryExpression with AggregateExpression
    • sealed case class AggregateFunctionBind(
    • sealed trait Aggregate
    • sealed trait PreShuffle extends Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(
    • class DefaultSource extends RelationProvider
    • case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
    • abstract class CatalystScan extends BaseRelation

@SparkQA

SparkQA commented Dec 9, 2014

Test build #24242 has finished for PR 3247 at commit a160d1a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class UnresolvedFunction(
    • trait AggregateFunction
    • trait AggregateExpression extends Expression
    • abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression
    • case class Min(
    • case class Average(child: Expression, distinct: Boolean = false)
    • case class Max(child: Expression, distinct: Boolean = false)
    • case class Count(child: Expression, distinct: Boolean = false)
    • case class Sum(child: Expression, distinct: Boolean = false)
    • case class First(child: Expression, distinct: Boolean = false)
    • case class Last(child: Expression, distinct: Boolean = false)
    • case class MinFunction(aggr: BoundReference, base: Min) extends AggregateFunction
    • case class AverageFunction(count: BoundReference, sum: BoundReference, base: Average)
    • case class MaxFunction(aggr: BoundReference, base: Max) extends AggregateFunction
    • case class CountFunction(aggr: BoundReference, base: Count) extends AggregateFunction
    • case class SumFunction(aggr: BoundReference, base: Sum) extends AggregateFunction
    • case class FirstFunction(aggr: BoundReference, base: First) extends AggregateFunction
    • case class LastFunction(aggr: BoundReference, base: AggregateExpression) extends AggregateFunction
    • sealed case class AggregateFunctionBind(
    • sealed trait Aggregate
    • sealed trait PreShuffle extends Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(

@SparkQA

SparkQA commented Dec 10, 2014

Test build #24297 has finished for PR 3247 at commit a9c1544.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class UnresolvedFunction(
    • trait AggregateFunction
    • trait AggregateExpression extends Expression
    • abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression
    • case class Min(
    • case class Average(child: Expression, distinct: Boolean = false)
    • case class Max(child: Expression, distinct: Boolean = false)
    • case class Count(child: Expression, distinct: Boolean = false)
    • case class Sum(child: Expression, distinct: Boolean = false)
    • case class First(child: Expression, distinct: Boolean = false)
    • case class Last(child: Expression, distinct: Boolean = false)
    • case class MinFunction(aggr: BoundReference, base: Min) extends AggregateFunction
    • case class AverageFunction(count: BoundReference, sum: BoundReference, base: Average)
    • case class MaxFunction(aggr: BoundReference, base: Max) extends AggregateFunction
    • case class CountFunction(aggr: BoundReference, base: Count) extends AggregateFunction
    • case class SumFunction(aggr: BoundReference, base: Sum) extends AggregateFunction
    • case class FirstFunction(aggr: BoundReference, base: First) extends AggregateFunction
    • case class LastFunction(aggr: BoundReference, base: AggregateExpression) extends AggregateFunction
    • sealed case class AggregateFunctionBind(
    • sealed trait Aggregate
    • sealed trait PreShuffle extends Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(

@SparkQA

SparkQA commented Dec 17, 2014

Test build #24526 has finished for PR 3247 at commit 6e548bc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class UnresolvedFunction(
    • trait AggregateFunction
    • trait AggregateExpression extends Expression
    • abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression
    • case class Min(
    • case class Average(child: Expression, distinct: Boolean = false)
    • case class Max(child: Expression, distinct: Boolean = false)
    • case class Count(child: Expression)
    • case class CountDistinct(children: Seq[Expression])
    • case class Sum(child: Expression, distinct: Boolean = false)
    • case class First(child: Expression, distinct: Boolean = false)
    • case class Last(child: Expression, distinct: Boolean = false)
    • case class MinFunction(aggr: BoundReference, base: Min) extends AggregateFunction
    • case class AverageFunction(count: BoundReference, sum: BoundReference, base: Average)
    • case class MaxFunction(aggr: BoundReference, base: Max) extends AggregateFunction
    • case class CountFunction(aggr: BoundReference, base: Count)
    • case class CountDistinctFunction(aggr: BoundReference, base: CountDistinct)
    • case class SumFunction(aggr: BoundReference, base: Sum) extends AggregateFunction
    • case class FirstFunction(aggr: BoundReference, base: First) extends AggregateFunction
    • case class LastFunction(aggr: BoundReference, base: AggregateExpression) extends AggregateFunction
    • sealed case class AggregateFunctionBind(
    • sealed class KeyBufferSeens(
    • sealed trait Aggregate
    • sealed trait PreShuffle extends Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(

@SparkQA

SparkQA commented Dec 31, 2014

Test build #24941 has finished for PR 3247 at commit 7fe2a0e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class UnresolvedFunction(
    • trait AggregateFunction
    • trait AggregateExpression extends Expression
    • abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression
    • case class Min(
    • case class Average(child: Expression, distinct: Boolean = false)
    • case class Max(child: Expression, distinct: Boolean = false)
    • case class Count(child: Expression)
    • case class CountDistinct(children: Seq[Expression])
    • case class Sum(child: Expression, distinct: Boolean = false)
    • case class First(child: Expression, distinct: Boolean = false)
    • case class Last(child: Expression, distinct: Boolean = false)
    • case class MinFunction(aggr: BoundReference, base: Min) extends AggregateFunction
    • case class AverageFunction(count: BoundReference, sum: BoundReference, base: Average)
    • case class MaxFunction(aggr: BoundReference, base: Max) extends AggregateFunction
    • case class CountFunction(aggr: BoundReference, base: Count)
    • case class CountDistinctFunction(aggr: BoundReference, base: CountDistinct)
    • case class SumFunction(aggr: BoundReference, base: Sum) extends AggregateFunction
    • case class FirstFunction(aggr: BoundReference, base: First) extends AggregateFunction
    • case class LastFunction(aggr: BoundReference, base: AggregateExpression) extends AggregateFunction
    • sealed case class AggregateFunctionBind(
    • sealed class KeyBufferSeens(
    • sealed trait Aggregate
    • sealed trait PreShuffle extends Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(

@SparkQA

SparkQA commented Dec 31, 2014

Test build #24950 has finished for PR 3247 at commit 93df2f4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class GaussianMixtureModel(
    • case class UnresolvedFunction(
    • trait AggregateFunction
    • trait AggregateExpression extends Expression
    • abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression
    • case class Min(
    • case class Average(child: Expression, distinct: Boolean = false)
    • case class Max(child: Expression, distinct: Boolean = false)
    • case class Count(child: Expression)
    • case class CountDistinct(children: Seq[Expression])
    • case class Sum(child: Expression, distinct: Boolean = false)
    • case class First(child: Expression, distinct: Boolean = false)
    • case class Last(child: Expression, distinct: Boolean = false)
    • case class MinFunction(aggr: BoundReference, base: Min) extends AggregateFunction
    • case class AverageFunction(count: BoundReference, sum: BoundReference, base: Average)
    • case class MaxFunction(aggr: BoundReference, base: Max) extends AggregateFunction
    • case class CountFunction(aggr: BoundReference, base: Count)
    • case class CountDistinctFunction(aggr: BoundReference, base: CountDistinct)
    • case class SumFunction(aggr: BoundReference, base: Sum) extends AggregateFunction
    • case class FirstFunction(aggr: BoundReference, base: First) extends AggregateFunction
    • case class LastFunction(aggr: BoundReference, base: AggregateExpression) extends AggregateFunction
    • case class Sort(
    • sealed case class AggregateFunctionBind(
    • sealed class KeyBufferSeens(
    • sealed trait Aggregate
    • sealed trait PreShuffle extends Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(
    • case class BroadcastLeftSemiJoinHash(

@chenghao-intel
Contributor Author

test this please

@SparkQA

SparkQA commented Jan 4, 2015

Test build #25020 has finished for PR 3247 at commit 93df2f4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class UnresolvedFunction(
    • trait AggregateFunction
    • trait AggregateExpression extends Expression
    • abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression
    • case class Min(
    • case class Average(child: Expression, distinct: Boolean = false)
    • case class Max(child: Expression, distinct: Boolean = false)
    • case class Count(child: Expression)
    • case class CountDistinct(children: Seq[Expression])
    • case class Sum(child: Expression, distinct: Boolean = false)
    • case class First(child: Expression, distinct: Boolean = false)
    • case class Last(child: Expression, distinct: Boolean = false)
    • case class MinFunction(aggr: BoundReference, base: Min) extends AggregateFunction
    • case class AverageFunction(count: BoundReference, sum: BoundReference, base: Average)
    • case class MaxFunction(aggr: BoundReference, base: Max) extends AggregateFunction
    • case class CountFunction(aggr: BoundReference, base: Count)
    • case class CountDistinctFunction(aggr: BoundReference, base: CountDistinct)
    • case class SumFunction(aggr: BoundReference, base: Sum) extends AggregateFunction
    • case class FirstFunction(aggr: BoundReference, base: First) extends AggregateFunction
    • case class LastFunction(aggr: BoundReference, base: AggregateExpression) extends AggregateFunction
    • sealed case class AggregateFunctionBind(
    • sealed class KeyBufferSeens(
    • sealed trait Aggregate
    • sealed trait PreShuffle extends Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(

@SparkQA

SparkQA commented Jan 5, 2015

Test build #25043 has finished for PR 3247 at commit 74945d5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class UnresolvedFunction(
    • trait AggregateFunction
    • trait AggregateExpression extends Expression
    • abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression
    • case class Min(
    • case class Average(child: Expression, distinct: Boolean = false)
    • case class Max(child: Expression, distinct: Boolean = false)
    • case class Count(child: Expression)
    • case class CountDistinct(children: Seq[Expression])
    • case class Sum(child: Expression, distinct: Boolean = false)
    • case class First(child: Expression, distinct: Boolean = false)
    • case class Last(child: Expression, distinct: Boolean = false)
    • case class MinFunction(aggr: BoundReference, base: Min) extends AggregateFunction
    • case class AverageFunction(count: BoundReference, sum: BoundReference, base: Average)
    • case class MaxFunction(aggr: BoundReference, base: Max) extends AggregateFunction
    • case class CountFunction(aggr: BoundReference, base: Count)
    • case class CountDistinctFunction(aggr: BoundReference, base: CountDistinct)
    • case class SumFunction(aggr: BoundReference, base: Sum) extends AggregateFunction
    • case class FirstFunction(aggr: BoundReference, base: First) extends AggregateFunction
    • case class LastFunction(aggr: BoundReference, base: AggregateExpression) extends AggregateFunction
    • sealed case class AggregateFunctionBind(
    • sealed class KeyBufferSeens(
    • sealed trait Aggregate
    • sealed trait PreShuffle extends Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(

@SparkQA

SparkQA commented Jan 5, 2015

Test build #25047 has finished for PR 3247 at commit 230e70f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class UnresolvedFunction(
    • trait AggregateFunction
    • trait AggregateExpression extends Expression
    • abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression
    • case class Min(
    • case class Average(child: Expression, distinct: Boolean = false)
    • case class Max(child: Expression, distinct: Boolean = false)
    • case class Count(child: Expression)
    • case class CountDistinct(children: Seq[Expression])
    • case class Sum(child: Expression, distinct: Boolean = false)
    • case class First(child: Expression, distinct: Boolean = false)
    • case class Last(child: Expression, distinct: Boolean = false)
    • case class MinFunction(aggr: BoundReference, base: Min) extends AggregateFunction
    • case class AverageFunction(count: BoundReference, sum: BoundReference, base: Average)
    • case class MaxFunction(aggr: BoundReference, base: Max) extends AggregateFunction
    • case class CountFunction(aggr: BoundReference, base: Count)
    • case class CountDistinctFunction(aggr: BoundReference, base: CountDistinct)
    • case class SumFunction(aggr: BoundReference, base: Sum) extends AggregateFunction
    • case class FirstFunction(aggr: BoundReference, base: First) extends AggregateFunction
    • case class LastFunction(aggr: BoundReference, base: AggregateExpression) extends AggregateFunction
    • sealed case class AggregateFunctionBind(
    • sealed class InputBufferSeens(
    • sealed trait Aggregate
    • sealed trait PreShuffle extends Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(

@chenghao-intel
Contributor Author

@marmbrus, this PR now passes the unit tests, but some of the details need to be discussed. Can you review it, particularly the UDAF interface design?

Sorry for so many code changes; I rewrote almost all of the UDAF-related code.

CountDistinct(args.map(nodeToExpr))
case Token("TOK_FUNCTIONDI", Token(SUM(), Nil) :: arg :: Nil) => Sum(nodeToExpr(arg), true)
case Token("TOK_FUNCTIONDI", Token(MAX(), Nil) :: arg :: Nil) => Max(nodeToExpr(arg), true)
case Token("TOK_FUNCTIONDI", Token(MIN(), Nil) :: arg :: Nil) => Min(nodeToExpr(arg), true)
Contributor

What does MIN distinct mean?
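
For context on the question, a small plain-Scala check (no Spark involved; `DistinctMinMax` and its fields are hypothetical names) suggests why a distinct flag is redundant for MIN and MAX: removing duplicates cannot change the smallest or largest value, whereas it does change SUM.

```scala
// Plain collections only; illustrates that DISTINCT is a no-op for min/max.
object DistinctMinMax {
  val values: Seq[Long] = Seq(3L, 1L, 3L, 1L, 7L)

  val minAll: Long = values.min
  val minDistinct: Long = values.distinct.min
  val maxAll: Long = values.max
  val maxDistinct: Long = values.distinct.max

  // SUM is different: duplicates contribute to the total.
  val sumAll: Long = values.sum
  val sumDistinct: Long = values.distinct.sum
}
```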

@marmbrus
Contributor

marmbrus commented Jan 6, 2015

I only looked at this quickly, but I like the goals, especially the middle one. Our current implementation is really wasteful on memory. Some thoughts:

  • It would be good if you could write up a quick design doc that outlines the interfaces, as right now it's kind of hard to pull them out from all the other changes you have to make.
  • I wonder if it is possible to combine aggregate expression and aggregate function somehow.
  • Can you explain how the modes are used? Do we really need them?

Other things:

  • Before we commit this we will have to implement the approximates. I don't think it's okay to regress in functionality here.
  • I'm not totally against removing the code generated version, but I'd have to see some performance tests that show we aren't regressing.
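
On the question about modes: the thread does not spell out how the PR defines them, so the following is only a sketch under the assumption that they resemble a partial/final split around the shuffle (as suggested by the PreShuffle and PostShuffle names in the class lists). `TwoPhaseAvg`, `Partial`, `partial`, `merge` and `finish` are hypothetical names, and plain Scala collections stand in for partitions.

```scala
// Two-phase average: partial buffers before the shuffle, merge and finish after it.
object TwoPhaseAvg {
  // Partial state that would be shipped across the shuffle: (sum, count).
  final case class Partial(sum: Long, count: Long)

  // Pre-shuffle step: fold the raw inputs of one partition into a partial buffer.
  def partial(rows: Seq[Long]): Partial =
    rows.foldLeft(Partial(0L, 0L))((p, v) => Partial(p.sum + v, p.count + 1L))

  // Post-shuffle step: combine two partial buffers.
  def merge(a: Partial, b: Partial): Partial =
    Partial(a.sum + b.sum, a.count + b.count)

  // Final step: turn the merged buffer into the result (None for empty input).
  def finish(p: Partial): Option[Double] =
    if (p.count == 0L) None else Some(p.sum.toDouble / p.count)

  def avg(partitions: Seq[Seq[Long]]): Option[Double] =
    finish(partitions.map(partial).foldLeft(Partial(0L, 0L))(merge))
}
```

For example, `TwoPhaseAvg.avg(Seq(Seq(1L, 2L), Seq(3L), Seq.empty[Long]))` merges three partial buffers before computing the result. The aggregate only needs to know which step it is in, which is one reading of what a "mode" would encode.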

@SparkQA

SparkQA commented Jan 20, 2015

Test build #25783 has finished for PR 3247 at commit 2cae095.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
    • case class UnresolvedFunction(
    • trait AggregateFunction
    • trait AggregateExpression extends Expression with AggregateFunction
    • abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression
    • case class Min(
    • case class Average(child: Expression, distinct: Boolean = false)
    • case class Max(child: Expression, distinct: Boolean = false)
    • case class Count(child: Expression)
    • case class CountDistinct(children: Seq[Expression])
    • case class Sum(child: Expression, distinct: Boolean = false)
    • case class First(child: Expression, distinct: Boolean = false)
    • case class Last(child: Expression, distinct: Boolean = false)
    • sealed case class AggregateFunctionBind(
    • sealed class InputBufferSeens(
    • sealed trait Aggregate
    • sealed trait PreShuffle extends Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(

@SparkQA

SparkQA commented Jan 23, 2015

Test build #26013 has finished for PR 3247 at commit feb00c8.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class UnresolvedFunction(
    • trait AggregateFunction
    • trait AggregateExpression extends Expression with AggregateFunction
    • abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression
    • case class Min(
    • case class Average(child: Expression, distinct: Boolean = false)
    • case class Max(child: Expression, distinct: Boolean = false)
    • case class Count(child: Expression)
    • case class CountDistinct(children: Seq[Expression])
    • case class Sum(child: Expression, distinct: Boolean = false)
    • case class First(child: Expression, distinct: Boolean = false)
    • case class Last(child: Expression, distinct: Boolean = false)
    • sealed case class AggregateFunctionBind(
    • sealed class InputBufferSeens(
    • sealed trait Aggregate
    • sealed trait PreShuffle extends Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(

@chenghao-intel
Contributor Author

@marmbrus I've rebased onto the latest master and updated the benchmark results. Sorry, the interface differs slightly from the design doc in JIRA; I will update that soon, but the general idea is the same.

@SparkQA

SparkQA commented Jan 23, 2015

Test build #26014 has finished for PR 3247 at commit 3dc1572.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
    • case class Movie(movieId: Int, title: String, genres: Seq[String])
    • case class Params(
    • class ALS extends Estimator[ALSModel] with ALSParams
    • case class RatingBlock(srcIds: Array[Int], dstIds: Array[Int], ratings: Array[Float])
    • case class UnresolvedFunction(
    • trait AggregateFunction
    • trait AggregateExpression extends Expression with AggregateFunction
    • abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression
    • case class Min(
    • case class Average(child: Expression, distinct: Boolean = false)
    • case class Max(child: Expression, distinct: Boolean = false)
    • case class Count(child: Expression)
    • case class CountDistinct(children: Seq[Expression])
    • case class Sum(child: Expression, distinct: Boolean = false)
    • case class First(child: Expression, distinct: Boolean = false)
    • case class Last(child: Expression, distinct: Boolean = false)
    • sealed case class AggregateFunctionBind(
    • sealed class InputBufferSeens(
    • sealed trait Aggregate
    • sealed trait PreShuffle extends Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(

@chenghao-intel
Contributor Author

Sorry, @maropu, I've updated it. Let's see if it breaks anything.

@SparkQA

SparkQA commented Mar 27, 2015

Test build #29286 has finished for PR 3247 at commit 0915435.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class UnresolvedFunction(
    • trait AggregateFunction
    • trait AggregateExpression extends Expression with AggregateFunction
    • abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression
    • case class Min(
    • case class Average(child: Expression, distinct: Boolean = false)
    • case class Max(child: Expression)
    • case class Count(child: Expression)
    • case class CountDistinct(children: Seq[Expression])
    • case class Sum(child: Expression, distinct: Boolean = false)
    • case class First(child: Expression, distinct: Boolean = false)
    • case class Last(child: Expression, distinct: Boolean = false)
    • //case class Add(left: Expression, right: Expression) extends BinaryArithmetic
    • sealed case class AggregateFunctionBind(
    • sealed class InputBufferSeens(
    • sealed trait Aggregate
    • sealed trait PreShuffle extends Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(

@SparkQA

SparkQA commented Mar 27, 2015

Test build #29287 has finished for PR 3247 at commit 70117c4.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class UnresolvedFunction(
    • trait AggregateFunction
    • trait AggregateExpression extends Expression with AggregateFunction
    • abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression
    • case class Min(
    • case class Average(child: Expression, distinct: Boolean = false)
    • case class Max(child: Expression)
    • case class Count(child: Expression)
    • case class CountDistinct(children: Seq[Expression])
    • case class Sum(child: Expression, distinct: Boolean = false)
    • case class First(child: Expression, distinct: Boolean = false)
    • case class Last(child: Expression, distinct: Boolean = false)
    • //case class Add(left: Expression, right: Expression) extends BinaryArithmetic
    • sealed case class AggregateFunctionBind(
    • sealed class InputBufferSeens(
    • sealed trait Aggregate
    • sealed trait PreShuffle extends Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(

@SparkQA

SparkQA commented Mar 27, 2015

Test build #29289 has finished for PR 3247 at commit 40fed21.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class UnresolvedFunction(
    • trait AggregateFunction
    • trait AggregateExpression extends Expression with AggregateFunction
    • abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression
    • case class Min(
    • case class Average(child: Expression, distinct: Boolean = false)
    • case class Max(child: Expression)
    • case class Count(child: Expression)
    • case class CountDistinct(children: Seq[Expression])
    • case class Sum(child: Expression, distinct: Boolean = false)
    • case class First(child: Expression, distinct: Boolean = false)
    • case class Last(child: Expression, distinct: Boolean = false)
    • sealed case class AggregateFunctionBind(
    • sealed class InputBufferSeens(
    • sealed trait Aggregate
    • sealed trait PreShuffle extends Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(

@SparkQA

SparkQA commented Mar 27, 2015

Test build #29291 has finished for PR 3247 at commit 341e708.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class UnresolvedFunction(
    • trait AggregateFunction
    • trait AggregateExpression extends Expression with AggregateFunction
    • abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression
    • case class Min(
    • case class Average(child: Expression, distinct: Boolean = false)
    • case class Max(child: Expression)
    • case class Count(child: Expression)
    • case class CountDistinct(children: Seq[Expression])
    • case class Sum(child: Expression, distinct: Boolean = false)
    • case class First(child: Expression, distinct: Boolean = false)
    • case class Last(child: Expression, distinct: Boolean = false)
    • sealed case class AggregateFunctionBind(
    • sealed class InputBufferSeens(
    • sealed trait Aggregate
    • sealed trait PreShuffle extends Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(

@SparkQA

SparkQA commented Mar 27, 2015

Test build #29310 has finished for PR 3247 at commit b539baf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class UnresolvedFunction(
    • trait AggregateFunction
    • trait AggregateExpression extends Expression with AggregateFunction
    • abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression
    • case class Min(
    • case class Average(child: Expression, distinct: Boolean = false)
    • case class Max(child: Expression)
    • case class Count(child: Expression)
    • case class CountDistinct(children: Seq[Expression])
    • case class Sum(child: Expression, distinct: Boolean = false)
    • case class First(child: Expression, distinct: Boolean = false)
    • case class Last(child: Expression, distinct: Boolean = false)
    • sealed case class AggregateFunctionBind(
    • sealed class InputBufferSeens(
    • sealed trait Aggregate
    • sealed trait PreShuffle extends Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(

@SparkQA

SparkQA commented Mar 31, 2015

Test build #29457 has finished for PR 3247 at commit 13f4f15.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class UnresolvedFunction(
    • trait AggregateFunction
    • trait AggregateExpression extends Expression with AggregateFunction
    • abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression
    • case class Min(
    • case class Average(child: Expression, distinct: Boolean = false)
    • case class Max(child: Expression)
    • case class Count(child: Expression)
    • case class CountDistinct(children: Seq[Expression])
    • case class Sum(child: Expression, distinct: Boolean = false)
    • case class First(child: Expression, distinct: Boolean = false)
    • case class Last(child: Expression, distinct: Boolean = false)
    • sealed case class AggregateFunctionBind(
    • sealed class InputBufferSeens(
    • sealed trait Aggregate
    • sealed trait PreShuffle extends Aggregate
    • sealed trait PostShuffle extends Aggregate
    • case class AggregatePreShuffle(
    • case class AggregatePostShuffle(
    • case class DistinctAggregate(
  • This patch does not change any dependencies.

@maropu
Member

maropu commented Apr 2, 2015

@chenghao-intel I agree with your refactoring idea, but the patch is too big to merge into master in one go.
It seems to me that it would be better to split it into several smaller patches, e.g.:

  • a patch to remove the DISTINCT aggregate expressions
  • a patch to simplify the buffering code in Aggregate
  • a patch to improve Hive integration (Mode, or something)
  • ...

Thoughts?
Anyway, I'm interested in your UDTF and UDAF refactoring work, so
I'd like to join in.

@chenghao-intel
Contributor Author

@maropu Glad to know you're interested in the refactoring!
Ideally, we would create a branch for a code change this big and then break the task down into smaller ones, as you described.
But it's probably very difficult to merge them into master piece by piece, as those tasks have many dependencies on each other, and the bottom line is that we can't break any of the tests on master.

@maropu I will be glad to review and merge your PRs if you make the changes against my repo. Or, @marmbrus @rxin, is it possible to create a branch for this PR in the Apache repo?

@maropu
Member

maropu commented Apr 2, 2015

Isn't it possible to create a simple patch that only removes the DISTINCT aggregation expressions?
We would just add distinct as a field in AggregateExpression, and then
SUM/COUNT would switch the aggregation strategy (DISTINCT or not) while still using the current AggregateFunction implementations.
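
To make the suggestion concrete, here is a minimal, Spark-free sketch of "distinct as a field". It is only an illustration under stated assumptions: the trait and case classes below are hypothetical stand-ins that borrow names from the class list above, they operate on plain `Seq[Long]` values, and they are not the PR's actual API.

```scala
// Hypothetical model of the idea; not the Catalyst classes from this PR.
sealed trait AggregateExpression {
  // True for SUM(DISTINCT x) / COUNT(DISTINCT x), false otherwise.
  def distinct: Boolean

  // The only thing an aggregate author implements: fold inputs into one value.
  protected def aggregate(values: Seq[Long]): Long

  // The framework, not the aggregate author, decides whether to de-duplicate.
  final def eval(values: Seq[Long]): Long =
    aggregate(if (distinct) values.distinct else values)
}

final case class Sum(distinct: Boolean = false) extends AggregateExpression {
  protected def aggregate(values: Seq[Long]): Long = values.sum
}

final case class Count(distinct: Boolean = false) extends AggregateExpression {
  protected def aggregate(values: Seq[Long]): Long = values.size.toLong
}

val input = Seq(1L, 2L, 2L, 3L)
val plainSum = Sum().eval(input)                     // SUM(x)
val distinctSum = Sum(distinct = true).eval(input)   // SUM(DISTINCT x)
val distinctCount = Count(distinct = true).eval(input) // COUNT(DISTINCT x)
```

With this shape, a separate `SumDistinct` or `DistinctAverage` class is unnecessary, because the same `aggregate` body serves both modes. A real implementation would de-duplicate incrementally per group rather than materializing a `Seq`.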

@chenghao-intel
Contributor Author

OK, I see what you mean. As I wrote in the description of this PR, we want to create a unified UDAF interface here, and DISTINCT is critical for that purpose. We don't want to change the API definition again and again, right?

@inline
final def getStruct(bound: BoundReference): Row = getStruct(bound.ordinal)
/* end of the syntactic sugar it as API */
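
For readers without the diff at hand, the overload quoted above is a thin convenience wrapper: it takes a bound column reference and forwards to the existing ordinal-based getter. The sketch below shows that pattern in isolation. `Row`, `BoundReference`, and `SeqRow` here are simplified, hypothetical stand-ins, not the real Catalyst types, and `getInt` is included only to show a second getter following the same shape.

```scala
// Simplified stand-in: only the ordinal matters for this illustration.
final case class BoundReference(ordinal: Int)

trait Row {
  // Primitive access by column position.
  def apply(ordinal: Int): Any

  // Ordinal-based typed getters.
  def getInt(ordinal: Int): Int = apply(ordinal).asInstanceOf[Int]
  def getStruct(ordinal: Int): Row = apply(ordinal).asInstanceOf[Row]

  // Syntactic sugar: accept a bound reference and delegate to the ordinal getter.
  final def getInt(bound: BoundReference): Int = getInt(bound.ordinal)
  final def getStruct(bound: BoundReference): Row = getStruct(bound.ordinal)
}

// Hypothetical Row backed by a plain sequence, used only to exercise the trait.
final class SeqRow(values: Seq[Any]) extends Row {
  def apply(ordinal: Int): Any = values(ordinal)
}

val inner = new SeqRow(Seq(42))
val outer = new SeqRow(Seq(7, inner))
val first = outer.getInt(BoundReference(0))
val nested = outer.getStruct(BoundReference(1)).getInt(0)
```

Because the sugar lives on `Row` itself, every operator that uses `Row` sees the wider interface, which is the concern raised in the review comment that follows.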

Member

Are these changes really needed for this patch?
The interfaces of Row affect all the other operators.
If they are necessary, I think you should first make a separate PR that adds these interfaces to Row.

Contributor Author

I agree with that.

@marmbrus
Contributor

This PR is superseded by another PR and can be closed, right?

@chenghao-intel
Contributor Author

Yes, I will close this in favor of #5542.
