Skip to content

Commit

Permalink
Implement a realloc() function for DirectByteBuffers to stem memory leak
Browse files Browse the repository at this point in the history
The workingMemory buffer was being allocated anew every time a compressor
was pulled out of the codec pool, causing a slow leak. This patch uses
undocumented APIs to proactively free memory when new buffers need to be
allocated.
  • Loading branch information
toddlipcon committed Jan 19, 2011
1 parent 6cbf4e2 commit 91cf4d9
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 24 deletions.
64 changes: 43 additions & 21 deletions src/java/com/hadoop/compression/lzo/LzoCompressor.java
Expand Up @@ -43,9 +43,9 @@ class LzoCompressor implements Compressor {
// Capacity, in bytes, requested for both direct buffers below.
private int directBufferSize;
private byte[] userBuf = null;
private int userBufOff = 0, userBufLen = 0;
// The direct buffers are declared as ByteBuffer (not Buffer) exactly once:
// the native initIDs() code looks these fields up by name with the
// "Ljava/nio/ByteBuffer;" signature, and realloc() needs the ByteBuffer type.
private ByteBuffer uncompressedDirectBuf = null;
private int uncompressedDirectBufLen = 0;
private ByteBuffer compressedDirectBuf = null;
private boolean finish, finished;

private long bytesRead = 0L;
Expand All @@ -56,7 +56,7 @@ class LzoCompressor implements Compressor {
private long lzoCompressor = 0; // The actual lzo compression function.
private int workingMemoryBufLen = 0; // The length of 'working memory' buf.
// Declared once, as ByteBuffer: the native initIDs() code resolves this field
// with the "Ljava/nio/ByteBuffer;" signature and init() passes it to realloc().
@SuppressWarnings("unused")
private ByteBuffer workingMemoryBuf; // The 'working memory' for lzo.

/**
* The compression algorithm for lzo library.
Expand Down Expand Up @@ -217,35 +217,50 @@ public LzoCompressor(CompressionStrategy strategy, int directBufferSize) {
init(strategy, directBufferSize);
}

/**
 * Reallocates a direct byte buffer by freeing the old one and allocating
 * a new one, unless the size is the same, in which case it is simply
 * cleared and returned.
 *
 * NOTE: this uses unsafe APIs to manually free memory - if anyone else
 * has a reference to the 'buf' parameter they will likely read random
 * data or cause a segfault by accessing it.
 *
 * @param buf the buffer to reuse or free; may be null, in which case a new
 *        buffer is simply allocated
 * @param newSize the required capacity in bytes
 * @return {@code buf} itself (cleared) when its capacity already equals
 *         {@code newSize}, otherwise a newly allocated direct buffer
 */
private ByteBuffer realloc(ByteBuffer buf, int newSize) {
  if (buf != null) {
    if (buf.capacity() == newSize) {
      // Can use existing buffer
      buf.clear();
      return buf;
    }
    try {
      // Manually free the old buffer using undocumented unsafe APIs.
      // If this fails, we'll drop the reference and hope GC finds it
      // eventually.
      //
      // The concrete direct-buffer class is not public, so even though its
      // cleaner() method is public, a reflective invoke() is rejected with
      // IllegalAccessException unless the Method is made accessible first.
      java.lang.reflect.Method cleanerMethod =
          buf.getClass().getMethod("cleaner");
      cleanerMethod.setAccessible(true);
      Object cleaner = cleanerMethod.invoke(buf);
      // A buffer that does not own its memory may report no cleaner;
      // there is nothing to free in that case.
      if (cleaner != null) {
        java.lang.reflect.Method cleanMethod =
            cleaner.getClass().getMethod("clean");
        cleanMethod.setAccessible(true);
        cleanMethod.invoke(cleaner);
      }
    } catch (Exception e) {
      // Perhaps a non-sun-derived JVM - contributions welcome
      LOG.warn("Couldn't realloc bytebuffer", e);
    }
  }
  return ByteBuffer.allocateDirect(newSize);
}

/**
 * (Re)initializes this compressor for the given strategy and buffer size.
 *
 * All three direct buffers are obtained through realloc(), which reuses a
 * buffer whose capacity already matches and otherwise frees the old one
 * before allocating. Allocating directly here would drop the previous
 * buffer without freeing it - GC doesn't get triggered by native buffer
 * space usage, so that memory could linger.
 *
 * @param strategy the compression strategy to use
 * @param directBufferSize capacity, in bytes, of the uncompressed and
 *        compressed direct buffers
 */
private void init(CompressionStrategy strategy, int directBufferSize) {
  this.strategy = strategy;
  this.directBufferSize = directBufferSize;

  uncompressedDirectBuf = realloc(uncompressedDirectBuf, directBufferSize);
  compressedDirectBuf = realloc(compressedDirectBuf, directBufferSize);
  // Move the position to the end so the buffer reports zero remaining bytes
  // (presumably: no compressed output pending yet - confirm against callers).
  compressedDirectBuf.position(directBufferSize);
  reset();

  // Initialize lzoCompressor and workingMemoryBufLen for the chosen strategy
  // before sizing the working-memory buffer from workingMemoryBufLen.
  init(this.strategy.getCompressor());
  workingMemoryBuf = realloc(workingMemoryBuf, workingMemoryBufLen);
}

/**
Expand Down Expand Up @@ -401,6 +416,13 @@ public byte[] uncompressedBytes() {
((ByteBuffer)uncompressedDirectBuf).get(b);
return b;
}

/**
 * Test-only accessor for the currently configured direct buffer size.
 *
 * @return the value of the directBufferSize field
 */
long getDirectBufferSize() {
  return this.directBufferSize;
}

/**
* Noop.
Expand Down
6 changes: 3 additions & 3 deletions src/native/impl/lzo/LzoCompressor.c
Expand Up @@ -134,12 +134,12 @@ Java_com_hadoop_compression_lzo_LzoCompressor_initIDs(
LzoCompressor_finished = (*env)->GetFieldID(env, class, "finished", "Z");
LzoCompressor_uncompressedDirectBuf = (*env)->GetFieldID(env, class,
"uncompressedDirectBuf",
"Ljava/nio/Buffer;");
"Ljava/nio/ByteBuffer;");
LzoCompressor_uncompressedDirectBufLen = (*env)->GetFieldID(env, class,
"uncompressedDirectBufLen", "I");
LzoCompressor_compressedDirectBuf = (*env)->GetFieldID(env, class,
"compressedDirectBuf",
"Ljava/nio/Buffer;");
"Ljava/nio/ByteBuffer;");
LzoCompressor_directBufferSize = (*env)->GetFieldID(env, class,
"directBufferSize", "I");
LzoCompressor_lzoCompressor = (*env)->GetFieldID(env, class,
Expand All @@ -148,7 +148,7 @@ Java_com_hadoop_compression_lzo_LzoCompressor_initIDs(
"workingMemoryBufLen", "I");
LzoCompressor_workingMemoryBuf = (*env)->GetFieldID(env, class,
"workingMemoryBuf",
"Ljava/nio/Buffer;");
"Ljava/nio/ByteBuffer;");

// record lzo library version
void* lzo_version_ptr = NULL;
Expand Down
26 changes: 26 additions & 0 deletions src/test/com/hadoop/compression/lzo/TestLzoCodec.java
Expand Up @@ -35,6 +35,32 @@ public void testCodecPoolReinit() throws Exception {

}

/**
 * Checks that a compressor taken back out of the pool after the configured
 * buffer size has changed is the same instance, reinitialized to the new
 * buffer size.
 **/
public void testCodecPoolChangeBufferSize() throws Exception {
  Configuration conf = new Configuration();
  CompressionCodec codec = ReflectionUtils.newInstance(LzoCodec.class, conf);

  // Borrow a compressor, check its strategy, then hand it back to the pool
  Compressor pooled = CodecPool.getCompressor(codec);
  LzoCompressor pooledLzo = (LzoCompressor) pooled;
  assertEquals(LzoCompressor.CompressionStrategy.LZO1X_1,
      pooledLzo.getStrategy());
  CodecPool.returnCompressor(pooled);

  // Double the configured buffer size
  int doubledBufSize = 2 * LzoCodec.DEFAULT_LZO_BUFFER_SIZE;
  LzoCodec.setBufferSize(conf, doubledBufSize);

  // The pool should hand back the same instance, now with the new size
  Compressor reused = CodecPool.getCompressor(codec, conf);
  assertSame(pooled, reused);

  LzoCompressor reusedLzo = (LzoCompressor) reused;
  assertEquals(doubledBufSize, reusedLzo.getDirectBufferSize());
}

public void testCodecPoolReuseWithoutConf() throws Exception {
Configuration conf = new Configuration();
CompressionCodec codec = ReflectionUtils.newInstance(
Expand Down

0 comments on commit 91cf4d9

Please sign in to comment.