From 1afb4f35e70b1d91b073dfe03e801d7570b7a97a Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 28 Aug 2015 13:47:50 -0700 Subject: [PATCH 1/5] fix memory starving in unsafe SMJ --- .../spark/rdd/MapPartitionsWithPreparationRDD.scala | 6 ++++-- .../org/apache/spark/rdd/ZippedPartitionsRDD.scala | 10 ++++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala index b475bd8d79f85..49202d86e7c5a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala @@ -38,12 +38,14 @@ private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M override def getPartitions: Array[Partition] = firstParent[T].partitions + lazy val preparedArgument: M = preparePartition() + /** * Prepare a partition before computing it from its parent. */ override def compute(partition: Partition, context: TaskContext): Iterator[U] = { - val preparedArgument = preparePartition() + val prepared = preparedArgument val parentIterator = firstParent[T].iterator(partition, context) - executePartition(context, partition.index, preparedArgument, parentIterator) + executePartition(context, partition.index, prepared, parentIterator) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala index 81f40ad33aa5d..419760c4e9ef0 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala @@ -73,6 +73,13 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag]( super.clearDependencies() rdds = null } + + protected def tryPrepareChildren() { + rdds.foreach { + case rdd: MapPartitionsWithPreparationRDD[_, _, _] => rdd.preparedArgument + case _ => + } + } } private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag]( @@ -84,6 +91,7 @@ private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag] extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { + tryPrepareChildren() val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context)) } @@ -107,6 +115,7 @@ private[spark] class ZippedPartitionsRDD3 extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3), preservesPartitioning) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { + tryPrepareChildren() val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context), @@ -134,6 +143,7 @@ private[spark] class ZippedPartitionsRDD4 extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4), preservesPartitioning) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { + tryPrepareChildren() val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context), From d44be2dc3051e70266304854c979d88c2a65719c Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Sat, 29 Aug 2015 10:35:08 -0700 Subject: [PATCH 2/5] fix bug --- .../rdd/MapPartitionsWithPreparationRDD.scala | 17 +++++++++++++++-- .../apache/spark/rdd/ZippedPartitionsRDD.scala | 2 +- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala index 49202d86e7c5a..01bd224ca25df 100644 --- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala @@ -24,6 +24,8 @@ import org.apache.spark.{Partition, Partitioner, TaskContext} /** * An RDD that applies a user provided function to every partition of the parent RDD, and * additionally allows the user to prepare each partition before computing the parent partition. + * + * TODO(davies): remove this once SPARK-10342 is fixed */ private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M: ClassTag]( prev: RDD[T], @@ -38,13 +40,24 @@ private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M override def getPartitions: Array[Partition] = firstParent[T].partitions - lazy val preparedArgument: M = preparePartition() + private[this] var preparedArgument: Option[M] = None + + def prepare(): Unit = { + // This could be called multiple times + if (preparedArgument.isEmpty) { + preparedArgument = Some(preparePartition()) + } + } /** * Prepare a partition before computing it from its parent. */ override def compute(partition: Partition, context: TaskContext): Iterator[U] = { - val prepared = preparedArgument + prepare() + // The same RDD could be called multiple times in one task, each call of compute() should + // have sep + val prepared = preparedArgument.get + preparedArgument = None val parentIterator = firstParent[T].iterator(partition, context) executePartition(context, partition.index, prepared, parentIterator) } diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala index 419760c4e9ef0..40e38e746db19 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala @@ -76,7 +76,7 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag]( protected def tryPrepareChildren() { rdds.foreach { - case rdd: MapPartitionsWithPreparationRDD[_, _, _] => rdd.preparedArgument + case rdd: MapPartitionsWithPreparationRDD[_, _, _] => rdd.prepare() case _ => } } From fa9489238250ef972465821558e292d5c9e4eb7d Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Sat, 29 Aug 2015 22:32:46 -0700 Subject: [PATCH 3/5] address comment, add unit test --- .../spark/rdd/MapPartitionsWithPreparationRDD.scala | 4 +--- .../rdd/MapPartitionsWithPreparationRDDSuite.scala | 13 +++++++++---- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala index 01bd224ca25df..2f8528106ce61 100644 --- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala @@ -24,8 +24,6 @@ import org.apache.spark.{Partition, Partitioner, TaskContext} /** * An RDD that applies a user provided function to every partition of the parent RDD, and * additionally allows the user to prepare each partition before computing the parent partition. - * - * TODO(davies): remove this once SPARK-10342 is fixed */ private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M: ClassTag]( prev: RDD[T], @@ -55,7 +53,7 @@ private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M override def compute(partition: Partition, context: TaskContext): Iterator[U] = { prepare() // The same RDD could be called multiple times in one task, each call of compute() should - // have sep + // have separate prepared argument. val prepared = preparedArgument.get preparedArgument = None val parentIterator = firstParent[T].iterator(partition, context) diff --git a/core/src/test/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDDSuite.scala index c16930e7d6491..ee7b107d9e98f 100644 --- a/core/src/test/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDDSuite.scala @@ -46,11 +46,16 @@ class MapPartitionsWithPreparationRDDSuite extends SparkFunSuite with LocalSpark } // Verify that the numbers are pushed in the order expected - val result = { - new MapPartitionsWithPreparationRDD[Int, Int, Unit]( - parent, preparePartition, executePartition).collect() - } + val rdd = new MapPartitionsWithPreparationRDD[Int, Int, Unit]( + parent, preparePartition, executePartition) + val result = rdd.collect() assert(result === Array(10, 20, 30)) + + TestObject.things.clear() + val rdd2 = new MapPartitionsWithPreparationRDD[Int, Int, Unit]( + parent, preparePartition, executePartition) + val result2 = rdd.zipPartitions(rdd2)((a, b) => a).collect() + assert(result2 === Array(10, 10, 20, 30, 20, 30)) } } From a3a8a34c17d56701fc4f373ae965ad7975a22b79 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Sun, 30 Aug 2015 01:11:59 -0700 Subject: [PATCH 4/5] address comment --- .../spark/rdd/MapPartitionsWithPreparationRDD.scala | 8 ++++---- .../scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala | 9 ++++++--- .../spark/rdd/MapPartitionsWithPreparationRDDSuite.scala | 1 + 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala index 2f8528106ce61..0f63ad77920da 100644 --- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala @@ -40,21 +40,21 @@ private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M private[this] var preparedArgument: Option[M] = None - def prepare(): Unit = { - // This could be called multiple times + def prepare(): M = { + // This could be called multiple times, by compute or parent's compute if (preparedArgument.isEmpty) { preparedArgument = Some(preparePartition()) } + preparedArgument.get } /** * Prepare a partition before computing it from its parent. */ override def compute(partition: Partition, context: TaskContext): Iterator[U] = { - prepare() + val prepared = prepare() // The same RDD could be called multiple times in one task, each call of compute() should // have separate prepared argument. - val prepared = preparedArgument.get preparedArgument = None val parentIterator = firstParent[T].iterator(partition, context) executePartition(context, partition.index, prepared, parentIterator) diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala index 40e38e746db19..1dcdc01eca8b0 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala @@ -74,10 +74,13 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag]( rdds = null } - protected def tryPrepareChildren() { - rdds.foreach { + /** + * Call the prepare method of every children that has one. + * This is needed for reserving execution memory in advance. + */ + protected def tryPrepareChildren(): Unit = { + getNarrowAncestors.collect { case rdd: MapPartitionsWithPreparationRDD[_, _, _] => rdd.prepare() - case _ => } } } diff --git a/core/src/test/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDDSuite.scala index ee7b107d9e98f..e281e817e493d 100644 --- a/core/src/test/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDDSuite.scala @@ -52,6 +52,7 @@ class MapPartitionsWithPreparationRDDSuite extends SparkFunSuite with LocalSpark assert(result === Array(10, 20, 30)) TestObject.things.clear() + // Zip two of these RDDs, both should be prepared before the parent is executed val rdd2 = new MapPartitionsWithPreparationRDD[Int, Int, Unit]( parent, preparePartition, executePartition) val result2 = rdd.zipPartitions(rdd2)((a, b) => a).collect() From 544f175b4b88c91becf3d6bf5e81d559812410b3 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 31 Aug 2015 12:49:39 -0700 Subject: [PATCH 5/5] address comments --- .../rdd/MapPartitionsWithPreparationRDD.scala | 26 +++++++++++-------- .../spark/rdd/ZippedPartitionsRDD.scala | 12 ++++----- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala index 0f63ad77920da..1f2213d0c4346 100644 --- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala @@ -17,6 +17,7 @@ package org.apache.spark.rdd +import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag import org.apache.spark.{Partition, Partitioner, TaskContext} @@ -38,24 +39,27 @@ private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M override def getPartitions: Array[Partition] = firstParent[T].partitions - private[this] var preparedArgument: Option[M] = None + // In certain join operations, prepare can be called on the same partition multiple times. + // In this case, we need to ensure that each call to compute gets a separate prepare argument. + private[this] var preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M] - def prepare(): M = { - // This could be called multiple times, by compute or parent's compute - if (preparedArgument.isEmpty) { - preparedArgument = Some(preparePartition()) - } - preparedArgument.get + /** + * Prepare a partition for a single call to compute. + */ + def prepare(): Unit = { + preparedArguments += preparePartition() } /** * Prepare a partition before computing it from its parent. */ override def compute(partition: Partition, context: TaskContext): Iterator[U] = { - val prepared = prepare() - // The same RDD could be called multiple times in one task, each call of compute() should - // have separate prepared argument. - preparedArgument = None + val prepared = + if (preparedArguments.isEmpty) { + preparePartition() + } else { + preparedArguments.remove(0) + } val parentIterator = firstParent[T].iterator(partition, context) executePartition(context, partition.index, prepared, parentIterator) } diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala index 1dcdc01eca8b0..b3c64394abc76 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala @@ -75,11 +75,11 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag]( } /** - * Call the prepare method of every children that has one. + * Call the prepare method of every parent that has one. * This is needed for reserving execution memory in advance. */ - protected def tryPrepareChildren(): Unit = { - getNarrowAncestors.collect { + protected def tryPrepareParents(): Unit = { + rdds.collect { case rdd: MapPartitionsWithPreparationRDD[_, _, _] => rdd.prepare() } } @@ -94,7 +94,7 @@ private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag] extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { - tryPrepareChildren() + tryPrepareParents() val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context)) } @@ -118,7 +118,7 @@ private[spark] class ZippedPartitionsRDD3 extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3), preservesPartitioning) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { - tryPrepareChildren() + tryPrepareParents() val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context), @@ -146,7 +146,7 @@ private[spark] class ZippedPartitionsRDD4 extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4), preservesPartitioning) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { - tryPrepareChildren() + tryPrepareParents() val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context),