Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-13308] ManagedBuffers passed to OneToOneStreamManager need to be freed in non-error cases #11193

Conversation

JoshRosen
Copy link
Contributor

ManagedBuffers that are passed to OneToOneStreamManager.registerStream need to be freed by the manager once it's done using them. However, the current code only frees them in certain error-cases and not during typical operation. This isn't a major problem today, but it will cause memory leaks after we implement better locking / pinning in the BlockManager (see #10705).

This patch modifies the relevant network code so that the ManagedBuffers are freed as soon as the messages containing them are processed by the lower-level Netty message sending code.

/cc @zsxwing for review.

@JoshRosen
Copy link
Contributor Author

Whoops, accidentally hit open too soon. Let me fill in the description now.

@JoshRosen JoshRosen changed the title 'SPARK-13308 [SPARK-13308] ManagedBuffers passed to OneToOneStreamManager need to be freed in non-error cases Feb 13, 2016
@JoshRosen
Copy link
Contributor Author

/cc @zsxwing @nongli @rxin

public class OneForOneStreamManagerSuite {

@Test
public void managedBuffersAreFeedWhenConnectionIsClosed() throws Exception {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic that this tests hasn't changed at all, but we didn't have unit tests for this case previously.

@SparkQA
Copy link

SparkQA commented Feb 13, 2016

Test build #51223 has finished for PR 11193 at commit c9726c2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -49,11 +49,15 @@ public void encode(ChannelHandlerContext ctx, Message in, List<Object> out) thro

// If the message has a body, take it out to enable zero-copy transfer for the payload.
if (in.body() != null) {
bodyLength = in.body().size();
if (bodyLength > 0) {
in.body().retain();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realized that this is slightly wrong: we assume that in.body() has a ref count of at least one when it's passed in and ownership of that reference is transferred to this method.

Therefore, we should not add an extra retain here. In order to fix tests, though, I need to add a missing retain to ensure that the message passed in here has a non-zero ref. count for its inner Netty buffer.

@SparkQA
Copy link

SparkQA commented Feb 14, 2016

Test build #51246 has finished for PR 11193 at commit e5cf48d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Copy link
Contributor Author

Jenkins retest this please

@SparkQA
Copy link

SparkQA commented Feb 14, 2016

Test build #51250 has finished for PR 11193 at commit e5cf48d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -44,7 +44,7 @@ public TestManagedBuffer(int len) {
for (int i = 0; i < len; i ++) {
byteArray[i] = (byte) i;
}
this.underlying = new NettyManagedBuffer(Unpooled.wrappedBuffer(byteArray));
this.underlying = new NettyManagedBuffer(Unpooled.wrappedBuffer(byteArray).retain());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need retain here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that the issue here is that other parts of the code were assuming that the buffer passed to NettyManagedBuffer had a non-zero retain count; if we don't add this retain() here then Netty will complain about calling release() without holding any references.

@zsxwing
Copy link
Member

zsxwing commented Feb 15, 2016

After going through other codes, I think the previous codes are correct. However, the doc of convertToNetty is not clear about the ownership of the reference. It transfers the ownership of the internal reference in ManagedBuffer to the caller. In other words, after calling convertToNetty, we don't need to call ManagedBuffer.release.

However, I understand that #10705 requires ManagedBuffer.release should be called when its life ends. So how about changing the semantics of convertToNetty to this:

  • convertToNetty shares the internal content to the caller (e.g., NettyManagedBuffer.convertToNetty will become buf.duplicate().retain())
  • ManagedBuffer.release should be called when its life ends
  • the caller of convertToNetty should release the returned object.

What do you think?

@JoshRosen
Copy link
Contributor Author

Thanks for the careful review, @zsxwing. I agree with your feedback and also think that it makes a lot more sense to have convertToNetty() increment the reference count. I've gone ahead and updated the patch to do this and have rolled back a confusing retain() call in the test code (which you pointed out earlier).

Take a look at the refCnt() assertions that I added in the test suites to see whether they match up with what you had in mind.

Note that as of today convertToNetty is only called in one place in MessageEncoder and the result of this is passed to MessageWithHeader alongside the message that the buffer came from, so it should be verify that MessageWithHeader.deallocate() will free all of the references.

@SparkQA
Copy link

SparkQA commented Feb 16, 2016

Test build #51340 has finished for PR 11193 at commit 2c00f29.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

ByteBuf result = doWrite(msg, 1);
assertEquals(msg.count(), result.readableBytes());
assertEquals(42, result.readLong());
assertEquals(84, result.readLong());

msg.deallocate();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use msg.release instead so as to decrease the reference count of msg.

@zsxwing
Copy link
Member

zsxwing commented Feb 16, 2016

LGTM except nits about msg.deallocate in tests.

@JoshRosen
Copy link
Contributor Author

I've updated the tests using assert(msg.release()), since release's return value lets us check whether deallocate is called.

@zsxwing
Copy link
Member

zsxwing commented Feb 16, 2016

I've updated the tests using assert(msg.release()), since release's return value lets us check whether deallocate is called.

You missed one deallocate.

@JoshRosen
Copy link
Contributor Author

You missed one deallocate.

Whoops, sorry about that. Should be fixed now.

@SparkQA
Copy link

SparkQA commented Feb 16, 2016

Test build #51371 has finished for PR 11193 at commit cb99750.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Copy link
Member

zsxwing commented Feb 16, 2016

LGTM. Merging to master. Thanks a lot, @JoshRosen

@asfgit asfgit closed this in 5f37aad Feb 16, 2016
@JoshRosen JoshRosen deleted the add-missing-release-calls-in-network-layer branch February 16, 2016 20:18
@SparkQA
Copy link

SparkQA commented Feb 16, 2016

Test build #51374 has finished for PR 11193 at commit 014ca9b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Feb 26, 2016
## Motivation

As a pre-requisite to off-heap caching of blocks, we need a mechanism to prevent pages / blocks from being evicted while they are being read. With on-heap objects, evicting a block while it is being read merely leads to memory-accounting problems (because we assume that an evicted block is a candidate for garbage-collection, which will not be true during a read), but with off-heap memory this will lead to either data corruption or segmentation faults.

## Changes

### BlockInfoManager and reader/writer locks

This patch adds block-level read/write locks to the BlockManager. It introduces a new `BlockInfoManager` component, which is contained within the `BlockManager`, holds the `BlockInfo` objects that the `BlockManager` uses for tracking block metadata, and exposes APIs for locking blocks in either shared read or exclusive write modes.

`BlockManager`'s `get*()` and `put*()` methods now implicitly acquire the necessary locks. After a `get()` call successfully retrieves a block, that block is locked in a shared read mode. A `put()` call will block until it acquires an exclusive write lock. If the write succeeds, the write lock will be downgraded to a shared read lock before returning to the caller. This `put()` locking behavior allows us store a block and then immediately turn around and read it without having to worry about it having been evicted between the write and the read, which will allow us to significantly simplify `CacheManager` in the future (see #10748).

See `BlockInfoManagerSuite`'s test cases for a more detailed specification of the locking semantics.

### Auto-release of locks at the end of tasks

Our locking APIs support explicit release of locks (by calling `unlock()`), but it's not always possible to guarantee that locks will be released prior to the end of the task. One reason for this is our iterator interface: since our iterators don't support an explicit `close()` operator to signal that no more records will be consumed, operations like `take()` or `limit()` don't have a good means to release locks on their input iterators' blocks. Another example is broadcast variables, whose block locks can only be released at the end of the task.

To address this, `BlockInfoManager` uses a pair of maps to track the set of locks acquired by each task. Lock acquisitions automatically record the current task attempt id by obtaining it from `TaskContext`. When a task finishes, code in `Executor` calls `BlockInfoManager.unlockAllLocksForTask(taskAttemptId)` to free locks.

### Locking and the MemoryStore

In order to prevent in-memory blocks from being evicted while they are being read, the `MemoryStore`'s `evictBlocksToFreeSpace()` method acquires write locks on blocks which it is considering as candidates for eviction. These lock acquisitions are non-blocking, so a block which is being read will not be evicted. By holding write locks until the eviction is performed or skipped (in case evicting the blocks would not free enough memory), we avoid a race where a new reader starts to read a block after the block has been marked as an eviction candidate but before it has been removed.

### Locking and remote block transfer

This patch makes small changes to to block transfer and network layer code so that locks acquired by the BlockTransferService are released as soon as block transfer messages are consumed and released by Netty. This builds on top of #11193, a bug fix related to freeing of network layer ManagedBuffers.

## FAQ

- **Why not use Java's built-in [`ReadWriteLock`](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReadWriteLock.html)?**

  Our locks operate on a per-task rather than per-thread level. Under certain circumstances a task may consist of multiple threads, so using `ReadWriteLock` would mean that we might call `unlock()` from a thread which didn't hold the lock in question, an operation which has undefined semantics. If we could rely on Java 8 classes, we might be able to use [`StampedLock`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/StampedLock.html) to work around this issue.

- **Why not detect "leaked" locks in tests?**:

  See above notes about `take()` and `limit`.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #10705 from JoshRosen/pin-pages.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants