Skip to content

Commit

Permalink
More docs and null buffer test
Browse files Browse the repository at this point in the history
  • Loading branch information
mukund-thakur committed May 31, 2022
1 parent 35652fd commit 24ac304
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,8 @@ public interface ByteBufferPool {
*/
void putBuffer(ByteBuffer buffer);

/**
* Clear the buffer pool thus releasing all the buffers.
*/
default void release() { }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;

/**
Expand All @@ -35,14 +37,33 @@
* smallest buffer whose size is just greater than requested length.
* This is a thread safe implementation.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class WeakReferencedElasticByteBufferPool extends ElasticByteBufferPool {

/**
* Map to store direct byte buffers of different sizes in the pool.
* Used tree map such that we can return next greater than capacity
* buffer if buffer with exact capacity is unavailable.
* This must be accessed in synchronized blocks.
*/
private final TreeMap<Key, WeakReference<ByteBuffer>> directBuffers =
new TreeMap<>();

/**
* Map to store heap based byte buffers of different sizes in the pool.
* Used tree map such that we can return next greater than capacity
* buffer if buffer with exact capacity is unavailable.
* This must be accessed in synchronized blocks.
*/
private final TreeMap<Key, WeakReference<ByteBuffer>> heapBuffers =
new TreeMap<>();

/**
* Method to get desired buffer tree.
* @param isDirect whether the buffer is heap based or direct.
* @return corresponding buffer tree.
*/
private TreeMap<Key, WeakReference<ByteBuffer>> getBufferTree(boolean isDirect) {
return isDirect
? directBuffers
Expand Down Expand Up @@ -80,8 +101,9 @@ public synchronized ByteBuffer getBuffer(boolean direct, int length) {
return buffer;
}
// buffer was in pool but already got garbage collected.
return direct ? ByteBuffer.allocateDirect(length) :
ByteBuffer.allocate(length);
return direct
? ByteBuffer.allocateDirect(length)
: ByteBuffer.allocate(length);
}

/**
Expand Down Expand Up @@ -119,8 +141,15 @@ public synchronized void release() {
directBuffers.clear();
}

/**
* Get current buffers count in the pool.
* @param isDirect whether we want to count the heap or direct buffers.
* @return count of buffers.
*/
@VisibleForTesting
public int getCurrentBuffersCount(boolean isDirect) {
return isDirect ? directBuffers.size() : heapBuffers.size();
public synchronized int getCurrentBuffersCount(boolean isDirect) {
return isDirect
? directBuffers.size()
: heapBuffers.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ public void testUnexpectedBufferSizes() throws Exception {
// Expected IllegalArgumentException as negative length buffer is requested.
intercept(IllegalArgumentException.class,
() -> pool.getBuffer(true, -5));

// test returning null buffer to the pool.
intercept(NullPointerException.class,
() -> pool.putBuffer(null));
}

/**
Expand Down

0 comments on commit 24ac304

Please sign in to comment.