[SPARK-26114][CORE] ExternalSorter's readingIterator field leak #23083

Closed
wants to merge 15 commits

Conversation

szhem
Contributor

@szhem szhem commented Nov 19, 2018

What changes were proposed in this pull request?

This pull request fixes the SPARK-26114 issue, which occurs when trying to reduce the number of partitions by means of coalesce without shuffling after shuffle-based transformations.

The leak occurs because ExternalSorter's readingIterator field is not cleaned up, as is done for its map and buffer fields.
Additionally, there are changes to the CompletionIterator to prevent it from capturing its sub-iterator and holding it even after the completion iterator completes. This is necessary because in some cases, e.g. with the standard Scala flatMap iterator (which is used in CoalescedRDD's compute method), the next value of the main iterator is assigned to flatMap's cur field only once it becomes available.
For DAGs where ShuffledRDD is a parent of CoalescedRDD this means that the data has to be fetched from the map side of the shuffle, and fetching this data consumes quite a lot of memory in addition to the memory already consumed by the iterator held by flatMap's cur field (until it is reassigned).
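To make that mechanism easier to see, here is a minimal, hypothetical sketch (not Spark code; the HeavyIterator class and sizes are invented for illustration) of how flatMap's cur field keeps the previous, memory-heavy sub-iterator reachable while the next one is being produced:

```scala
// Hypothetical illustration (not Spark code) of the flatMap/cur behaviour described above.
object FlatMapHoldsPreviousIterator {
  // A toy iterator that "owns" a large buffer, standing in for a shuffle-reading iterator.
  final class HeavyIterator(id: Int) extends Iterator[Int] {
    private val payload = new Array[Byte](16 * 1024 * 1024) // pretend this is buffered shuffle data
    private var remaining = 3
    override def hasNext: Boolean = remaining > 0
    override def next(): Int = { remaining -= 1; payload.length + id }
  }

  def main(args: Array[String]): Unit = {
    // Each outer element expands into a heavy sub-iterator, similar to
    // CoalescedRDD#compute flat-mapping over the iterators of its parent partitions.
    val flattened: Iterator[Int] = Iterator(1, 2, 3).flatMap(new HeavyIterator(_))
    // While flatMap builds HeavyIterator(2), the exhausted HeavyIterator(1) is still
    // referenced by flatMap's internal `cur` field and cannot be garbage-collected.
    println(flattened.sum)
  }
}
```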

For the following data

```scala
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress._
import org.apache.commons.lang._
import org.apache.spark._

// generate 100M records of sample data
sc.makeRDD(1 to 1000, 1000)
  .flatMap(item => (1 to 100000)
    .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) -> new Text(RandomStringUtils.randomAlphanumeric(1024))))
  .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec]))
```

and the following job

```scala
import org.apache.hadoop.io._
import org.apache.spark._
import org.apache.spark.storage._

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(10, false)
  .count
```

... executed like the following

```bash
spark-shell \
  --num-executors=5 \
  --executor-cores=2 \
  --master=yarn \
  --deploy-mode=client \
  --conf spark.executor.memoryOverhead=512 \
  --conf spark.executor.memory=1g \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
```

... executors are always failing with OutOfMemoryErrors.

The main issue is multiple leaks of ExternalSorter references.
For example, with 2 tasks per executor, 2 simultaneous instances of ExternalSorter per executor are expected, but the heap dump generated on OutOfMemoryError shows that there are more.

![run1-noparams-dominator-tree-externalsorter](https://user-images.githubusercontent.com/1523889/48703665-782ce580-ec05-11e8-95a9-d6c94e8285ab.png)

P.S. This PR does not cover cases with CoGroupedRDDs which use ExternalAppendOnlyMap internally, which itself can lead to OutOfMemoryErrors in many places.

How was this patch tested?

  • Existing unit tests
  • New unit tests
  • Job executions on the live environment

Here is the screenshot before applying this patch
![run3-noparams-failure-ui-5x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700395-f769eb80-ebfc-11e8-831b-e94c757d416c.png)

Here is the screenshot after applying this patch
![run3-noparams-success-ui-5x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700610-7a8b4180-ebfd-11e8-9761-baaf38a58e66.png)
And even when reducing the number of executors further, the job remains stable
![run3-noparams-success-ui-2x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700619-82e37c80-ebfd-11e8-98ed-a38e1f1f1fd9.png)

@SparkQA

SparkQA commented Nov 19, 2018

Test build #4433 has finished for PR 23083 at commit 12075ec.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@szhem
Contributor Author

szhem commented Nov 20, 2018

Hi @davies, @advancedxy, @rxin,
You seem to be the last ones who touched the corresponding parts of the files in this PR.
Could you be so kind as to take a look at it?

Contributor

@advancedxy advancedxy left a comment


Thanks for your investigation, and this is a nice catch.

However, I am not sure about adding a new interface to TaskContext.

cc @cloud-fan, @jiangxb1987 & @srowen for more comments

/**
* Removes a (Java friendly) listener that is no longer needed to be executed on task completion.
*/
def remoteTaskCompletionListener(listener: TaskCompletionListener): TaskContext
Contributor


You mean removeTaskCompletionListener, didn't you?

Contributor Author


Yep, seems that v was replaced with t on my keyboard)
Thanks a lot!

@@ -99,6 +99,13 @@ private[spark] class TaskContextImpl(
this
}

override def remoteTaskCompletionListener(listener: TaskCompletionListener)
: this.type = synchronized {
onCompleteCallbacks -= listener
Contributor


I'm not sure whether we should add removeTaskCompletionListener or not.

If we are going to add this method, then it's an O(n) operation. Maybe we need to change onCompleteCallbacks to a LinkedHashSet?

Contributor Author

@szhem szhem Nov 21, 2018


Should we do the same thing (i.e. changing ArrayBuffer to LinkedHashSet) for onFailureCallbacks too?

  /** List of callback functions to execute when the task completes. */
  @transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener]

  /** List of callback functions to execute when the task fails. */
  @transient private val onFailureCallbacks = new ArrayBuffer[TaskFailureListener]

Contributor


If we are going to add the new interface, I think so.

Contributor Author


Replaced ArrayBuffer with LinkedHashSet. Thank you!
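
For reference, a simplified, self-contained sketch of the kind of change being discussed (invented names and signatures, not the actual TaskContextImpl code):

```scala
import scala.collection.mutable

// Simplified sketch only: LinkedHashSet keeps insertion order (needed for the
// deterministic, reverse-order invocation discussed below) while making both
// registration and removal of listeners cheap, unlike ArrayBuffer's O(n) removal.
object TaskContextSketch {
  trait CompletionListener { def onTaskCompletion(): Unit }
  trait FailureListener { def onTaskFailure(error: Throwable): Unit }

  class Context {
    private val onCompleteCallbacks = mutable.LinkedHashSet.empty[CompletionListener]
    private val onFailureCallbacks = mutable.LinkedHashSet.empty[FailureListener]

    def addTaskCompletionListener(l: CompletionListener): this.type =
      synchronized { onCompleteCallbacks += l; this }

    def removeTaskCompletionListener(l: CompletionListener): this.type =
      synchronized { onCompleteCallbacks -= l; this }

    def markTaskCompleted(): Unit = synchronized {
      // process callbacks in the reverse order of registration
      onCompleteCallbacks.toSeq.reverse.foreach(_.onTaskCompletion())
    }
  }
}
```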

Another interesting question is why these collections are traversed in the reverse order in invokeListeners, like the following

  private def invokeListeners[T](
      listeners: Seq[T],
      name: String,
      error: Option[Throwable])(
      callback: T => Unit): Unit = {
    val errorMsgs = new ArrayBuffer[String](2)
    // Process callbacks in the reverse order of registration
    listeners.reverse.foreach { listener =>
      ...
    }
}

I believe @hvanhovell could help us understand. @hvanhovell Could you please remind us why task completion and error listeners are traversed in the reverse order (you seem to be the one who added the corresponding line)?

// note that holding sorter references till the end of the task also holds
// references to PartitionedAppendOnlyMap and PartitionedPairBuffer too and these
// ones may consume a significant part of the available memory
context.remoteTaskCompletionListener(taskListener)
Contributor


Nice catch.

Like I said above, do we have another way to remove the reference to the sorter?

Contributor Author


Great question! Honestly speaking, I don't have a good solution right now.
The TaskCompletionListener stops the sorter in case of task failures, cancellations, etc., i.e. in case of abnormal termination. In the "happy path" case the task completion listener is not needed.

@@ -72,7 +73,8 @@ final class ShuffleBlockFetcherIterator(
maxBlocksInFlightPerAddress: Int,
maxReqSizeShuffleToMem: Long,
detectCorrupt: Boolean)
extends Iterator[(BlockId, InputStream)] with DownloadFileManager with Logging {
extends Iterator[(BlockId, InputStream)] with DownloadFileManager with TaskCompletionListener
Contributor


I don't think it's a good idea for ShuffleBlockFetcherIterator to be a subclass of TaskCompletionListener.
What's wrong with the original solution?

Contributor Author


The main reason is that the TaskCompletionListener is added in one place (in the initialize method) and needs to be removed in another one (in the cleanup method).
Will introduce a field for TaskCompletionListener instead. Thank you!

Contributor


Another field sounds reasonable.

Contributor Author


Introduced the corresponding field.

  /**
   * Task completion callback to be called in both success as well as failure cases to cleanup.
   * It may not be called at all in case the `cleanup` method has already been called before
   * task completion.
   */
  private[this] val cleanupTaskCompletionListener = (_: TaskContext) => cleanup()
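
For context, a hypothetical, self-contained sketch (invented names, not the actual ShuffleBlockFetcherIterator code) of the register-in-initialize / remove-in-cleanup pattern this field enables:

```scala
// Hypothetical sketch only: keeping the listener in a field lets cleanup()
// deregister the exact instance that initialize() registered, so the cleanup
// logic runs exactly once whether the task finishes normally or abnormally.
object ListenerFieldPattern {
  trait Listener { def onCompletion(): Unit }

  final class Context {
    private val listeners = scala.collection.mutable.LinkedHashSet.empty[Listener]
    def add(l: Listener): Unit = synchronized { listeners += l }
    def remove(l: Listener): Unit = synchronized { listeners -= l }
    def markCompleted(): Unit = synchronized { listeners.toSeq.reverse.foreach(_.onCompletion()) }
  }

  final class Fetcher(ctx: Context) {
    private var cleaned = false

    private val cleanupListener: Listener = new Listener {
      def onCompletion(): Unit = cleanup()
    }

    def initialize(): Unit = ctx.add(cleanupListener)

    def cleanup(): Unit = synchronized {
      if (!cleaned) {
        cleaned = true
        // release network buffers / temporary files here
        ctx.remove(cleanupListener) // nothing left for the task-completion hook to do
      }
    }
  }
}
```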

@advancedxy
Contributor

advancedxy commented Nov 21, 2018

And another thing:

P.S. This PR does not cover cases with CoGroupedRDDs which use ExternalAppendOnlyMap internally, which itself can lead to OutOfMemoryErrors in many places.

So do you mean CoGroupedRDDs with multiple input sources will have similar problems? If so, can you create another Jira?

@szhem
Contributor Author

szhem commented Nov 21, 2018

So do you mean CoGroupRDDs with multiple input sources will have similar problems?

Yep, but a little bit different ones

If so, can you create another Jira?

Will do it shortly.

…g TaskCompletionListener interface - using private fields instead, using LinkedHashSets as a collection of task completion listeners for faster lookups during listener removals
@cloud-fan
Contributor

cloud-fan commented Nov 23, 2018

Looking at the code, we are trying to fix 2 memory leaks: the task completion listener in ShuffleBlockFetcherIterator, and the CompletionIterator. If that's the case, can you say that in the PR description?

For the task completion listener, I think it's an overkill to introduce a new API. Do you know where exactly we leak the memory, and can we null it out when the ShuffleBlockFetcherIterator reaches its end?

@advancedxy
Contributor

For the task completion listener, I think it's an overkill to introduce a new API. Do you know where exactly we leak the memory, and can we null it out when the ShuffleBlockFetcherIterator reaches its end?

If I understand correctly, the memory is leaked because the external sorter is referenced by a TaskCompletionListener and it's only GCed when the task is completed. However, for coalesce or similar APIs, multiple BlockStoreShuffleReaders are created as there are multiple input sources; the internal sorter is not released until all shuffle readers are consumed and the task is finished.

It's indeed an overkill to introduce a new API. However, I think we can limit it to private[spark] scope.
Like @szhem, I haven't figured out another way to null out the sorter reference yet.

map = null // So that the memory can be garbage-collected
buffer = null // So that the memory can be garbage-collected
readingIterator = null // So that the memory can be garbage-collected
Contributor


Hi @szhem, I discussed this with Wenchen offline. I think this is the key point. After nulling out readingIterator, ExternalSorter should release all the memory it occupied.

Yes, ExternalSorter is leaked in the TaskCompletionListener, but it would already be stopped by the CompletionIterator in the happy path, and the stopped sorter wouldn't occupy too much memory. The readingIterator is occupying memory because it may reference map/buffer.partitionedDestructiveSortedIterator, which itself references map/buffer. So only nulling out map or buffer is not enough.
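
To make that reference chain concrete, a deliberately simplified sketch (field names only loosely follow ExternalSorter; this is not the real implementation):

```scala
// Simplified illustration: the reading iterator is built on top of an iterator
// over the in-memory buffer, so it keeps that buffer reachable even after the
// `map`/`buffer` fields themselves have been nulled out.
class SorterSketch {
  private var buffer: Array[AnyRef] = new Array[AnyRef](1 << 20) // stand-in for PartitionedPairBuffer
  private var readingIterator: Iterator[AnyRef] = _

  def startReading(): Iterator[AnyRef] = {
    readingIterator = buffer.iterator // the iterator captures `buffer`
    readingIterator
  }

  def stop(): Unit = {
    buffer = null          // not enough on its own: readingIterator still reaches the array
    readingIterator = null // with this as well, the large array becomes unreachable
  }
}
```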

Can you try with this modification only and see whether the OOM still occurs?

Contributor Author

@szhem szhem Nov 25, 2018


@advancedxy I've tried to remove all the modifications except for this one and got OutOfMemoryErrors once again. Here are the details:

  1. Now there are 4 ExternalSorters remaining:
    2 of them are not closed ...
    1_readingiterator_isnull_nonclosed_externalsorter
    ... and 2 of them are closed ...
    2_readingiterator_isnull_closed_externalsorter
    ... as expected.
  2. There are 2 SpillableIterators (which consume a significant part of memory) of already closed ExternalSorters remaining:
    4_readingiterator_isnull_spillableiterator_of_closed_externalsorter
  3. These SpillableIterators are referenced by CompletionIterators ...
    6_completioniterator_of_blockstoreshufflereader
    ... which in their turn seem to be referenced by the cur field ...
    7_coalescedrdd_compute_flatmap
    ... of the standard Iterator's flatMap that is used in the compute method of CoalescedRDD.

The standard Iterator's flatMap does not clean up its cur field before obtaining the next value for it, and producing that next value will consume quite a lot of memory too ...
... and in case of Spark that means that the previous, memory-consuming iterator stays alive while the next value is being fetched.
8_coalescedrdd_compute_flatmap_cur_isnotassigned

So I've brought back the changes to the CompletionIterator that reassign the reference to its sub-iterator to an empty iterator ...

... and that has helped (updated the PR correspondingly).

P.S. Cleaning up the standard flatMap iterator's cur field before calling nextCur will help too (here is the corresponding issue but I don't know whether it will be accepted or not)

  def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
    private var cur: Iterator[B] = empty
    private def nextCur() { cur = f(self.next()).toIterator }
    def hasNext: Boolean = {
      // Equivalent to cur.hasNext || self.hasNext && { nextCur(); hasNext }
      // but slightly shorter bytecode (better JVM inlining!)
      while (!cur.hasNext) {
        cur = empty
        if (!self.hasNext) return false
        nextCur()
      }
      true
    }
    def next(): B = (if (hasNext) cur else empty).next()
  }

Contributor


Nice. Case well explained.

But I think you need to add corresponding test cases for CompletionIterator and ExternalSorter.

Contributor Author

@szhem szhem Nov 25, 2018


I've added a test case for CompletionIterator.

Regarding ExternalSorter: taking into account that only the private API has been changed and there are no similar test cases which verify that the private map and buffer fields are set to null after the sorter stops, don't you think that the already existing tests cover the situation with readingIterator too?

@szhem szhem changed the title [SPARK-26114][CORE] ExternalSorter Leak [SPARK-26114][CORE] ExternalSorter's readingIterator field leak Nov 25, 2018
@szhem
Contributor Author

szhem commented Nov 25, 2018

Hi @cloud-fan

Looking at the code, we are trying to fix 2 memory leaks: the task completion listener in ShuffleBlockFetcherIterator, and the CompletionIterator. If that's the case, can you say that in the PR description?

I've updated the description and the title of this PR correspondingly.

Contributor

@advancedxy advancedxy left a comment


Add corresponding unit test please.

@cloud-fan
Contributor

ok to test

@cloud-fan
Contributor

LGTM, thanks for your great work!

@SparkQA

SparkQA commented Nov 26, 2018

Test build #99254 has finished for PR 23083 at commit 1723819.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Nov 26, 2018

Test build #99267 has finished for PR 23083 at commit 1723819.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Nov 27, 2018

Test build #99309 has finished for PR 23083 at commit 1723819.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Nov 27, 2018

Test build #99313 has finished for PR 23083 at commit 1723819.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Nov 27, 2018

Test build #99326 has finished for PR 23083 at commit 1723819.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Nov 28, 2018

Test build #99351 has finished for PR 23083 at commit 1723819.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Nov 28, 2018

Test build #99360 has finished for PR 23083 at commit 1723819.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master/2.4!

asfgit pushed a commit that referenced this pull request Nov 28, 2018
Closes #23083 from szhem/SPARK-26114-externalsorter-leak.

Authored-by: Sergey Zhemzhitsky <szhemzhitski@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 438f8fd)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@asfgit asfgit closed this in 438f8fd Nov 28, 2018
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Jul 23, 2019
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Aug 1, 2019
if (!r && !completed) {
  completed = true
  // reassign to release resources of highly resource consuming iterators early
  iter = Iterator.empty.asInstanceOf[I]
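
For readers following along, a simplified, self-contained sketch of the idea in the fragment above (it mirrors, but is not, the actual CompletionIterator class):

```scala
// Simplified sketch: once the wrapped iterator is exhausted, swap it for an empty
// iterator so whatever memory it holds can be garbage-collected even if this
// wrapper itself stays referenced (e.g. by flatMap's `cur` field), and run the
// completion callback exactly once.
class ReleasingIterator[A](sub: Iterator[A])(completion: () => Unit) extends Iterator[A] {
  private var completed = false
  private var iter: Iterator[A] = sub

  override def next(): A = iter.next()

  override def hasNext: Boolean = {
    val r = iter.hasNext
    if (!r && !completed) {
      completed = true
      iter = Iterator.empty // release the (possibly memory-heavy) sub-iterator
      completion()
    }
    r
  }
}
```

A unit test for this behaviour can then drain the wrapped iterator and assert that the wrapper no longer holds a reference to it.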


Contributor Author


@ChenjunZou, you can find the details in this message


@ChenjunZou ChenjunZou Aug 12, 2019


Thanks, szhem :)
Your UT explains it all.
At first I misunderstood sub as CompletionIterator(val sub).
Hidden, well done!
