
[SPARK-27296][SQL] Allows Aggregator to be registered as a UDF #25024

Closed

Conversation

@erikerlandson (Contributor) commented Jul 2, 2019

What changes were proposed in this pull request?

Defines a new subclass of UDF: UserDefinedAggregator. Also allows Aggregator to be registered as a UDF. Under the hood, the implementation is based on the internal TypedImperativeAggregate class that Spark's predefined aggregators make use of. The effect is that custom user-defined aggregators are now serialized only on partition boundaries instead of being serialized and deserialized at each input row.

The two new modes of using Aggregator are as follows:

val agg: Aggregator[IN, BUF, OUT] = // typed aggregator
val udaf1 = UserDefinedAggregator(agg)
val udaf2 = spark.udf.register("agg", agg)
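
For concreteness, here is a minimal sketch of a complete Aggregator used in both modes, written for the spark-shell. The SumDouble aggregator and the column/table names are made up for illustration, and the registration calls follow the forms proposed in this PR (the final API may differ):

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.{Aggregator, UserDefinedAggregator}

// Hypothetical aggregator: sums a column of doubles.
object SumDouble extends Aggregator[Double, Double, Double] {
  def zero: Double = 0.0
  def reduce(b: Double, a: Double): Double = b + a     // called once per input row, no ser/de
  def merge(b1: Double, b2: Double): Double = b1 + b2  // combines partial results across partitions
  def finish(b: Double): Double = b
  def bufferEncoder: Encoder[Double] = Encoders.scalaDouble
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val df = Seq(1.0, 2.0, 3.0).toDF("x")

val udaf1 = UserDefinedAggregator(SumDouble)      // mode 1: wrap directly for DataFrame use
val udaf2 = spark.udf.register("sumd", SumDouble) // mode 2: register for SQL (also usable on DataFrames)

df.agg(udaf1($"x")).show()
df.createOrReplaceTempView("t")
spark.sql("SELECT sumd(x) FROM t").show()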

How was this patch tested?

Unit testing has been added that corresponds to the testing suites for UserDefinedAggregateFunction. Additionally, unit tests explicitly count the number of aggregator ser/de cycles to ensure that it is governed only by the number of data partitions.

To evaluate the performance impact, I did two comparisons: I benchmarked both a relatively simple aggregator and an aggregator with a complex internal structure (a t-digest). The code and REPL results are recorded on this gist.

Performance

The following compares the new Aggregator-based aggregation against UDAF. In this scenario, the new aggregation is about 100x faster. The size of the improvement depends on the complexity of the aggregator: for very simple aggregators (e.g. an implementation of 'sum'), the improvement is more like 25-30%.

scala> import scala.util.Random._, org.apache.spark.sql.Row, org.apache.spark.tdigest._
import scala.util.Random._
import org.apache.spark.sql.Row
import org.apache.spark.tdigest._

scala> val data = sc.parallelize(Vector.fill(50000){(nextInt(2), nextGaussian, nextGaussian.toFloat)}, 5).toDF("cat", "x1", "x2")
data: org.apache.spark.sql.DataFrame = [cat: int, x1: double ... 1 more field]

scala> val udaf = TDigestUDAF(0.5, 0)
udaf: org.apache.spark.tdigest.TDigestUDAF = TDigestUDAF(0.5,0)

scala> val bs = Benchmark.sample(10) { data.agg(udaf($"x1"), udaf($"x2")).first }
bs: Array[(Double, org.apache.spark.sql.Row)] = Array((16.523,[TDigestSQL(TDigest(0.5,0,130,TDigestMap(-4.9171836327285225 -> (1.0, 1.0), -3.9615949140987685 -> (1.0, 2.0), -3.792874086327091 -> (0.7500781537109753, 2.7500781537109753), -3.720534874164185 -> (1.796754196108008, 4.546832349818983), -3.702105588052377 -> (0.4531676501810167, 5.0), -3.665883591332569 -> (2.3434687534153142, 7.343468753415314), -3.649982231368131 -> (0.6565312465846858, 8.0), -3.5914188829817744 -> (4.0, 12.0), -3.530472305581248 -> (4.0, 16.0), -3.4060489584449467 -> (2.9372251939818383, 18.93722519398184), -3.3000694035428486 -> (8.12412890252889, 27.061354096510726), -3.2250016655261877 -> (8.30564453211017, 35.3669986286209), -3.180537395623448 -> (6.001782561137285, 41.3687811...

scala> bs.map(_._1)
res0: Array[Double] = Array(16.523, 17.138, 17.863, 17.801, 17.769, 17.786, 17.744, 17.8, 17.939, 17.854)

scala> val agg = TDigestAggregator(0.5, 0)
agg: org.apache.spark.tdigest.TDigestAggregator = TDigestAggregator(0.5,0)

scala> val udaa = spark.udf.register("tdigest", agg)
udaa: org.apache.spark.sql.expressions.UserDefinedAggregator[Double,org.apache.spark.tdigest.TDigestSQL,org.apache.spark.tdigest.TDigestSQL] = UserDefinedAggregator(TDigestAggregator(0.5,0),None,true,true)

scala> val bs = Benchmark.sample(10) { data.agg(udaa($"x1"), udaa($"x2")).first }
bs: Array[(Double, org.apache.spark.sql.Row)] = Array((0.313,[TDigestSQL(TDigest(0.5,0,130,TDigestMap(-4.9171836327285225 -> (1.0, 1.0), -3.9615949140987685 -> (1.0, 2.0), -3.792874086327091 -> (0.7500781537109753, 2.7500781537109753), -3.720534874164185 -> (1.796754196108008, 4.546832349818983), -3.702105588052377 -> (0.4531676501810167, 5.0), -3.665883591332569 -> (2.3434687534153142, 7.343468753415314), -3.649982231368131 -> (0.6565312465846858, 8.0), -3.5914188829817744 -> (4.0, 12.0), -3.530472305581248 -> (4.0, 16.0), -3.4060489584449467 -> (2.9372251939818383, 18.93722519398184), -3.3000694035428486 -> (8.12412890252889, 27.061354096510726), -3.2250016655261877 -> (8.30564453211017, 35.3669986286209), -3.180537395623448 -> (6.001782561137285, 41.36878118...

scala> bs.map(_._1)
res1: Array[Double] = Array(0.313, 0.193, 0.175, 0.185, 0.174, 0.176, 0.16, 0.186, 0.171, 0.179)

scala> 

@SparkQA commented Jul 2, 2019

Test build #107090 has finished for PR 25024 at commit 7718212.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@erikerlandson erikerlandson changed the title [SPARK-27296][SQL][WIP] User Defined Aggregators that do not ser/de on each input row [WIP][SPARK-27296][SQL] User Defined Aggregators that do not ser/de on each input row Jul 2, 2019
@SparkQA commented Jul 2, 2019

Test build #107124 has finished for PR 25024 at commit 26cc8df.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class CountSerDeSQL(nSer: Int, nDeSer: Int, sum: Double)

@SparkQA commented Jul 3, 2019

Test build #107132 has finished for PR 25024 at commit c82d95e.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@erikerlandson erikerlandson changed the title [WIP][SPARK-27296][SQL] User Defined Aggregators that do not ser/de on each input row [SPARK-27296][SQL] User Defined Aggregators that do not ser/de on each input row Jul 6, 2019
@rxin (Contributor) commented Jul 6, 2019

Perf numbers?

@SparkQA commented Jul 6, 2019

Test build #107294 has finished for PR 25024 at commit 63802cd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@erikerlandson (Contributor, Author)

@rxin I wrote up my benchmarking results above, under "How was this patch tested?". For an aggregator with non-trivial ser/de, the improvement can be two orders of magnitude, but it is less for simpler aggregators.

@rxin (Contributor) commented Jul 8, 2019

Can you look into how this actually works under the hood? The code is still doing serialization, so it doesn't really make sense to me that it is significantly faster than the existing UDAF. I also find it very weird to have two UDAF interfaces that look very similar. It would be great if we could fix the old one.

@erikerlandson (Contributor, Author) commented Jul 8, 2019

@rxin the key difference is in the update methods. The standard UDAF requires that the aggregator be stored in a MutableAggregationBuffer and so a UDAF update method always has this basic form:

def update(buf: MutableAggregationBuffer, input: Row): Unit = {
  val agg = buf.getAs[AggregatorType](0)  // UDT deserializes the aggregator from 'buf'
  agg.update(input)    // update the state of your aggregation
  buf(0) = agg    // UDT re-serializes the aggregator back into buf
}

The consequence of this is that it calls deserialize and (re)serialize on the actual aggregating structure for every single input row. If your DataFrame has a million rows, it's doing ser/de on your aggregator a million times, not just at the end of each data partition.

Compare that with the UDIA (which is driven by TypedImperativeAggregate):

def update(agg: AggregatorType, input: Row): AggregatorType = {
  agg.update(input) // update the state of your aggregator from the input
  agg // return the aggregator
}

You can see that here there is no ser/de of the aggregator at all when processing input rows (which is as it should be). The TypedImperativeAggregate only invokes ser/de on the aggregator when it is collecting partial results across partitions (and at the end, when it is presenting final results into the output DataFrame).

So, imagine a DataFrame with 10 partitions and 1 million rows. The UDAF does ser/de on the aggregator a million (plus 10) times, while the UDIA does ser/de only 10 times.
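
As a sanity check on that arithmetic, here is a tiny self-contained sketch (plain Scala, no Spark APIs; the partition and row counts are just the ones from the example above):

// Illustrative only: compare the ser/de cycle counts for the two update styles.
val numPartitions = 10
val rowsPerPartition = 100000L  // 10 partitions x 100k rows = 1 million rows total

// UDAF style: one full deserialize/reserialize cycle per input row, plus one
// per partition when partial results are exchanged.
val udafSerDeCycles = numPartitions * rowsPerPartition + numPartitions

// UDIA / TypedImperativeAggregate style: ser/de only at partition boundaries.
val udiaSerDeCycles = numPartitions.toLong

println(s"UDAF ser/de cycles: $udafSerDeCycles")  // 1000010
println(s"UDIA ser/de cycles: $udiaSerDeCycles")  // 10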

@erikerlandson (Contributor, Author)

With respect to "fixing UDAF" (instead of creating a new UDIA), I have convinced myself there is no path there, but here is where I went with that: as described above, the basic pattern of a UDAF update method is:

def update(buf: MutableAggregationBuffer, input: Row): Unit = {
  val agg = buf.getAs[AggregatorType](0)  // UDT deserializes the aggregator from 'buf'
  agg.update(input)    // update the state of your aggregation
  buf(0) = agg    // UDT re-serializes the aggregator back into buf
}

So, the problem arises out of the UDT, which does ser/de. In theory, IF you could just store agg directly into the buf, as a raw object reference, then this would not require any actual ser/de, and the UDAF would almost certainly be efficient.

However, if you try this trick (and I did), Spark will crash with an "unrecognized data type" exception, because it only allows defined subclasses of DataType to be stored in Rows. It also allows UDTs, but of course these are required to encode the user's custom type in terms of defined subclasses of DataType.

I do not think Spark/Catalyst can be made to cope with raw object references in Row objects, as it needs to know how to operate on whatever objects live in Rows. It requires a "closed universe" of possible DataTypes. Even if you disabled the enforcement and allowed arbitrary object references in Rows, it would break Spark.

As an aside, Spark arguably already has two parallel aggregator interfaces: UDAF and TypedImperativeAggregate (which is what all the predefined aggregators use). This PR is exposing that second one to users.

@erikerlandson (Contributor, Author)

To elaborate on the 'raw object reference' above, what I specifically did was try using a DataType like ObjectType(classOf[TDigest]) in the mutable agg buffer schema.

That immediately fails here:

def externalDataTypeFor(dt: DataType): DataType = dt match {

For fun I tried defaulting that to "identity" for ObjectType, and it gets farther but then it fails way down in code generation:

ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 37, Column 24: No applicable constructor/method found for actual parameters "int, org.isarnproject.sketches.TDigest"

So that is a flavor of Catalyst's problem with handling anything outside its defined universe of data types.
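
For reference, a rough sketch of what that attempted buffer schema looked like (MyAggState is a hypothetical stand-in for the real aggregating class, and ObjectType is a Catalyst-internal type); this is the approach that fails as described above:

import org.apache.spark.sql.types.{ObjectType, StructField, StructType}

class MyAggState  // hypothetical stand-in for the aggregating object (e.g. a TDigest)

// Attempted UDAF bufferSchema that tries to hold the object itself as a raw reference,
// bypassing the UDT; Catalyst rejects ObjectType here, as described above.
val bufferSchema: StructType =
  StructType(StructField("agg", ObjectType(classOf[MyAggState])) :: Nil)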

@erikerlandson (Contributor, Author)

With respect to the redundancy of UDAF and UDIA, I would actually propose a deprecation path for the existing UDAF. The alternative proposed in this PR has behavior parity.

It could use a story for Java, presumably something like UDIA[AnyRef] or UDIA[Object].

When the final presentation type uses a UDT, it ought to work via PySpark exactly the same way UDAF does.

@erikerlandson (Contributor, Author)

I'd like to merge this, unless people have additional concerns or questions.
cc @rxin @hvanhovell

@rxin (Contributor) commented Jul 24, 2019

Thanks. Will take a look at it this week.

@SparkQA commented Sep 28, 2019

Test build #111540 has finished for PR 25024 at commit 72795a9.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ScalaAggregator(

@erikerlandson (Contributor, Author)

cc @rxin @cloud-fan @hvanhovell
In this latest push I added a proof-of-concept solution based on adding a Column-generating method (apply) to Aggregator[IN, BUF, OUT]. It has some pros and cons relative to my previous UserDefinedImperativeAggregator (UDIA), which is also still in this PR.

pros:

  • does not add a new aggregating class
  • has comparable efficiency to UDIA (only does ser/de on partition boundaries)
  • I have shown it can work with user defined types, as demonstrated in the (temporary) file tdigest.scala

cons:

  • can only aggregate over a single value in a row, unlike UDAF and UDIA. For example, this kind of aggregation on multiple columns of the input row is not possible for an Aggregator-based solution.
  • Aggregator does not seem to have a concept of specifying whether an aggregation is deterministic or not; it assumes all aggregations are deterministic. This seems wrong to me, and either way it is different from how UDAF and UDIA work.
  • The processing of input rows is less flexible. For example, if an Aggregator is declared with IN as Double, it will fail on a column of integer values. This is not necessarily true for UDAF and UDIA, if the input row values are read in the right way. There may be a way to add input casting, but I do not currently see it.

In summary, doing this with enhancements to Aggregator is definitely feasible; however, I do not think it can provide total feature parity with UDAF or UDIA.

@cloud-fan (Contributor)

What I want to see is a single UDAF API that works for all the use cases, if it's possible. It's really confusing to end-users if there are a lot of UDAF APIs in Spark and they don't know which one to use.

Since ser/de is inevitable (InternalRow <-> Row), it's always preferred to do the ser/de only on partition boundaries. AFAIK other requirements are:

  1. can operate on multiple columns
  2. can be deterministic or not
  3. can cast the input columns to desired types
  4. can be registered as SQL function
  5. can operate on Dataset
  6. ... (please add more if I missed something)

I think Aggregator[IN, BUF, OUT] is good enough to specify what the aggregating logic is. What we need to add is:

  1. an API to specify the input columns
  2. an API to specify the determinism
  3. a mechanism to cast input columns
  4. an API to register Aggregator as SQL function

My proposal is:

  1. add a new method in UDFRegistration
def register[IN: TypeTag, BUF: TypeTag, OUT: TypeTag](
    name: String, func: Aggregator[IN, BUF, OUT]): UserDefinedFunction
  2. add a new implementation of UserDefinedFunction
class AggregatorAsFunction[IN: TypeTag, BUF: TypeTag, OUT: TypeTag](
    aggregator: Aggregator[IN, BUF, OUT]) extends UserDefinedFunction {
  def apply(exprs: Column*): Column = {
    // create a special expression which is similar to `ScalaUDAF`. It projects the input row
    // according to the given `exprs`, converts the projected internal row to an `IN` type object
    // via the encoder, and feeds the `IN` object to the `Aggregator`. The encoder can cast
    // the input columns to desired types, please see `ScalaReflection`.
  }

  // this simply sets the nondeterministic flag of `AggregatorAsFunction`, which will be passed to
  // the new expression mentioned in `apply`
  def asNondeterministic ...
}

@erikerlandson (Contributor, Author) commented Oct 5, 2019

@cloud-fan I could get behind that as a unification.
The piece I'm ambivalent about is this one: "convert the projected internal row to IN type object via the encoder". What is the logic for constructing an encoder that will decode some arbitrary projected InternalRow to type IN? I could imagine being able to convert some InternalRow to a tuple up front, but we don't know what IN is, in general. The programmer who defines the Aggregator might provide one, but that would imply adding some new features to Aggregator, like an encoder for IN and an expected input schema (like UDAF and UDIA have).

I'll look at ScalaReflection further to solve the input encoding issue.
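
For what it's worth, Spark's internals can already derive an ExpressionEncoder for an arbitrary type from a TypeTag, which is roughly the mechanism being pointed at; a minimal sketch (this is an internal API and subject to change):

import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Derive an encoder for IN purely from its TypeTag, via Spark's ScalaReflection-based
// derivation (internal API). Works for primitives, tuples, and case classes, e.g.
// encoderFor[Double], encoderFor[(Int, Double)].
def encoderFor[IN: TypeTag]: ExpressionEncoder[IN] = ExpressionEncoder[IN]()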

@erikerlandson (Contributor, Author)

@cloud-fan OK, I think I can get there with deserializerForType. I may not get to it until the week after next.

@SparkQA commented Oct 17, 2019

Test build #112194 has finished for PR 25024 at commit 018a301.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ScalaAggregator[IN: TypeTag, BUF: TypeTag, OUT: TypeTag](
  • case class UserDefinedAggregator[IN: TypeTag, BUF: TypeTag, OUT: TypeTag](

@cloud-fan (Contributor)

looks pretty good except some minor comments, thanks for the great work!

@SparkQA commented Jan 6, 2020

Test build #116183 has finished for PR 25024 at commit 986a3b4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@erikerlandson (Contributor, Author)

@cloud-fan the ScalaAggregator is significantly leaner now, thanks for all of your input!

@cloud-fan (Contributor) left a comment:

looks pretty good now!

@SparkQA commented Jan 7, 2020

Test build #116252 has finished for PR 25024 at commit eb95998.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@erikerlandson (Contributor, Author)

@cloud-fan I'm beginning to feel like this is ready to merge, unless you have additional feedback. What do you think?

@cloud-fan (Contributor)

@rdblue do you have anything to add? If not, I'll merge it this week.

@rdblue (Contributor) commented Jan 9, 2020

Looks good to me. Thanks to both of you for all of the hard work on this feature!

@cloud-fan (Contributor)

thanks, merging to master!

with ImplicitCastInputTypes
with Logging {

private[this] lazy val inputEncoder = inputEncoderNR.resolveAndBind()
@cloud-fan (Contributor) commented on the lines above:

I think this is the problem. We shouldn't keep the encoder unresolved in the query plan and resolve it on the executor side. We can follow ResolveEncodersInUDF: add a rule to resolve the encoders in ScalaAggregator on the driver side.

cc @viirya @dongjoon-hyun
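
A rough sketch of the kind of analyzer rule being suggested here; the rule eventually added was named ResolveEncodersInScalaAgg (see the follow-up commits below), and withResolvedEncoders() is a hypothetical helper standing in for copying the ScalaAggregator with resolveAndBind() applied to its input and buffer encoders:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Sketch: resolve and bind the aggregator's encoders on the driver during analysis,
// instead of deferring resolveAndBind() to the executors.
object ResolveEncodersInScalaAggSketch extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    // withResolvedEncoders() is hypothetical: it returns a copy of this ScalaAggregator
    // whose input and buffer encoders have been resolved and bound.
    case agg: ScalaAggregator[_, _, _] => agg.withResolvedEncoders()
  }
}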

Member:

Yeah, this defers resolving the encoder to executors; we should resolve it on the driver.

Member:

Thank you for pinging me, @cloud-fan .

cloud-fan pushed a commit that referenced this pull request Jul 9, 2020
… and UnresolvedMapObjects

Context: The fix for SPARK-27296 introduced by #25024 allows `Aggregator` objects to appear in queries. This works fine for aggregators with atomic input types, e.g. `Aggregator[Double, _, _]`.

However it can cause a null pointer exception if the input type is `Array[_]`.  This was historically considered an ignorable case for serialization of `UnresolvedMapObjects`, but the new ScalaAggregator class causes these expressions to be serialized over to executors because the resolve-and-bind is being deferred.

### What changes were proposed in this pull request?
A new rule `ResolveEncodersInScalaAgg` that performs the resolution of the expressions contained in the encoders so that properly resolved expressions are serialized over to executors.

### Why are the changes needed?
Applying an aggregator of the form `Aggregator[Array[_], _, _]` using `functions.udaf()` currently causes a null pointer error in Catalyst.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
A unit test has been added that does aggregation with array types for input, buffer, and output. I have done additional testing with my own custom aggregators in the spark REPL.

Closes #28983 from erikerlandson/fix-spark-32159.

Authored-by: Erik Erlandson <eerlands@redhat.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 1cb5bfc)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>