Permalink
Browse files

Initial compression level support.

  • Loading branch information...
1 parent 6c2cf58 commit a27202db735242533ec9d5608f89246a232d260d Travis Crawford committed May 27, 2011
@@ -46,9 +46,11 @@
public static final String LZO_COMPRESSOR_KEY = "io.compression.codec.lzo.compressor";
public static final String LZO_DECOMPRESSOR_KEY = "io.compression.codec.lzo.decompressor";
+ public static final String LZO_COMPRESSION_LEVEL_KEY = "io.compression.codec.lzo.compression.level";
public static final String LZO_BUFFER_SIZE_KEY = "io.compression.codec.lzo.buffersize";
public static final int DEFAULT_LZO_BUFFER_SIZE = 256 * 1024;
public static final int MAX_BLOCK_SIZE = 64*1024*1024;
+ public static final int UNDEFINED_COMPRESSION_LEVEL = -999; // Constant from LzoCompressor.c
private Configuration conf;
@@ -213,6 +215,10 @@ public String getDefaultExtension() {
LzoDecompressor.CompressionStrategy.LZO1X.name()));
}
+ static int getCompressionLevel(Configuration conf) {
+ return conf.getInt(LZO_COMPRESSION_LEVEL_KEY, UNDEFINED_COMPRESSION_LEVEL);
+ }
+
static int getBufferSize(Configuration conf) {
return conf.getInt(LZO_BUFFER_SIZE_KEY, DEFAULT_LZO_BUFFER_SIZE);
}
@@ -227,6 +233,10 @@ public static void setDecompressionStrategy(Configuration conf,
conf.set(LZO_DECOMPRESSOR_KEY, strategy.name());
}
+ public static void setCompressionLevel(Configuration conf, int compressionLevel) {
+ conf.setInt(LZO_COMPRESSION_LEVEL_KEY, compressionLevel);
+ }
+
public static void setBufferSize(Configuration conf, int bufferSize) {
conf.setInt(LZO_BUFFER_SIZE_KEY, bufferSize);
}
@@ -57,6 +57,7 @@
private int workingMemoryBufLen = 0; // The length of 'working memory' buf.
@SuppressWarnings("unused")
private ByteBuffer workingMemoryBuf; // The 'working memory' for lzo.
+ private int lzoCompressionLevel;
/**
* The compression algorithm for lzo library.
@@ -202,9 +203,10 @@ public void reinit(Configuration conf) {
conf = new Configuration();
}
LzoCompressor.CompressionStrategy strategy = LzoCodec.getCompressionStrategy(conf);
+ int compressionLevel = LzoCodec.getCompressionLevel(conf);
int bufferSize = LzoCodec.getBufferSize(conf);
- init(strategy, bufferSize);
+ init(strategy, compressionLevel, bufferSize);
}
/**
@@ -214,7 +216,7 @@ public void reinit(Configuration conf) {
* @param directBufferSize size of the direct buffer to be used.
*/
public LzoCompressor(CompressionStrategy strategy, int directBufferSize) {
- init(strategy, directBufferSize);
+ init(strategy, LzoCodec.DEFAULT_LZO_BUFFER_SIZE, directBufferSize);
}
/**
@@ -247,8 +249,9 @@ private ByteBuffer realloc(ByteBuffer buf, int newSize) {
return ByteBuffer.allocateDirect(newSize);
}
- private void init(CompressionStrategy strategy, int directBufferSize) {
+ private void init(CompressionStrategy strategy, int compressionLevel, int directBufferSize) {
this.strategy = strategy;
+ this.lzoCompressionLevel = compressionLevel;
this.directBufferSize = directBufferSize;
uncompressedDirectBuf = realloc(uncompressedDirectBuf, directBufferSize);
@@ -112,6 +112,7 @@ static jfieldID LzoCompressor_uncompressedDirectBufLen;
static jfieldID LzoCompressor_compressedDirectBuf;
static jfieldID LzoCompressor_directBufferSize;
static jfieldID LzoCompressor_lzoCompressor;
+static jfieldID LzoCompressor_lzoCompressionLevel;
static jfieldID LzoCompressor_workingMemoryBufLen;
static jfieldID LzoCompressor_workingMemoryBuf;
@@ -144,6 +145,8 @@ Java_com_hadoop_compression_lzo_LzoCompressor_initIDs(
"directBufferSize", "I");
LzoCompressor_lzoCompressor = (*env)->GetFieldID(env, class,
"lzoCompressor", "J");
+ LzoCompressor_lzoCompressionLevel = (*env)->GetFieldID(env, class,
+ "lzoCompressionLevel", "I");
LzoCompressor_workingMemoryBufLen = (*env)->GetFieldID(env, class,
"workingMemoryBufLen", "I");
LzoCompressor_workingMemoryBuf = (*env)->GetFieldID(env, class,
@@ -190,6 +193,10 @@ Java_com_hadoop_compression_lzo_LzoCompressor_init(
(*env)->SetIntField(env, this, LzoCompressor_workingMemoryBufLen,
lzo_compressors[compressor].wrkmem);
+ // Save the compresson level into LzoCompressor_lzoCompressor
+ (*env)->SetIntField(env, this, LzoCompressor_lzoCompressionLevel,
+ lzo_compressors[compressor].compression_level);
+
return;
}
@@ -217,6 +224,8 @@ Java_com_hadoop_compression_lzo_LzoCompressor_compressBytesDirect(
LzoCompressor_compressedDirectBuf);
lzo_uint compressed_direct_buf_len = (*env)->GetIntField(env, this,
LzoCompressor_directBufferSize);
+ int compression_level = (*env)->GetIntField(env, this,
+ LzoCompressor_lzoCompressionLevel);
jobject working_memory_buf = (*env)->GetObjectField(env, this,
LzoCompressor_workingMemoryBuf);
@@ -256,7 +265,7 @@ Java_com_hadoop_compression_lzo_LzoCompressor_compressBytesDirect(
// Compress
lzo_uint no_compressed_bytes = compressed_direct_buf_len;
int rv = 0;
- int compression_level = lzo_compressors[compressor].compression_level;
+ compression_level = lzo_compressors[compressor].compression_level;
if (compression_level == UNDEFINED_COMPRESSION_LEVEL) {
lzo_compress_t fptr = (lzo_compress_t) FUNC_PTR(lzo_compressor_funcptr);
rv = fptr(uncompressed_bytes, uncompressed_direct_buf_len,

0 comments on commit a27202d

Please sign in to comment.