
[SPARK-18980][SQL] implement Aggregator with TypedImperativeAggregate #16383

Closed
cloud-fan wants to merge 2 commits into apache:master from cloud-fan:aggregator

Conversation

@cloud-fan
Contributor

@cloud-fan cloud-fan commented Dec 22, 2016

What changes were proposed in this pull request?

Currently we implement Aggregator with DeclarativeAggregate, which serializes/deserializes the buffer object every time we process an input row.

This PR implements Aggregator with TypedImperativeAggregate, which avoids serializing/deserializing the buffer object on every input. The benchmark shows we get about a 2x speedup.

For simple buffer objects that don't need serialization, we still go with DeclarativeAggregate, to avoid a performance regression.
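The cost difference between the two strategies can be sketched in plain Scala. This is a toy model, not Spark's actual classes; the names `AggStrategies`, `declarativeSum`, and `imperativeSum` are hypothetical, and the "serialization" is a deliberately crude string round-trip just to make the call counts visible:

```scala
// Toy model of the two buffer strategies (hypothetical names, not Spark's API).
// A "declarative"-style buffer lives in serialized form, so every update pays a
// deserialize + serialize; a "typed imperative"-style buffer stays an object and
// is serialized only once, when the partial result would be shipped.
object AggStrategies {
  var serdeCalls = 0 // counts serialize/deserialize round-trips

  def serialize(s: Set[Int]): Array[Byte] = {
    serdeCalls += 1
    s.mkString(",").getBytes
  }

  def deserialize(b: Array[Byte]): Set[Int] = {
    serdeCalls += 1
    val str = new String(b)
    if (str.isEmpty) Set.empty else str.split(",").map(_.toInt).toSet
  }

  // Declarative style: the buffer round-trips through bytes on every input row.
  def declarativeSum(rows: Seq[Int]): Set[Int] = {
    var buf = serialize(Set.empty[Int])
    for (r <- rows) buf = serialize(deserialize(buf) + r)
    deserialize(buf)
  }

  // Typed imperative style: the buffer stays an object; serialize once at the end.
  def imperativeSum(rows: Seq[Int]): Set[Int] = {
    var buf = Set.empty[Int]
    for (r <- rows) buf = buf + r
    deserialize(serialize(buf))
  }
}
```

For n input rows the declarative style pays roughly 2n serde round-trips while the imperative style pays a constant number, which is the effect the benchmark below measures.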

How was this patch tested?

N/A

@cloud-fan
Contributor Author

cc @yhuai @hvanhovell @liancheng

The result of this PR branch:

aggregate:                               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
RDD sum                                       1913 / 1942         52.3          19.1       1.0X
DataFrame sum                                   46 /   61       2157.7           0.5      41.3X
Dataset sum using Aggregator                  4656 / 4758         21.5          46.6       0.4X
Dataset complex Aggregator                    6636 / 7039         15.1          66.4       0.3X
Contributor Author

The result of master branch:

[info] Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
[info] Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
[info]
[info] aggregate:                               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------
[info] RDD sum                                       1887 / 1898         53.0          18.9       1.0X
[info] DataFrame sum                                   46 /   60       2152.2           0.5      40.6X
[info] Dataset sum using Aggregator                  4549 / 4579         22.0          45.5       0.4X
[info] Dataset complex Aggregator                  12885 / 13830          7.8         128.9       0.1X

You can see that for the complex aggregator we get about a 2x speedup, with no performance regression for the simple aggregator.

Member

Since this benchmark uses only one grouping key, it should run hash-based aggregation via ObjectHashAggregateExec. When the number of keys is large, it will fall back to sort-based aggregation, which I think is the more common usage pattern. Do we still see such a performance improvement there?

Contributor Author

Hash-based vs. sort-based only decides how we "group" the records, while this PR speeds up the "aggregating" part.

Member

Yeah, but sort-based adds the extra cost of sorting, especially an external sort. With TypedImperativeAggregate, Aggregator can now more easily fall back to sort-based aggregation. I am wondering if that degrades performance.

Contributor Author

@cloud-fan cloud-fan Dec 22, 2016

So we have a trade-off here: waste time on buffer de/serialization, or be more likely to fall back to sort.

I think the likelier fallback to sort is just an implementation limitation of the current object hash aggregate. Once we add a size-estimate interface to TypedImperativeAggregate, we can be more aggressive about when to fall back to sort.
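To make the size-estimate idea concrete, here is a hypothetical sketch of what such a hook could look like. None of these names (`SizeEstimatable`, `StringSetBuffer`, `shouldFallBackToSort`) exist in Spark; this is only an illustration of the shape of the interface being proposed:

```scala
// Hypothetical sketch (NOT a real Spark interface): if a typed buffer could
// report its approximate in-memory size, the hash aggregate could fall back to
// sort only under real memory pressure, instead of after a fixed row count.
trait SizeEstimatable {
  def estimatedSizeInBytes: Long
}

// An example buffer holding a set of strings, with a rough size estimate:
// 2 bytes per char plus a fixed per-entry overhead of 40 bytes.
final class StringSetBuffer(val values: scala.collection.mutable.Set[String])
    extends SizeEstimatable {
  def estimatedSizeInBytes: Long =
    values.iterator.map(s => 2L * s.length + 40L).sum
}

// The aggregate operator could then make the fallback decision from estimates.
def shouldFallBackToSort(buffers: Seq[SizeEstimatable], limitBytes: Long): Boolean =
  buffers.iterator.map(_.estimatedSizeInBytes).sum > limitBytes
```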

/**
* In-place updates the aggregation buffer object with an input row. buffer = buffer + input.
* Updates the aggregation buffer object with an input row and returns a new buffer object. For
* performance, the function may do in-place update and return it instead of constructing new
Member

For this change, do we have a use case that doesn't do in-place updates?

Contributor Author

For example, TypedSumDouble and TypedSumLong. We can't do in-place updates when the buffer type is a primitive type.

Member

I think those are the cases that don't need serialization?

Contributor Author

But the Aggregator interface doesn't guarantee in-place updates, and since it's a public interface, we can't change it to force users to do in-place updates.
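The primitive-buffer point can be illustrated in plain Scala (toy code, not Spark's actual TypedSumLong): an immutable or primitive buffer forces `reduce` to return a new value, while a mutable object buffer can be updated in place and returned as the same object. `LongBox` is a made-up name for illustration:

```scala
// Toy illustration: with a primitive Long buffer, no in-place mutation is
// possible; the new buffer value must be returned and stored by the caller.
def reduceLong(buffer: Long, input: Long): Long = buffer + input

// With a mutable object buffer, reduce CAN update in place and return the
// same object, which is what TypedImperativeAggregate exploits.
final class LongBox(var value: Long)

def reduceBox(buffer: LongBox, input: Long): LongBox = {
  buffer.value += input // in-place update
  buffer                // same object returned, no allocation per row
}
```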

Member

oh. got it. that makes sense.

@SparkQA

SparkQA commented Dec 22, 2016

Test build #70517 has finished for PR 16383 at commit 0a73fe2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


final override def update(buffer: InternalRow, input: InternalRow): Unit = {
update(getBufferObject(buffer), input)
buffer(mutableAggBufferOffset) = update(getBufferObject(buffer), input)
Member

I don't see that InternalRow implements apply(int). Is some implicit conversion happening here?

Contributor Author

Member

Thanks. I didn't know that. :)
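For readers of this thread: the line in question, `buffer(mutableAggBufferOffset) = update(...)`, relies on Scala's assignment sugar, where `obj(i) = v` desugars to `obj.update(i, v)` (and `obj(i)` to `obj.apply(i)`), so no `apply(int)` is needed on the left-hand side. A minimal illustration outside Spark, with a made-up `Row` class:

```scala
// Minimal illustration of Scala's apply/update sugar (same mechanism as
// InternalRow, but an unrelated toy class): `row(i)` calls apply(i), and
// `row(i) = v` calls update(i, v).
final class Row(n: Int) {
  private val values = new Array[Any](n)
  def apply(i: Int): Any = values(i)                // row(i)
  def update(i: Int, v: Any): Unit = values(i) = v  // row(i) = v
}
```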

deser: Expression,
cls: Class[_],
schema: StructType): TypedAggregateExpression = {
copy(inputDeserializer = Some(deser), inputClass = Some(cls), inputSchema = Some(schema))
Member

Where do we need to use inputClass? TypedAggregateExpression has this parameter, but I don't see it used anywhere.

Contributor Author

It was there before; it looks like it's only used to show more information in explain output. I'm not going to change it in this PR.

Member

ok. I see.

inputAggBufferOffset: Int = 0)
extends TypedImperativeAggregate[Any] with TypedAggregateExpression with NonSQLExpression {

override def deterministic: Boolean = true
Member

@viirya viirya Dec 24, 2016

I have a question about deterministic here. How the data is actually processed is delegated to the Aggregator. I think an Aggregator can easily produce non-deterministic results, especially since Aggregator is used for user-defined aggregations.

Do you think we should let the Aggregator decide whether it is a deterministic expression or not?

Contributor Author

Like UDFs, we can assume an Aggregator is always deterministic. I think in the future we should allow users to define non-deterministic UDFs (including Aggregators).

newInputAggBufferOffset: Int): ComplexTypedAggregateExpression =
copy(inputAggBufferOffset = newInputAggBufferOffset)

override def withInputInfo(
Member

This looks the same as SimpleTypedAggregateExpression.withInputInfo, and the return type is TypedAggregateExpression. Can we implement it just once in TypedAggregateExpression?

Contributor Author

How would we implement copy in a trait?

Member

oh. right. nvm.

@viirya
Member

viirya commented Dec 24, 2016

LGTM. This is a cool improvement.

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Dec 26, 2016

Test build #70583 has started for PR 16383 at commit 0a73fe2.

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Dec 26, 2016

Test build #70596 has finished for PR 16383 at commit 0a73fe2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

Merging to master! I'll address any remaining comments in a follow-up PR.

@asfgit asfgit closed this in 8a7db8a Dec 26, 2016
cmonkey pushed a commit to cmonkey/spark that referenced this pull request Dec 29, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017