Skip to content

[SPARK-16973][SQL] remove the buffer offsets in ImperativeAggregate#14562

Closed
cloud-fan wants to merge 5 commits intoapache:masterfrom
cloud-fan:agg-minor
Closed

[SPARK-16973][SQL] remove the buffer offsets in ImperativeAggregate#14562
cloud-fan wants to merge 5 commits intoapache:masterfrom
cloud-fan:agg-minor

Conversation

@cloud-fan
Copy link
Contributor

@cloud-fan cloud-fan commented Aug 9, 2016

What changes were proposed in this pull request?

the mutableAggBufferOffset and inputAggBufferOffset in ImperativeAggregate are really hard to understand and tightly coupled with aggregation implementation. What's worse, all ImperativeAggregate implementations need to understand this concept and deal with it carefully.

This PR isolate this buffet offsets concept into the base class ImperativeAggregate, by introducing a sliced row. Then put the interface to ImperativeAggregateImpl, all ImperativeAggregateImpl implementations don't need to care about the buffer offsets anymore.

How was this patch tested?

existing tests.

@cloud-fan
Copy link
Contributor Author

cc @yhuai @liancheng @clockfly

@SparkQA
Copy link

SparkQA commented Aug 9, 2016

Test build #63438 has finished for PR 14562 at commit 9658056.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class CollectList(child: Expression) extends Collect
    • case class CollectSet(child: Expression) extends Collect
    • trait BaseSlicedInternalRow extends InternalRow
    • case class SlicedInternalRow(offset: Int, numFields: Int) extends BaseSlicedInternalRow
    • case class SlicedMutableRow(offset: Int, numFields: Int)

@hvanhovell
Copy link
Contributor

Does this have performance implications? We are adding a layer of indirection to a hot code path.

}

override def copy(): InternalRow = {
throw new UnsupportedOperationException("Cannot copy a SlicedMutableRow")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SlicedMutableRow -> SlicedInternalRow?

@cloud-fan
Copy link
Contributor Author

@hvanhovell I'm not sure about the performance, will benchmark it later, hopefully they can be inlined by JVM successfully.

}
}

case class SlicedInternalRow(offset: Int, numFields: Int) extends BaseSlicedInternalRow {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this need to be a case class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm, does case class has performance penalty? It doesn't need to be though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It generates a lot of crap in bytecode, so would be good to not generate them unless they are useful.

@SparkQA
Copy link

SparkQA commented Aug 10, 2016

Test build #63524 has finished for PR 14562 at commit 1444176.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SlicedInternalRow(protected val offset: Int, val numFields: Int)
    • class SlicedMutableRow(protected val offset: Int, val numFields: Int)

@SparkQA
Copy link

SparkQA commented Aug 10, 2016

Test build #63552 has finished for PR 14562 at commit 8e0531b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

*/
def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate
final def setMutableBufferOffset(offset: Int): Unit = {
assert(mutableBufferRow == null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to do a runtime check? Then how about using require?

assert may be removed by compiler.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Require implies that the caller has passed a bad argument. Assert checks if invariants hold (the class is not in an unexpected state). I think this should be an assert.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of setting an attribute that is supposed to be immutable, can we use copy to copy the whole class?

@clockfly
Copy link
Contributor

I think my biggest concern is about the performance and abstraction.

  1. Seems there are too many function call to update each row. Is this cost too high?
  2. The new methods added in ImperativeAggregate may create confusion, like
    There are both
def update
def doUpdate
def merge
def doMerge
def initialize
def doInitialize

@clockfly
Copy link
Contributor

Maybe there is another alternative, for example, we can define an InternalRowReader, which wraps the offset.

Sub-class of ImperativeAggregate need to use the InternalRowReader to read the fields from InternalRow.

}

// Note: although this simply copies aggBufferAttributes, this common code can not be placed
// in the superclass because that will lead to initialization ordering issues.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

finally get rid of it!

@SparkQA
Copy link

SparkQA commented Aug 17, 2016

Test build #63906 has finished for PR 14562 at commit e50aece.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class Collect extends ImperativeAggregateImpl
    • sealed abstract class AggregateFunction extends Expression
    • case class ImperativeAggregate(

@SparkQA
Copy link

SparkQA commented Aug 17, 2016

Test build #63908 has finished for PR 14562 at commit 5c8f324.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class Collect extends ImperativeAggregateImpl
    • sealed abstract class AggregateFunction extends Expression
    • case class ImperativeAggregate(

@cloud-fan cloud-fan closed this Nov 7, 2016
@cloud-fan cloud-fan deleted the agg-minor branch December 14, 2016 12:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants