
[SPARK-10341] [SQL] fix memory starving in unsafe SMJ #8511

Closed
wants to merge 5 commits into from

Conversation

davies
Contributor

@davies davies commented Aug 28, 2015

In SMJ, the first ExternalSorter could consume all the memory before spilling, so the second cannot acquire even its first page.

Until we have a better memory allocator, SMJ should call prepare() before calling compute() on any of its children.

cc @rxin @JoshRosen
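The starvation can be illustrated with a toy model (not Spark's real MemoryManager; names and sizes here are illustrative). Two greedy consumers share one fixed pool: without an up-front reservation the first takes every page and the second gets none, while reserving one page per child before any compute() runs, the prepare() idea, guarantees each sorter a start.

```scala
object StarvationSketch {
  // Toy pool of fixed-size pages with all-or-nothing grants.
  final class Pool(var free: Int) {
    def tryAcquirePage(): Boolean = if (free > 0) { free -= 1; true } else false
  }

  // Greedy consumer: keeps taking pages until refused, like an
  // ExternalSorter filling its in-memory buffer before deciding to spill.
  def greedyFill(pool: Pool): Int = {
    var owned = 0
    while (pool.tryAcquirePage()) owned += 1
    owned
  }

  // Without prepare(): the first sorter drains the pool, the second gets 0.
  def withoutPrepare(total: Int): (Int, Int) = {
    val pool = new Pool(total)
    (greedyFill(pool), greedyFill(pool))
  }

  // With prepare(): each child reserves its first page before any compute(),
  // so the second sorter always holds at least one page.
  def withPrepare(total: Int): (Int, Int) = {
    val pool = new Pool(total)
    val a = if (pool.tryAcquirePage()) 1 else 0
    val b = if (pool.tryAcquirePage()) 1 else 0
    (a + greedyFill(pool), b)
  }
}
```

With a pool of 8 pages, `withoutPrepare(8)` yields `(8, 0)` while `withPrepare(8)` yields `(7, 1)`: the reservation costs the first sorter one page but keeps the second from starving.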

@davies
Contributor Author

davies commented Aug 28, 2015

ping @JoshRosen

@SparkQA

SparkQA commented Aug 28, 2015

Test build #41759 has finished for PR 8511 at commit 1afb4f3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class KMeans @Since("1.5.0") (
    • class GaussianMixtureModel @Since("1.3.0") (
    • class KMeansModel @Since("1.1.0") (@Since("1.0.0") val clusterCenters: Array[Vector])
    • class PowerIterationClusteringModel @Since("1.3.0") (
    • class StreamingKMeansModel @Since("1.2.0") (
    • class StreamingKMeans @Since("1.2.0") (
    • class ChiSqSelectorModel @Since("1.3.0") (
    • class ChiSqSelector @Since("1.3.0") (
    • class ElementwiseProduct @Since("1.4.0") (
    • class IDF @Since("1.2.0") (@Since("1.2.0") val minDocFreq: Int)
    • class Normalizer @Since("1.1.0") (p: Double) extends VectorTransformer
    • class PCA @Since("1.4.0") (@Since("1.4.0") val k: Int)
    • class StandardScaler @Since("1.1.0") (withMean: Boolean, withStd: Boolean) extends Logging
    • class StandardScalerModel @Since("1.3.0") (
    • class PoissonGenerator @Since("1.1.0") (
    • class ExponentialGenerator @Since("1.3.0") (
    • class GammaGenerator @Since("1.3.0") (
    • class LogNormalGenerator @Since("1.3.0") (
    • case class Rating @Since("0.8.0") (
    • class MatrixFactorizationModel @Since("0.8.0") (
    • abstract class GeneralizedLinearModel @Since("1.0.0") (
    • class IsotonicRegressionModel @Since("1.3.0") (
    • case class LabeledPoint @Since("1.0.0") (
    • class LassoModel @Since("1.1.0") (
    • class LinearRegressionModel @Since("1.1.0") (
    • class RidgeRegressionModel @Since("1.1.0") (
    • class MultivariateGaussian @Since("1.3.0") (
    • case class BoostingStrategy @Since("1.4.0") (
    • class Strategy @Since("1.3.0") (
    • class DecisionTreeModel @Since("1.0.0") (
    • class Node @Since("1.2.0") (
    • class Predict @Since("1.2.0") (
    • class RandomForestModel @Since("1.2.0") (
    • class GradientBoostedTreesModel @Since("1.2.0") (

@SparkQA

SparkQA commented Aug 29, 2015

Test build #1703 has finished for PR 8511 at commit 1afb4f3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class KMeans @Since("1.5.0") (

@SparkQA

SparkQA commented Aug 29, 2015

Test build #1704 has finished for PR 8511 at commit 1afb4f3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class KMeans @Since("1.5.0") (

@SparkQA

SparkQA commented Aug 29, 2015

Test build #1705 has finished for PR 8511 at commit 1afb4f3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class KMeans @Since("1.5.0") (

@marmbrus
Contributor

/cc @andrewor14

@@ -24,6 +24,8 @@ import org.apache.spark.{Partition, Partitioner, TaskContext}
/**
* An RDD that applies a user provided function to every partition of the parent RDD, and
* additionally allows the user to prepare each partition before computing the parent partition.
*
* TODO(davies): remove this once SPARK-10342 is fixed
Contributor

Based on this comment, it is not clear what needs to be removed (this class, or the changes in this PR). Can you add a comment in SPARK-10342 about what needs to be removed?

Contributor

It is also unclear that we would want to remove this even with better memory management; it might still be useful then. I would just remove this comment.


@SparkQA

SparkQA commented Aug 29, 2015

Test build #41785 has finished for PR 8511 at commit d44be2d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val preparedArgument = preparePartition()
prepare()
// The same RDD could be called multiple times in one task, each call of compute() should
// have sep
Contributor

Incomplete comment?

@andrewor14
Contributor

retest this please

@@ -73,6 +73,13 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag](
super.clearDependencies()
rdds = null
}

protected def tryPrepareChildren() {
Contributor

This should declare a `Unit` return type explicitly. Also, can you add a doc comment to this:

/**
 * Call the prepare method of every child that has one.
 * This is needed for reserving execution memory in advance.
 */
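The pattern that doc comment describes can be sketched with self-contained toy types (illustrative stand-ins, not Spark's actual RDD hierarchy): walk the children and call prepare() on every one that has it, skipping the rest.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy stand-ins for the RDD hierarchy (illustrative only).
trait Preparable { def prepare(): Unit }

class ToyRdd
class PreparingRdd(log: ArrayBuffer[String], name: String)
    extends ToyRdd with Preparable {
  def prepare(): Unit = log += name
}

object PrepareChildren {
  /**
   * Call the prepare method of every child that has one.
   * This is needed for reserving execution memory in advance.
   */
  def tryPrepareChildren(rdds: Seq[ToyRdd]): Unit =
    rdds.foreach {
      case p: Preparable => p.prepare() // reserve memory up front
      case _             =>             // children without prepare() are skipped
    }
}
```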

@andrewor14
Contributor

@davies I think the changes here fix the problem, but I find it a little arbitrary that ZippedPartitionsRDD needs to change. In the future, if we decide to implement SMJ using some other operators, this will silently fail. I wonder if we should make the changes in SMJ itself instead, e.g.:

// In SortMergeJoin
protected override def doExecute(): RDD[InternalRow] = {
  ...

  // You'll need to add the `rdd` argument here, which doesn't currently exist
  def preparePartition(rdd: RDD[...]): Unit = {
    rdd.getNarrowAncestors.collect {
      case ancestor: MapPartitionsWithPrepareRDD[...] => ancestor.prepare()
    }
  }

  def executePartition(...): Iterator[...] = {
    // Just return the parent iterator (no-op)
  }

  val zipped = left.execute().zipPartitions(right.execute()) { ... }
  new MapPartitionsWithPrepareRDD[...](
    zipped, preparePartition, executePartition, preservesPartitioning = true)
}

This essentially forces SMJ to call the prepare() methods of all ancestor RDDs before executing. We might need to change getNarrowAncestors to maintain an ordering (right now it's arbitrary). The cost of traversing the ancestors here should be small because SMJ doesn't have long lineages.

Will something like this work?

@davies
Contributor Author

davies commented Aug 30, 2015

@andrewor14 Not only SortMergeJoin has this problem; SortMergeOuterJoin has it too. And any join that depends on TungstenAggregation or TungstenSort will have this problem. I think ZippedPartitionsRDD is the only place that could introduce multiple MapPartitionsWithPreparationRDDs in the same task. Correct me if there are others.

@SparkQA

SparkQA commented Aug 30, 2015

Test build #41797 has finished for PR 8511 at commit fa94892.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 30, 2015

Test build #41803 has finished for PR 8511 at commit a3a8a34.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 30, 2015

Test build #1706 has finished for PR 8511 at commit a3a8a34.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class KMeans @Since("1.5.0") (

@andrewor14
Contributor

Hm, my only concern with doing the getNarrowAncestors thing in ZippedPartitionsRDD is that everyone who uses it would then traverse the entire lineage. E.g., if GraphX uses ZippedPartitionsRDD, we might introduce a regression. I think it's safest to do it only in the specific operators we care about, i.e. SMJ / SMOJ (are there any others?).

@davies
Contributor Author

davies commented Aug 31, 2015

@andrewor14 I think getNarrowAncestors will not traverse the entire lineage; it only visits the RDDs within the current stage. It is still not cheap if the RDD is a union of thousands of RDDs.

We could roll back to using children to address your concern. That also works for SMJ and SMOJ, but could fail on others (like FULL OUTER JOIN(AGG(A), AGG(B))). Does that work for you?
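A getNarrowAncestors-style walk can be sketched with toy types (illustrative, not Spark's actual dependency classes): only narrow dependencies are followed, so the traversal stops at shuffle boundaries and never leaves the current stage, which is why it avoids the full lineage.

```scala
object AncestorSketch {
  // Toy RDD with named dependencies; Spark's real classes carry much more.
  final class Rdd(val name: String, val deps: Seq[Dep])
  sealed trait Dep { def rdd: Rdd }
  case class NarrowDep(rdd: Rdd) extends Dep  // followed by the walk
  case class ShuffleDep(rdd: Rdd) extends Dep // stage boundary: walk stops here

  // Recursively collect ancestors reachable through narrow deps only.
  def narrowAncestors(rdd: Rdd): Seq[Rdd] =
    rdd.deps.collect { case NarrowDep(parent) =>
      parent +: narrowAncestors(parent)
    }.flatten
}
```

A chain `mapped -> sorted -(shuffle)-> preShuffle` yields only `sorted` from `mapped`: the shuffle dependency cuts the traversal off before `preShuffle`.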

@SparkQA

SparkQA commented Aug 31, 2015

Test build #41838 has finished for PR 8511 at commit 1f4b176.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14
Contributor

LGTM, will merge once tests pass.

@rxin
Contributor

rxin commented Aug 31, 2015

I'm going to optimistically merge this since most of the tests passed.

@asfgit asfgit closed this in 540bdee Aug 31, 2015
asfgit pushed a commit that referenced this pull request Aug 31, 2015
In SMJ, the first ExternalSorter could consume all the memory before spilling, so the second cannot acquire even its first page.

Until we have a better memory allocator, SMJ should call prepare() before calling compute() on any of its children.

cc rxin JoshRosen

Author: Davies Liu <davies@databricks.com>

Closes #8511 from davies/smj_memory.

(cherry picked from commit 540bdee)
Signed-off-by: Reynold Xin <rxin@databricks.com>
@@ -38,12 +39,28 @@ private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M

override def getPartitions: Array[Partition] = firstParent[T].partitions

// In certain join operations, prepare can be called on the same partition multiple times.
// In this case, we need to ensure that each call to compute gets a separate prepare argument.
private[this] var preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M]
Contributor

I just noticed this can be a val. We can fix this in a follow-up patch.

Contributor Author

Will fix this in a follow-up PR.
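The prepared-argument bookkeeping quoted in the diff above can be sketched in isolation (a toy version; Spark's actual class carries partition and task context): prepare() may run several times in one task, and each compute() must consume its own prepared argument rather than sharing one.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy queue of prepared arguments, one per pending compute() call.
class PreparedQueue[M](preparePartition: () => M) {
  // A val, per the review note above.
  private val preparedArguments = new ArrayBuffer[M]

  // Called before compute(): run the preparation and stash its result.
  def prepare(): Unit = preparedArguments += preparePartition()

  // Called from compute(): take the oldest prepared argument, or prepare
  // lazily if prepare() was never called for this compute.
  def nextArgument(): M =
    if (preparedArguments.isEmpty) preparePartition()
    else preparedArguments.remove(0)
}
```

Two prepare() calls followed by two compute() calls hand each compute its own argument in FIFO order; a third compute without a matching prepare falls back to preparing on the spot.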

@SparkQA

SparkQA commented Aug 31, 2015

Test build #41839 has finished for PR 8511 at commit 544f175.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Oct 30, 2015
Since we do not need to preserve a page before calling compute(), MapPartitionsWithPreparationRDD is not needed anymore.

This PR basically revert #8543, #8511, #8038, #8011

Author: Davies Liu <davies@databricks.com>

Closes #9381 from davies/remove_prepare2.