Skip to content

Commit

Permalink
fix aggregation without functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Davies Liu committed Jan 29, 2016
1 parent 70a7c7e commit 951e2cd
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
Expand Up @@ -238,7 +238,7 @@ abstract class AggregationIterator(
resultProjection(joinedRow(currentGroupingKey, currentBuffer))
}
} else {
// Grouping-only: we only output values of grouping expressions.
// Grouping-only: we only output values based on grouping expressions.
val resultProjection = UnsafeProjection.create(resultExpressions, groupingAttributes)
(currentGroupingKey: UnsafeRow, currentBuffer: MutableRow) => {
resultProjection(currentGroupingKey)
Expand Down
Expand Up @@ -151,10 +151,10 @@ case class TungstenAggregate(
}

// generate variables for output
val (resultVars, genResult) = if (modes.contains(Final) | modes.contains(Complete)) {
val bufferAttrs = functions.flatMap(_.aggBufferAttributes)
val (resultVars, genResult) = if (modes.contains(Final) || modes.contains(Complete)) {
// evaluate aggregate results
ctx.currentVars = bufVars
val bufferAttrs = functions.flatMap(_.aggBufferAttributes)
val aggResults = functions.map(_.evaluateExpression).map { e =>
BindReferences.bindReference(e, bufferAttrs).gen(ctx)
}
Expand All @@ -167,9 +167,13 @@ case class TungstenAggregate(
| ${aggResults.map(_.code).mkString("\n")}
| ${resultVars.map(_.code).mkString("\n")}
""".stripMargin)
} else {
} else if (modes.contains(Partial) || modes.contains(PartialMerge)) {
// output the aggregate buffer directly
(bufVars, "")
} else {
// no aggregate function, the result should be literals
val resultVars = resultExpressions.map(_.gen(ctx))
(resultVars, resultVars.map(_.code).mkString("\n"))
}

val doAgg = ctx.freshName("doAgg")
Expand Down

0 comments on commit 951e2cd

Please sign in to comment.