From d600e681bde23925dedf1261654a34894713f042 Mon Sep 17 00:00:00 2001 From: Sean Zhong Date: Tue, 23 Aug 2016 01:18:05 +0800 Subject: [PATCH] Use a looser interface for InternalRow in result projection --- .../sql/execution/aggregate/AggregationIterator.scala | 10 +++++----- .../aggregate/TungstenAggregationIterator.scala | 8 +++++--- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala index 34de76dd4ab4e..3e07c9b8c63c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala @@ -201,7 +201,7 @@ abstract class AggregationIterator( protected val groupingAttributes = groupingExpressions.map(_.toAttribute) // Initializing the function used to generate the output row. - protected def generateResultProjection(): (UnsafeRow, MutableRow) => UnsafeRow = { + protected def generateResultProjection(): (InternalRow, MutableRow) => UnsafeRow = { val joinedRow = new JoinedRow val modes = aggregateExpressions.map(_.mode).distinct val bufferAttributes = aggregateFunctions.flatMap(_.aggBufferAttributes) @@ -217,7 +217,7 @@ abstract class AggregationIterator( val resultProjection = UnsafeProjection.create(resultExpressions, groupingAttributes ++ aggregateAttributes) - (currentGroupingKey: UnsafeRow, currentBuffer: MutableRow) => { + (currentGroupingKey: InternalRow, currentBuffer: MutableRow) => { // Generate results for all expression-based aggregate functions. expressionAggEvalProjection(currentBuffer) // Generate results for all imperative aggregate functions. @@ -234,19 +234,19 @@ abstract class AggregationIterator( val resultProjection = UnsafeProjection.create( groupingAttributes ++ bufferAttributes, groupingAttributes ++ bufferAttributes) - (currentGroupingKey: UnsafeRow, currentBuffer: MutableRow) => { + (currentGroupingKey: InternalRow, currentBuffer: MutableRow) => { resultProjection(joinedRow(currentGroupingKey, currentBuffer)) } } else { // Grouping-only: we only output values based on grouping expressions. val resultProjection = UnsafeProjection.create(resultExpressions, groupingAttributes) - (currentGroupingKey: UnsafeRow, currentBuffer: MutableRow) => { + (currentGroupingKey: InternalRow, currentBuffer: MutableRow) => { resultProjection(currentGroupingKey) } } } - protected val generateOutput: (UnsafeRow, MutableRow) => UnsafeRow = + protected val generateOutput: (InternalRow, MutableRow) => UnsafeRow = generateResultProjection() /** Initializes buffer values for all aggregate functions. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala index 4b8adf5230717..b89c8a11f98ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala @@ -127,7 +127,7 @@ class TungstenAggregationIterator( } // Creates a function used to generate output rows. - override protected def generateResultProjection(): (UnsafeRow, MutableRow) => UnsafeRow = { + override protected def generateResultProjection(): (InternalRow, MutableRow) => UnsafeRow = { val modes = aggregateExpressions.map(_.mode).distinct if (modes.nonEmpty && !modes.contains(Final) && !modes.contains(Complete)) { // Fast path for partial aggregation, UnsafeRowJoiner is usually faster than projection @@ -137,8 +137,10 @@ class TungstenAggregationIterator( val bufferSchema = StructType.fromAttributes(bufferAttributes) val unsafeRowJoiner = GenerateUnsafeRowJoiner.create(groupingKeySchema, bufferSchema) - (currentGroupingKey: UnsafeRow, currentBuffer: MutableRow) => { - unsafeRowJoiner.join(currentGroupingKey, currentBuffer.asInstanceOf[UnsafeRow]) + (currentGroupingKey: InternalRow, currentBuffer: MutableRow) => { + unsafeRowJoiner.join( + currentGroupingKey.asInstanceOf[UnsafeRow], + currentBuffer.asInstanceOf[UnsafeRow]) } } else { super.generateResultProjection()