[SPARK-9747] [SQL] Avoid starving an unsafe operator in aggregation #8038
Conversation
resultExpressions,
newMutableProjection,
child.output,
testFallbackStartsAt)
What will happen if there is no memory space left to reserve?
we'll fail fast with "unable to acquire memory" exception
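The fail-fast behavior described here can be sketched with a toy model. Note that `SimpleMemoryManager` and its `acquire` method below are illustrative stand-ins, not Spark's actual memory manager API:

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative-only model of failing fast when no memory is left to
// reserve: a request either succeeds immediately or throws, rather than
// blocking and leaving the operator starved.
class SimpleMemoryManager {
    private final long capacityBytes;
    private final AtomicLong usedBytes = new AtomicLong(0);

    SimpleMemoryManager(long capacityBytes) {
        this.capacityBytes = capacityBytes;
    }

    /** Reserve `bytes`, or throw if not enough memory remains. */
    long acquire(long bytes) {
        long newUsed = usedBytes.addAndGet(bytes);
        if (newUsed > capacityBytes) {
            usedBytes.addAndGet(-bytes); // roll back the failed reservation
            throw new IllegalStateException("unable to acquire memory");
        }
        return bytes;
    }
}
```

A failed request rolls back its reservation, so a later, smaller request can still succeed.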
…emory-agg

Conflicts:
    core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
    sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
Test build #40190 has finished for PR 8038 at commit
Test build #40195 has finished for PR 8038 at commit
@andrewor14 can you bring this up to date?
…emory-agg

Conflicts:
    sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
    sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
In TungstenAggregate, we fall back to sort-based aggregation if the hash-based approach cannot request more memory. To do this, we create a new sorter from the existing unsafe map destructively. Because this conversion happens largely in place, we don't need to reserve a page in the sorter's constructor.
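The control flow above can be modeled roughly as follows. This is an illustrative-only sketch: plain Java collections stand in for `BytesToBytesMap` and the unsafe sorter, and the entry cap stands in for the memory limit; the real code hands the map's memory pages to the sorter destructively rather than copying.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative-only model of hash-based aggregation that falls back to
// sort-based aggregation once the hash map can no longer grow.
class FallbackCountAggregator {
    private final int maxHashEntries;         // stand-in for the memory limit
    private final Map<String, Long> hashMap = new HashMap<>();
    private List<String> spilledKeys;         // non-null once we've fallen back

    FallbackCountAggregator(int maxHashEntries) {
        this.maxHashEntries = maxHashEntries;
    }

    void add(String key) {
        if (spilledKeys == null
                && (hashMap.containsKey(key) || hashMap.size() < maxHashEntries)) {
            hashMap.merge(key, 1L, Long::sum); // fast hash-based path
        } else {
            if (spilledKeys == null) {
                spilledKeys = new ArrayList<>(); // switch to sort-based path
            }
            spilledKeys.add(key);              // buffer now, merge at iteration time
        }
    }

    /** Merge the hash map's partial counts with the spilled keys. */
    Map<String, Long> result() {
        Map<String, Long> out = new TreeMap<>(hashMap);
        if (spilledKeys != null) {
            for (String key : spilledKeys) {
                out.merge(key, 1L, Long::sum);
            }
        }
        return out;
    }
}
```

The key property mirrored from the patch: partial results computed before the fallback are not lost; they are merged with the sort-based results at the end.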
test this please.
aggregationIterator.free()
if (groupingExpressions.isEmpty) {
  // This is a grouped aggregate and the input iterator is empty,
  // so return an empty iterator.
Seems we should put this comment in the `else` block. This branch is actually used when we have no input rows and there is no grouping expression.
good catch.
LGTM. Let's wait for jenkins.
Test build #40328 has finished for PR 8038 at commit
test this please.
Test build #40317 timed out for PR 8038 at commit
test this please.
Test build #40334 timed out for PR 8038 at commit
test("memory acquired on construction") {
  // Needed for various things in SparkEnv
  sc = new SparkContext("local", "testing")
I feel the SparkContext we are creating here messed up the following tests. How about we comment it out and try the PR builder?
Actually, is it possible to create the taskMemoryManager and shuffleMemoryManager without creating a new SparkContext?
yeah I can figure something out
(this is why we shouldn't have singleton SQLContexts!)
Test build #1428 timed out for PR 8038 at commit
Test build #40472 timed out for PR 8038 at commit
…emory-agg

Conflicts:
    sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
    sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
retest this please
Test build #1454 has finished for PR 8038 at commit
retest this please
I just triggered another build as a backup.
Test build #40525 has finished for PR 8038 at commit
compilation failed?
Test build #1463 has finished for PR 8038 at commit
OK, I fixed it. Jenkins, retest this please?
Test build #1470 has finished for PR 8038 at commit
Test build #40597 timed out for PR 8038 at commit
Test build #1477 timed out for PR 8038 at commit
Test build #1478 timed out for PR 8038 at commit
So weird... this finished running all python tests successfully too, but still timed out somehow?
I'm merging this, since it's unlikely that a separate issue caused your test timeout (all the tests did run).
This is the sister patch to #8011, but for aggregation.

In a nutshell: create the `TungstenAggregationIterator` before computing the parent partition. Internally this creates a `BytesToBytesMap` which acquires a page in the constructor as of this patch. This ensures that the aggregation operator is not starved, since we reserve at least 1 page in advance.

rxin yhuai

Author: Andrew Or <andrew@databricks.com>

Closes #8038 from andrewor14/unsafe-starve-memory-agg.

(cherry picked from commit e011079)
Signed-off-by: Reynold Xin <rxin@databricks.com>
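The "acquire a page in the constructor" idea can be illustrated with a toy model. Everything below is hypothetical (the page size, `TinyMemoryPool`, and `EagerPageMap` are stand-ins, not Spark's actual `BytesToBytesMap` or memory manager):

```java
// Illustrative-only model: an operator reserves its first memory page
// eagerly in the constructor, so it either starts with a guaranteed
// minimum of memory or fails immediately, instead of being starved
// later by sibling operators that grabbed everything first.
class TinyMemoryPool {
    private long freeBytes;

    TinyMemoryPool(long capacityBytes) { this.freeBytes = capacityBytes; }

    long acquire(long bytes) {
        if (bytes > freeBytes) {
            throw new IllegalStateException("unable to acquire memory");
        }
        freeBytes -= bytes;
        return bytes;
    }
}

class EagerPageMap {
    static final long PAGE_SIZE = 1 << 20; // 1 MiB page, illustrative
    final long reservedBytes;

    EagerPageMap(TinyMemoryPool pool) {
        // Reserve the first page up front: construction fails fast if
        // no memory is available, and otherwise guarantees the operator
        // at least one page to work with.
        this.reservedBytes = pool.acquire(PAGE_SIZE);
    }
}
```

With this shape, an operator that cannot get its minimum page is rejected at construction time, which is exactly the "not starved" guarantee the commit message describes.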
Test build #1495 has finished for PR 8038 at commit