Skip to content

Commit

Permalink
[FLINK-20663][core] Release unsafe memory instantly on segment freed.
Browse files Browse the repository at this point in the history
This closes #14904
  • Loading branch information
xintongsong committed Feb 15, 2021
1 parent 0c4d062 commit 1f8be1f
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 4 deletions.
Expand Up @@ -60,6 +60,8 @@ public final class HybridMemorySegment extends MemorySegment {
*/
@Nullable private ByteBuffer offHeapBuffer;

@Nullable private final Runnable cleaner;

/**
* Wrapping is not allowed when the underlying memory is unsafe. Unsafe memory can be actively
* released, without reference counting. Therefore, access from wrapped buffers, which may not
Expand All @@ -80,7 +82,7 @@ public final class HybridMemorySegment extends MemorySegment {
* @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct.
*/
HybridMemorySegment(@Nonnull ByteBuffer buffer, @Nullable Object owner) {
this(buffer, owner, true);
this(buffer, owner, true, null);
}

/**
Expand All @@ -94,12 +96,18 @@ public final class HybridMemorySegment extends MemorySegment {
* @param buffer The byte buffer whose memory is represented by this memory segment.
* @param owner The owner references by this memory segment.
* @param allowWrap Whether wrapping {@link ByteBuffer}s from the segment is allowed.
* @param cleaner The cleaner to be called on free segment.
* @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct.
*/
HybridMemorySegment(@Nonnull ByteBuffer buffer, @Nullable Object owner, boolean allowWrap) {
HybridMemorySegment(
@Nonnull ByteBuffer buffer,
@Nullable Object owner,
boolean allowWrap,
@Nullable Runnable cleaner) {
super(getByteBufferAddress(buffer), buffer.capacity(), owner);
this.offHeapBuffer = buffer;
this.allowWrap = allowWrap;
this.cleaner = cleaner;
}

/**
Expand All @@ -114,6 +122,7 @@ public final class HybridMemorySegment extends MemorySegment {
super(buffer, owner);
this.offHeapBuffer = null;
this.allowWrap = true;
this.cleaner = null;
}

// -------------------------------------------------------------------------
Expand All @@ -123,6 +132,9 @@ public final class HybridMemorySegment extends MemorySegment {
@Override
public void free() {
super.free();
if (cleaner != null) {
cleaner.run();
}
offHeapBuffer = null; // to enable GC of unsafe memory
}

Expand Down
Expand Up @@ -175,8 +175,9 @@ public static MemorySegment allocateOffHeapUnsafeMemory(
int size, Object owner, Runnable customCleanupAction) {
long address = MemoryUtils.allocateUnsafe(size);
ByteBuffer offHeapBuffer = MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);
MemoryUtils.createMemoryGcCleaner(offHeapBuffer, address, customCleanupAction);
return new HybridMemorySegment(offHeapBuffer, owner, false);
Runnable cleaner =
MemoryUtils.createMemoryGcCleaner(offHeapBuffer, address, customCleanupAction);
return new HybridMemorySegment(offHeapBuffer, owner, false, cleaner);
}

/**
Expand Down
Expand Up @@ -22,6 +22,10 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.concurrent.CompletableFuture;

import static org.junit.Assert.assertTrue;

/** Tests for the {@link HybridMemorySegment} in off-heap mode using unsafe memory. */
@RunWith(Parameterized.class)
public class HybridOffHeapUnsafeMemorySegmentTest extends MemorySegmentTestBase {
Expand All @@ -45,4 +49,13 @@ MemorySegment createSegment(int size, Object owner) {
public void testByteBufferWrapping() {
createSegment(10).wrap(1, 2);
}

@Test
public void testCallCleanerOnFree() {
final CompletableFuture<Void> cleanerFuture = new CompletableFuture<>();
MemorySegmentFactory.allocateOffHeapUnsafeMemory(
10, null, () -> cleanerFuture.complete(null))
.free();
assertTrue(cleanerFuture.isDone());
}
}

0 comments on commit 1f8be1f

Please sign in to comment.