[SPARK-9240] [SQL] Hybrid aggregate operator using unsafe row #7813
Test build #39133 has finished for PR 7813 at commit
Test build #39171 has finished for PR 7813 at commit
Seems even if we use a single element of an array or a struct (e.g.
The failed query is
Test build #39240 has finished for PR 7813 at commit
…itial version of the hybrid iterator.
…or of the iterator will read at least one row from a non-empty input iter.
Test build #39480 has finished for PR 7813 at commit
Test build #39481 has finished for PR 7813 at commit
@@ -50,7 +50,7 @@ import scala.collection.JavaConversions._
 object TestHive
   extends TestHiveContext(
     new SparkContext(
-      System.getProperty("spark.sql.test.master", "local[32]"),
+      System.getProperty("spark.sql.test.master", "local[2]"),
change this back to 32
@@ -409,6 +409,9 @@ private[spark] object SQLConf {
  val USE_SQL_AGGREGATE2 = booleanConf("spark.sql.useAggregate2",
    defaultValue = Some(true), doc = "<TODO>")

  val USE_HYBRID_AGGREGATE = booleanConf("spark.sql.aggregate.useHybridAggregate",
why do we want a config flag here?
Test build #39497 has finished for PR 7813 at commit
Test build #39498 has finished for PR 7813 at commit
Conflicts: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
Test build #39502 has finished for PR 7813 at commit
case (None, None) => (currentBuffer: MutableRow, row: InternalRow) => {}

case other =>
  sys.error(
this is an error case -- we should throw IllegalStateException, and make it clear that if we hit this path, it's a bug.
Right now it sounds as if this operator just cannot handle a legitimate case.
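A minimal sketch of what this suggestion could look like. The names here (`AggregateModes`, `Mode`, `describe`) are hypothetical and not taken from the PR; the point is only that the catch-all branch throws `IllegalStateException` and says outright that reaching it is a bug, whereas `sys.error` throws a plain `RuntimeException` with whatever message it is given.

```scala
object AggregateModes {
  // Hypothetical stand-ins for aggregate modes; these are not Spark's types.
  sealed trait Mode
  case object Partial extends Mode
  case object Final extends Mode

  // Picks an action for a pair of optional modes. Only three combinations
  // are assumed to be produced by the (hypothetical) planner.
  def describe(modes: (Option[Mode], Option[Mode])): String = modes match {
    case (Some(Partial), None) => "update"
    case (Some(Final), None) => "merge"
    case (None, None) => "no-op"
    case other =>
      // Unreachable if the planner is correct, so say that this is a bug
      // rather than implying the operator cannot handle a legitimate case.
      throw new IllegalStateException(
        s"Unexpected aggregate modes $other. This is a bug, not an unsupported query.")
  }
}
```

With this shape, a caller who sees the exception knows to report it instead of trying to rewrite the query.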
Test build #39509 has finished for PR 7813 at commit
Test build #39517 has finished for PR 7813 at commit
I'm going to merge this. I think this needs more refactoring, but we can do those as follow-ups.
// The value of the input KV Iterator has the format of groupingExprs + aggregation buffer.
// We need to project the aggregation buffer out.
private def projectInputBufferToUnsafe(
we should just remove this function and inline it. We don't want an extra iterator overhead to process the rows.
Each iterator actually adds a lot of overhead, and here it doesn't buy you any code reduction (on the contrary it increases complexity due to the extra abstraction).
Done.
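To illustrate the inlining point, here is a rough sketch with made-up names (`InlineProjection`, `sumBuffersWrapped`, `sumBuffersInlined`); it uses plain `Array[Long]` rows rather than Spark's row classes, and it assumes each row holds the grouping columns followed by the buffer columns. The first version wraps the input in an extra iterator just to project the buffer out; the second does the projection inside the consuming loop, so each row passes through one iterator instead of two.

```scala
object InlineProjection {
  // Version with an extra iterator layer: `map` creates a second iterator
  // whose hasNext/next calls are made for every row.
  def sumBuffersWrapped(input: Iterator[Array[Long]], numGrouping: Int): Long = {
    val projected = input.map(row => row.drop(numGrouping))
    var total = 0L
    while (projected.hasNext) {
      total += projected.next().sum
    }
    total
  }

  // Inlined version: the projection is just an index offset in the loop
  // that consumes the rows, with no intermediate iterator or array copy.
  def sumBuffersInlined(input: Iterator[Array[Long]], numGrouping: Int): Long = {
    var total = 0L
    while (input.hasNext) {
      val row = input.next()
      var i = numGrouping
      while (i < row.length) {
        total += row(i)
        i += 1
      }
    }
    total
  }
}
```

Both versions compute the same result; the sketch makes no claim about how large the difference is in practice.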
…w up)
This is the followup of #7813. It renames `HybridUnsafeAggregationIterator` to `TungstenAggregationIterator` and makes it only work with `UnsafeRow`. Also, I add a `TungstenAggregate` that uses `TungstenAggregationIterator` and make `SortBasedAggregate` (renamed from `SortBasedAggregate`) only work with `SafeRow`.
Author: Yin Huai <yhuai@databricks.com>
Closes #7954 from yhuai/agg-followUp and squashes the following commits:
4d2f4fc [Yin Huai] Add comments and free map.
0d7ddb9 [Yin Huai] Add TungstenAggregationQueryWithControlledFallbackSuite to test fall back process.
91d69c2 [Yin Huai] Rename UnsafeHybridAggregationIterator to TungstenAggregateIteraotr and make it only work with UnsafeRow.
…w up) (cherry picked from commit 3504bf3) Signed-off-by: Reynold Xin <rxin@databricks.com>
This PR adds a base aggregation iterator, `AggregationIterator`, which is used to create `SortBasedAggregationIterator` (for sort-based aggregation) and `UnsafeHybridAggregationIterator` (which first tries hash-based aggregation and falls back to sort-based aggregation, using an external sorter, if we cannot allocate memory for the map). With these two iterators, we no longer need the existing iterators, so I am removing them. Also, we can use a single physical `Aggregate` operator that internally determines which iterator to use.
https://issues.apache.org/jira/browse/SPARK-9240
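The hash-then-sort fallback described above can be sketched roughly as follows. This is not the PR's implementation: `HybridAggregation`, `aggregate`, and `maxGroups` are hypothetical names, `maxGroups` stands in for "cannot allocate memory for the map", and an in-memory sort stands in for the external sorter.

```scala
import scala.collection.mutable

object HybridAggregation {
  // Sums values per key, returning results ordered by key.
  def aggregate(input: Iterator[(String, Long)], maxGroups: Int): Seq[(String, Long)] = {
    val in = input.buffered
    val map = mutable.HashMap.empty[String, Long]
    var mapFull = false

    // Phase 1: hash-based aggregation until a new group no longer fits.
    while (!mapFull && in.hasNext) {
      val (k, v) = in.head
      if (map.contains(k) || map.size < maxGroups) {
        map(k) = map.getOrElse(k, 0L) + v
        in.next() // consume the row only once it has been aggregated
      } else {
        mapFull = true // leave the row in `in` for the sort-based phase
      }
    }

    if (!mapFull) {
      // Everything fit in the map; sort only to make the output deterministic.
      map.toSeq.sortBy(_._1)
    } else {
      // Phase 2: sort the partial results together with the remaining rows,
      // then merge adjacent entries that share a key.
      val sorted = (map.iterator ++ in).toSeq.sortBy(_._1)
      val out = mutable.ArrayBuffer.empty[(String, Long)]
      for ((k, v) <- sorted) {
        if (out.nonEmpty && out.last._1 == k) {
          out(out.length - 1) = (k, out.last._2 + v)
        } else {
          out += ((k, v))
        }
      }
      out.toSeq
    }
  }
}
```

The partial sums already in the map are treated as ordinary input to the sort-based phase, which is why the fallback does not need to restart from the beginning of the input.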