Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-12817] Simplify CacheManager code and remove unused BlockManager methods #10748

Closed
wants to merge 4 commits into from

Conversation

JoshRosen
Copy link
Contributor

CacheManager directly calls MemoryStore.unrollSafely() and has its own logic for handling graceful fallback to disk when cached data does not fit in memory. However, this logic also exists inside of the MemoryStore itself, so this appears to be unnecessary duplication.

We can remove this duplication and delete a significant amount of BlockManager code which existed only to support this CacheManager code.

@JoshRosen
Copy link
Contributor Author

/cc @andrewor14 for review.

values: Array[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
putIterator(blockId, values.toIterator, level, returnValues)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fact that these putArray() methods just turned around and called putIterator() suggests to me that this isn't a terribly useful method to expose.

@JoshRosen
Copy link
Contributor Author

Jenkins, retest this please.

@SparkQA
Copy link

SparkQA commented Jan 14, 2016

Test build #49370 has finished for PR 10748 at commit 80d375a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Copy link
Contributor Author

Hmm, it looks like two tests are failing:

[info] - compute without caching when no partitions fit in memory *** FAILED *** (3 seconds, 905 milliseconds)
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, localhost): org.apache.spark.storage.BlockException: Block manager failed to return cached value for rdd_0_0!

and

[info] - compute when only some partitions fit in memory *** FAILED *** (3 seconds, 893 milliseconds)
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 0.0 failed 4 times, most recent failure: Lost task 5.3 in stage 0.0 (TID 15, localhost): org.apache.spark.storage.BlockException: Block manager failed to return cached value for rdd_0_5!
[info]  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:86)

My hunch is that the spill-to-disk fallback path works slightly differently when called in BlockManager.putIterator -> MemoryStore.putIterator vs. the old code which interacted directly with the memory manager and other block manager components.

I'll take a look to see if I can spot what's going on. I think that the control flow of the disk fallback path could be a bit better documented, so I'll see about adding some comments to the existing code.

/*
* This RDD is to be cached in memory. In this case we cannot pass the computed values
* to the BlockManager as an iterator and expect to read it back later. This is because
* we may end up dropping a partition from memory store before getting it back.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This problem can be addressed via my other patch for locking in the block manager: we can have a put() implicitly retain a lock to the block which was just stored.

@JoshRosen
Copy link
Contributor Author

Going to close this for now to declutter the queue but will re-open as soon as my other patch is merged.

@JoshRosen JoshRosen closed this Jan 25, 2016
asfgit pushed a commit that referenced this pull request Feb 26, 2016
## Motivation

As a pre-requisite to off-heap caching of blocks, we need a mechanism to prevent pages / blocks from being evicted while they are being read. With on-heap objects, evicting a block while it is being read merely leads to memory-accounting problems (because we assume that an evicted block is a candidate for garbage-collection, which will not be true during a read), but with off-heap memory this will lead to either data corruption or segmentation faults.

## Changes

### BlockInfoManager and reader/writer locks

This patch adds block-level read/write locks to the BlockManager. It introduces a new `BlockInfoManager` component, which is contained within the `BlockManager`, holds the `BlockInfo` objects that the `BlockManager` uses for tracking block metadata, and exposes APIs for locking blocks in either shared read or exclusive write modes.

`BlockManager`'s `get*()` and `put*()` methods now implicitly acquire the necessary locks. After a `get()` call successfully retrieves a block, that block is locked in a shared read mode. A `put()` call will block until it acquires an exclusive write lock. If the write succeeds, the write lock will be downgraded to a shared read lock before returning to the caller. This `put()` locking behavior allows us store a block and then immediately turn around and read it without having to worry about it having been evicted between the write and the read, which will allow us to significantly simplify `CacheManager` in the future (see #10748).

See `BlockInfoManagerSuite`'s test cases for a more detailed specification of the locking semantics.

### Auto-release of locks at the end of tasks

Our locking APIs support explicit release of locks (by calling `unlock()`), but it's not always possible to guarantee that locks will be released prior to the end of the task. One reason for this is our iterator interface: since our iterators don't support an explicit `close()` operator to signal that no more records will be consumed, operations like `take()` or `limit()` don't have a good means to release locks on their input iterators' blocks. Another example is broadcast variables, whose block locks can only be released at the end of the task.

To address this, `BlockInfoManager` uses a pair of maps to track the set of locks acquired by each task. Lock acquisitions automatically record the current task attempt id by obtaining it from `TaskContext`. When a task finishes, code in `Executor` calls `BlockInfoManager.unlockAllLocksForTask(taskAttemptId)` to free locks.

### Locking and the MemoryStore

In order to prevent in-memory blocks from being evicted while they are being read, the `MemoryStore`'s `evictBlocksToFreeSpace()` method acquires write locks on blocks which it is considering as candidates for eviction. These lock acquisitions are non-blocking, so a block which is being read will not be evicted. By holding write locks until the eviction is performed or skipped (in case evicting the blocks would not free enough memory), we avoid a race where a new reader starts to read a block after the block has been marked as an eviction candidate but before it has been removed.

### Locking and remote block transfer

This patch makes small changes to to block transfer and network layer code so that locks acquired by the BlockTransferService are released as soon as block transfer messages are consumed and released by Netty. This builds on top of #11193, a bug fix related to freeing of network layer ManagedBuffers.

## FAQ

- **Why not use Java's built-in [`ReadWriteLock`](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReadWriteLock.html)?**

  Our locks operate on a per-task rather than per-thread level. Under certain circumstances a task may consist of multiple threads, so using `ReadWriteLock` would mean that we might call `unlock()` from a thread which didn't hold the lock in question, an operation which has undefined semantics. If we could rely on Java 8 classes, we might be able to use [`StampedLock`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/StampedLock.html) to work around this issue.

- **Why not detect "leaked" locks in tests?**:

  See above notes about `take()` and `limit`.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #10705 from JoshRosen/pin-pages.
asfgit pushed a commit that referenced this pull request Mar 2, 2016
CacheManager directly calls MemoryStore.unrollSafely() and has its own logic for handling graceful fallback to disk when cached data does not fit in memory. However, this logic also exists inside of the MemoryStore itself, so this appears to be unnecessary duplication.

Thanks to the addition of block-level read/write locks in #10705, we can refactor the code to remove the CacheManager and replace it with an atomic `BlockManager.getOrElseUpdate()` method.

This pull request replaces / subsumes #10748.

/cc andrewor14 and nongli for review. Note that this changes the locking semantics of a couple of internal BlockManager methods (`doPut()` and `lockNewBlockForWriting`), so please pay attention to the Scaladoc changes and new test cases for those methods.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11436 from JoshRosen/remove-cachemanager.
roygao94 pushed a commit to roygao94/spark that referenced this pull request Mar 22, 2016
CacheManager directly calls MemoryStore.unrollSafely() and has its own logic for handling graceful fallback to disk when cached data does not fit in memory. However, this logic also exists inside of the MemoryStore itself, so this appears to be unnecessary duplication.

Thanks to the addition of block-level read/write locks in apache#10705, we can refactor the code to remove the CacheManager and replace it with an atomic `BlockManager.getOrElseUpdate()` method.

This pull request replaces / subsumes apache#10748.

/cc andrewor14 and nongli for review. Note that this changes the locking semantics of a couple of internal BlockManager methods (`doPut()` and `lockNewBlockForWriting`), so please pay attention to the Scaladoc changes and new test cases for those methods.

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#11436 from JoshRosen/remove-cachemanager.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants