Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-9419] ShuffleMemoryManager and MemoryStore should track memory on a per-task, not per-thread, basis #7734

Closed
wants to merge 11 commits into from

Conversation

JoshRosen
Copy link
Contributor

Spark's ShuffleMemoryManager and MemoryStore track memory on a per-thread basis, which causes problems in the handful of cases where we have tasks that use multiple threads. In PythonRDD, RRDD, ScriptTransformation, and PipedRDD we consume the input iterator in a separate thread in order to write it to an external process. As a result, these RDD's input iterators are consumed in a different thread than the thread that created them, which can cause problems in our memory allocation tracking. For example, if allocations are performed in one thread but deallocations are performed in a separate thread then memory may be leaked or we may get errors complaining that more memory was allocated than was freed.

I think that the right way to fix this is to change our accounting to be performed on a per-task instead of per-thread basis. Note that the current per-thread tracking has caused problems in the past; SPARK-3731 (#2668) fixes a memory leak in PythonRDD that was caused by this issue (that fix is no longer necessary as of this patch).

@JoshRosen
Copy link
Contributor Author

Note that this still needs test cases. I also need to audit PythonRDD, RRDD, ScriptTransformation, and PipedRDD to make sure that they propagate their parent threads' TaskContext thread locals.

@JoshRosen
Copy link
Contributor Author

/cc @andrewor14 for first-impressions on the approach here; just looking for high-level feedback on whether this is the right route to fix some of these problems of memory being freed in different threads than it was allocated in.

@SparkQA
Copy link

SparkQA commented Jul 28, 2015

Test build #38762 has finished for PR 7734 at commit 7b0f04b.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 28, 2015

Test build #38766 has finished for PR 7734 at commit 44f6497.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Copy link
Contributor Author

@lianhuiwang, you might be interested in this change given the related work that you're doing at #7130.

@@ -263,11 +263,6 @@ private[spark] class PythonRDD(
if (!worker.isClosed) {
Utils.tryLog(worker.shutdownOutput())
}
} finally {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/cc @davies for this PySpark change.

@rxin
Copy link
Contributor

rxin commented Jul 29, 2015

LGTM

@rxin
Copy link
Contributor

rxin commented Jul 29, 2015

As discussed offline, one thing that's missing is to propagate the task context to other threads.

@SparkQA
Copy link

SparkQA commented Jul 29, 2015

Test build #38792 has finished for PR 7734 at commit b4b1702.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Copy link
Contributor

rxin commented Jul 29, 2015

I've merged this. Thanks!

@asfgit asfgit closed this in ea49705 Jul 29, 2015
@JoshRosen
Copy link
Contributor Author

Just for future readers of this PR: note that the ScriptTransformation case was already handled in a previous PR, which is why there's not a change for that here.

asfgit pushed a commit that referenced this pull request Jul 30, 2015
This pull request enables Unsafe mode by default in Spark SQL. In order to do this, we had to fix a number of small issues:

**List of fixed blockers**:

- [x] Make some default buffer sizes configurable so that HiveCompatibilitySuite can run properly (#7741).
- [x] Memory leak on grouped aggregation of empty input (fixed by #7560 to fix this)
- [x] Update planner to also check whether codegen is enabled before planning unsafe operators.
- [x] Investigate failing HiveThriftBinaryServerSuite test.  This turns out to be caused by a ClassCastException that occurs when Exchange tries to apply an interpreted RowOrdering to an UnsafeRow when range partitioning an RDD.  This could be fixed by #7408, but a shorter-term fix is to just skip the Unsafe exchange path when RangePartitioner is used.
- [x] Memory leak exceptions masking exceptions that actually caused tasks to fail (will be fixed by #7603).
- [x]  ~~https://issues.apache.org/jira/browse/SPARK-9162, to implement code generation for ScalaUDF.  This is necessary for `UDFSuite` to pass.  For now, I've just ignored this test in order to try to find other problems while we wait for a fix.~~ This is no longer necessary as of #7682.
- [x] Memory leaks from Limit after UnsafeExternalSort cause the memory leak detector to fail tests. This is a huge problem in the HiveCompatibilitySuite (fixed by f4ac642a4e5b2a7931c5e04e086bb10e263b1db6).
- [x] Tests in `AggregationQuerySuite` are failing due to NaN-handling issues in UnsafeRow, which were fixed in #7736.
- [x] `org.apache.spark.sql.ColumnExpressionSuite.rand` needs to be updated so that the planner check also matches `TungstenProject`.
- [x] After having lowered the buffer sizes to 4MB so that most of HiveCompatibilitySuite runs:
  - [x] Wrong answer in `join_1to1` (fixed by #7680)
  - [x] Wrong answer in `join_nulls` (fixed by #7680)
  - [x] Managed memory OOM / leak in `lateral_view`
  - [x] Seems to hang indefinitely in `partcols1`.  This might be a deadlock in script transformation or a bug in error-handling code? The hang was fixed by #7710.
  - [x] Error while freeing memory in `partcols1`: will be fixed by #7734.
- [x] After fixing the `partcols1` hang, it appears that a number of later tests have issues as well.
- [x] Fix thread-safety bug in codegen fallback expression evaluation (#7759).

Author: Josh Rosen <joshrosen@databricks.com>

Closes #7564 from JoshRosen/unsafe-by-default and squashes the following commits:

83c0c56 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-by-default
f4cc859 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-by-default
963f567 [Josh Rosen] Reduce buffer size for R tests
d6986de [Josh Rosen] Lower page size in PySpark tests
013b9da [Josh Rosen] Also match TungstenProject in checkNumProjects
5d0b2d3 [Josh Rosen] Add task completion callback to avoid leak in limit after sort
ea250da [Josh Rosen] Disable unsafe Exchange path when RangePartitioning is used
715517b [Josh Rosen] Enable Unsafe by default
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants