[SPARK-11423] remove MapPartitionsWithPreparationRDD
Since we no longer need to reserve a page of execution memory before calling compute(), MapPartitionsWithPreparationRDD is not needed anymore.

This PR essentially reverts #8543, #8511, #8038, and #8011.

Author: Davies Liu <davies@databricks.com>

Closes #9381 from davies/remove_prepare2.
Davies Liu authored and davies committed Oct 30, 2015
1 parent bb5a2af commit 45029bf
Showing 8 changed files with 75 additions and 270 deletions.
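To illustrate the idea behind the change, here is a minimal, self-contained sketch (not taken from this patch; the object name, the local master, and the toy per-partition sum are all hypothetical): once no execution memory has to be reserved ahead of compute(), per-partition setup can simply happen lazily at the top of an ordinary mapPartitions closure.

import org.apache.spark.{SparkConf, SparkContext}

object MapPartitionsSketch {
  def main(args: Array[String]): Unit = {
    // Toy local job; names and numbers are illustrative only.
    val sc = new SparkContext(new SparkConf().setAppName("mapPartitions-sketch").setMaster("local[2]"))
    try {
      val rdd = sc.parallelize(1 to 100, numSlices = 4)
      val perPartitionSums = rdd.mapPartitions { iter =>
        // Per-partition setup (a mutable accumulator standing in for an
        // operator's hash map or sorter) is created inside the closure itself,
        // so no separate prepare() hook has to run before compute().
        var sum = 0L
        iter.foreach(x => sum += x)
        Iterator.single(sum)
      }
      println(perPartitionSums.collect().mkString(", "))
    } finally {
      sc.stop()
    }
  }
}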

This file was deleted.

13 changes: 0 additions & 13 deletions core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -73,16 +73,6 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag](
super.clearDependencies()
rdds = null
}

/**
* Call the prepare method of every parent that has one.
* This is needed for reserving execution memory in advance.
*/
protected def tryPrepareParents(): Unit = {
rdds.collect {
case rdd: MapPartitionsWithPreparationRDD[_, _, _] => rdd.prepare()
}
}
}

private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag](
@@ -94,7 +84,6 @@ private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag]
extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning) {

override def compute(s: Partition, context: TaskContext): Iterator[V] = {
tryPrepareParents()
val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context))
}
@@ -118,7 +107,6 @@ private[spark] class ZippedPartitionsRDD3
extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3), preservesPartitioning) {

override def compute(s: Partition, context: TaskContext): Iterator[V] = {
tryPrepareParents()
val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
f(rdd1.iterator(partitions(0), context),
rdd2.iterator(partitions(1), context),
@@ -146,7 +134,6 @@ private[spark] class ZippedPartitionsRDD4
extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4), preservesPartitioning) {

override def compute(s: Partition, context: TaskContext): Iterator[V] = {
tryPrepareParents()
val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
f(rdd1.iterator(partitions(0), context),
rdd2.iterator(partitions(1), context),

This file was deleted.

6 changes: 5 additions & 1 deletion project/MimaExcludes.scala
@@ -107,7 +107,11 @@ object MimaExcludes {
"org.apache.spark.sql.SQLContext.createSession")
) ++ Seq(
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.SparkContext.preferredNodeLocationData_=")
"org.apache.spark.SparkContext.preferredNodeLocationData_="),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.rdd.MapPartitionsWithPreparationRDD"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.rdd.MapPartitionsWithPreparationRDD$")
)
case v if v.startsWith("1.5") =>
Seq(
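One detail worth noting about the new excludes above: deleting a Scala class whose companion object is also public removes two class files, the class itself and the `$`-suffixed companion class, so both names have to be excluded. A minimal sketch of the same MiMa idiom, with a purely hypothetical class name:

import com.typesafe.tools.mima.core.{MissingClassProblem, ProblemFilters}

// Hypothetical entries (not Spark's actual list): exclude a removed class and
// its companion object's generated class from MiMa's compatibility checks.
val removedClassExcludes = Seq(
  ProblemFilters.exclude[MissingClassProblem]("org.example.rdd.RemovedRDD"),
  ProblemFilters.exclude[MissingClassProblem]("org.example.rdd.RemovedRDD$")
)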
UnsafeFixedWidthAggregationMap.java
@@ -19,9 +19,8 @@

import java.io.IOException;

import com.google.common.annotations.VisibleForTesting;

import org.apache.spark.SparkEnv;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
@@ -31,7 +30,6 @@
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.map.BytesToBytesMap;
import org.apache.spark.unsafe.memory.MemoryLocation;
import org.apache.spark.memory.TaskMemoryManager;

/**
* Unsafe-based HashMap for performing aggregations where the aggregated values are fixed-width.
@@ -218,11 +216,6 @@ public long getPeakMemoryUsedBytes() {
return map.getPeakMemoryUsedBytes();
}

@VisibleForTesting
public int getNumDataPages() {
return map.getNumDataPages();
}

/**
* Free the memory associated with this map. This is idempotent and can be called multiple times.
*/
TungstenAggregate.scala
@@ -17,15 +17,14 @@

package org.apache.spark.sql.execution.aggregate

import org.apache.spark.TaskContext
import org.apache.spark.rdd.{MapPartitionsWithPreparationRDD, RDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression2
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression2
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.{UnsafeFixedWidthAggregationMap, UnaryNode, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.{SparkPlan, UnaryNode, UnsafeFixedWidthAggregationMap}
import org.apache.spark.sql.types.StructType

case class TungstenAggregate(
@@ -84,59 +83,39 @@ case class TungstenAggregate(
val dataSize = longMetric("dataSize")
val spillSize = longMetric("spillSize")

/**
* Set up the underlying unsafe data structures used before computing the parent partition.
* This makes sure our iterator is not starved by other operators in the same task.
*/
def preparePartition(): TungstenAggregationIterator = {
new TungstenAggregationIterator(
groupingExpressions,
nonCompleteAggregateExpressions,
nonCompleteAggregateAttributes,
completeAggregateExpressions,
completeAggregateAttributes,
initialInputBufferOffset,
resultExpressions,
newMutableProjection,
child.output,
testFallbackStartsAt,
numInputRows,
numOutputRows,
dataSize,
spillSize)
}
child.execute().mapPartitions { iter =>

/** Compute a partition using the iterator already set up previously. */
def executePartition(
context: TaskContext,
partitionIndex: Int,
aggregationIterator: TungstenAggregationIterator,
parentIterator: Iterator[InternalRow]): Iterator[UnsafeRow] = {
val hasInput = parentIterator.hasNext
if (!hasInput) {
// We're not using the underlying map, so we just can free it here
aggregationIterator.free()
if (groupingExpressions.isEmpty) {
val hasInput = iter.hasNext
if (!hasInput && groupingExpressions.nonEmpty) {
// This is a grouped aggregate and the input iterator is empty,
// so return an empty iterator.
Iterator.empty
} else {
val aggregationIterator =
new TungstenAggregationIterator(
groupingExpressions,
nonCompleteAggregateExpressions,
nonCompleteAggregateAttributes,
completeAggregateExpressions,
completeAggregateAttributes,
initialInputBufferOffset,
resultExpressions,
newMutableProjection,
child.output,
iter,
testFallbackStartsAt,
numInputRows,
numOutputRows,
dataSize,
spillSize)
if (!hasInput && groupingExpressions.isEmpty) {
numOutputRows += 1
Iterator.single[UnsafeRow](aggregationIterator.outputForEmptyGroupingKeyWithoutInput())
} else {
// This is a grouped aggregate and the input iterator is empty,
// so return an empty iterator.
Iterator.empty
aggregationIterator
}
} else {
aggregationIterator.start(parentIterator)
aggregationIterator
}
}

// Note: we need to set up the iterator in each partition before computing the
// parent partition, so we cannot simply use `mapPartitions` here (SPARK-9747).
val resultRdd = {
new MapPartitionsWithPreparationRDD[UnsafeRow, InternalRow, TungstenAggregationIterator](
child.execute(), preparePartition, executePartition, preservesPartitioning = true)
}
resultRdd.asInstanceOf[RDD[InternalRow]]
}

override def simpleString: String = {